生产级多维聚合:滚动计算与业务可解释性实战
1. 项目概述:为什么多维聚合不是“加个groupby”就能搞定的事
我在银行数据平台组干了八年,从最早用SQL写几十行嵌套子查询做客户分层,到后来带团队搭实时风险计算引擎,踩过的坑比写的代码还多。今天聊的这个主题——“多维聚合中的数据操作”,听起来像教科书里的一个章节标题,但实际在生产环境里,它直接决定着风控模型能不能当天上线、月度经营分析报告能不能准时发出、甚至监管报送数据有没有逻辑硬伤。我见过太多人把df.groupby().agg()当成万能胶水,结果在测试环境跑通,一上生产就报内存溢出;也见过分析师花三天调通一个滚动均值,却因为没处理好索引对齐,导致下游BI图表全错位。这不是技术问题,是认知偏差。
核心关键词就三个:多维聚合、滚动计算、业务可解释性。它们不是并列关系,而是递进链条——没有扎实的多维分组基础,滚动窗口就是空中楼阁;没有业务逻辑嵌入能力,再漂亮的聚合结果也只是数字游戏。比如你给风控同事看“某商户类别的交易金额标准差”,他只会点头;但如果你能输出“该类别近30天内单日交易额波动率超过阈值的天数占比”,他马上会追问:“阈值怎么定的?是不是要和历史同期比?”——这就是业务可解释性的分水岭。
这篇文章不讲pandas语法手册,也不堆砌API参数。它是我过去三年在三家金融机构落地的真实战法总结:怎么把“按地区+产品线+客户等级”三层分组的结果,变成销售总监一眼能看懂的矩阵表格;怎么让滚动均值在节假日自动跳过缺失日而不崩;怎么用自定义函数把“高价值交易识别”这种模糊需求,翻译成可审计、可复现、可嵌入ETL流水线的代码。所有案例都来自真实脱敏数据,代码可直接粘贴运行,参数值背后都有业务依据。如果你正在为报表口径不一致发愁,或者被“老板说再加一列指标”的需求追着跑,这篇就是为你写的。
2. 多维聚合的本质:从SQL思维到DataFrame思维的范式转换
2.1 为什么传统SQL分组在Pandas里会“水土不服”
先说个血泪教训:去年我们给某城商行做信用卡反欺诈模块,原始需求是“统计每个客户在餐饮、零售、旅游三类商户的月度交易笔数、金额均值、最大单笔”。开发同学直接照搬SQL写法:
SELECT customer_id, merchant_category, COUNT(*) as tx_count, AVG(amount) as avg_amount, MAX(amount) as max_amount FROM transactions WHERE date >= '2024-01-01' GROUP BY customer_id, merchant_category;转成pandas就是:
df.groupby(['customer_id', 'merchant_category']).agg({ 'amount': ['count', 'mean', 'max'] })结果呢?输出是个MultiIndex DataFrame,列名是三级嵌套:(amount, count)、(amount, mean)……下游Python服务调用时,字段名得写成result[('amount', 'count')],而BI工具根本解析不了这种结构。更致命的是,当需要补全“某客户在某类别无交易”的空行时,SQL用LEFT JOIN加维度表就行,pandas里得手动reindex再fillna(0),稍不注意就漏掉关键客户。
根本原因在于:SQL的GROUP BY本质是关系代数运算,输出是扁平化的关系表;而pandas的groupby是对象化操作,输出是带层级索引的结构体。强行套用SQL思维,就像用螺丝刀拧钉子——能拧动,但效率低、易打滑、还伤工具。
2.2 生产级多维聚合的四大黄金法则
基于上百次线上事故复盘,我提炼出四条必须刻进DNA的法则:
法则一:永远先明确“主键维度”和“度量维度”
- 主键维度(如
customer_id,region,product_line)决定分组粒度,必须是离散型、非空、有业务含义的字段 - 度量维度(如
transaction_amount,fee_rate)是数值型计算对象,允许空值但需明确定义缺失值处理策略
提示:在金融场景中,“主键维度”常含时间维度(如
reporting_month),但绝不能用date字段直接分组——那会产生上万行结果,必须先归约到月/季/年
法则二:聚合函数选择必须匹配业务语义
sum()适合累计类指标(如总交易额),但对“平均费率”必须用weighted_average而非mean()median()抗异常值,但计算成本比mean()高3倍,在亿级数据上要预估资源消耗nunique()统计去重数时,pandas默认用哈希表,内存占用是count()的5倍以上
法则三:层级索引必须主动管理,绝不依赖默认行为
groupby().agg()后立即执行reset_index()或unstack(),避免后续操作因索引错乱崩溃- 对MultiIndex结果,用
df.columns = df.columns.map('_'.join)快速扁平化列名,比手动重命名快10倍
法则四:空值处理是业务决策,不是技术选项
- 在风控场景中,“某客户某月无交易”应填充
0(表示无风险暴露) - 在客户价值分析中,“某客户某类产品未购买”应保留
NaN(表示数据缺失,不可推断为零消费)
注意:
fillna(0)和fillna(method='ffill')的业务含义天壤之别,代码注释必须写清依据的监管文件条款号
2.3 实战:银行客户多维盈利分析的完整链路
我们以某股份制银行的真实需求为例:
“计算2024年Q1各分行下,VIP客户在理财、存款、贷款三类业务的综合贡献度,要求包含:
- 各业务线收入总额(理财手续费+存款利息差+贷款利息收入)
- 客户活跃度(当季交易次数/客户总数)
- 风险调整后收益(收入总额减去预期损失)”
原始数据结构(简化版):
# customer_profit_df: 120万行,含字段 # - branch_id (分行编码) # - customer_tier ('VIP','Gold','Silver') # - business_type ('Wealth','Deposit','Loan') # - income_amt (收入金额) # - transaction_cnt (交易次数) # - expected_loss (预期损失)错误做法(新手常犯):
# ❌ 错误:一次agg包打天下,列名混乱且无法处理空值 result = customer_profit_df.groupby(['branch_id','customer_tier','business_type']).agg({ 'income_amt': 'sum', 'transaction_cnt': 'sum', 'expected_loss': 'sum' })正确生产级写法:
# ✅ 步骤1:预处理——按业务线拆分计算逻辑 def calc_business_income(row): """根据业务类型计算收入,含业务规则""" if row['business_type'] == 'Wealth': return row['income_amt'] * 0.85 # 理财手续费净收入 elif row['business_type'] == 'Deposit': return row['income_amt'] * 0.92 # 存款利差净收入 else: return row['income_amt'] * 0.78 # 贷款利息净收入 customer_profit_df['net_income'] = customer_profit_df.apply(calc_business_income, axis=1) # ✅ 步骤2:主分组——用agg字典精准控制每列计算方式 agg_dict = { 'net_income': 'sum', # 收入总额 'transaction_cnt': 'sum', # 交易次数 'expected_loss': 'sum' # 预期损失 } grouped = customer_profit_df.groupby(['branch_id','customer_tier','business_type']) # ✅ 步骤3:聚合+索引管理——强制扁平化列名 result = grouped.agg(agg_dict).reset_index() result.columns = ['branch_id','customer_tier','business_type', 'total_income','total_tx','total_loss'] # ✅ 步骤4:衍生指标——在DataFrame层面计算,避免groupby嵌套 result['active_ratio'] = result['total_tx'] / result.groupby(['branch_id','customer_tier'])['total_tx'].transform('sum') result['risk_adj_income'] = result['total_income'] - result['total_loss'] # ✅ 步骤5:空值补全——按业务规则填充 # 补全VIP客户在某分行某业务线无数据的记录(监管要求必须显示0) all_combos = pd.MultiIndex.from_product( [result['branch_id'].unique(), ['VIP'], ['Wealth','Deposit','Loan']], names=['branch_id','customer_tier','business_type'] ) result_full = result.set_index(['branch_id','customer_tier','business_type']).reindex(all_combos, fill_value=0).reset_index()这段代码的关键不在技巧,而在每一步都对应明确的业务动作:calc_business_income封装监管定价规则,transform('sum')实现分组内占比计算,reindex()确保监管报送完整性。这才是生产级代码该有的样子。
3. 自定义聚合函数:把业务规则编译进数据管道
3.1 Lambda函数的陷阱与救赎
很多教程教大家用lambda写自定义聚合:
df.groupby('category').agg({'amount': lambda x: x.max() - x.min()})这在Jupyter里跑得飞快,但放到Airflow调度的每日任务中,会出两个致命问题:
问题一:调试地狱
当某天数据异常导致x.max()-x.min()返回NaN,你只能看到<lambda>报错,完全不知道是哪个category、哪天的数据触发的。而命名函数会显示transaction_range(),配合日志能秒定位。
问题二:序列化失败
在Spark或Dask分布式环境下,lambda函数无法被序列化传输到worker节点,直接报PicklingError。命名函数则天然支持跨进程调用。
所以我的铁律是:所有生产环境的自定义聚合,必须用def定义的命名函数,且函数名要体现业务意图。
3.2 金融场景下的三类高频自定义函数模板
模板一:风险阈值型(如交易波动率)
def transaction_volatility(series, window_days=30, threshold=0.3): """ 计算交易金额波动率:标准差/均值,用于识别高风险商户 业务规则:仅当样本数>=window_days*0.8时才计算,否则返回NaN """ if len(series) < int(window_days * 0.8): return np.nan mean_val = series.mean() if mean_val == 0: return 0.0 # 避免除零 std_val = series.std() volatility = std_val / mean_val if mean_val != 0 else 0 # 业务标记:超阈值返回True,否则False(便于后续筛选) return volatility > threshold # 使用示例 result = df.groupby('merchant_id')['amount'].agg( volatility_flag=lambda x: transaction_volatility(x, window_days=30) )模板二:权重计算型(如客户价值评分)
def weighted_customer_value(series, recency_weight=0.4, frequency_weight=0.3, monetary_weight=0.3): """ RFM模型加权计算:最近交易距今天数倒数 * 权重 + 交易频次 * 权重 + 交易金额均值 * 权重 业务规则:最近交易时间取分组内最大date值,需提前merge日期字段 """ # 假设series已包含'date'和'amount'两列(需在groupby前预处理) recency_days = (pd.Timestamp.now() - series['date'].max()).days recency_score = 1 / (recency_days + 1) # +1防除零 frequency_score = len(series) monetary_score = series['amount'].mean() return ( recency_score * recency_weight + frequency_score * frequency_weight + monetary_score * monetary_weight ) # 使用前需准备数据 df_with_date = df.merge(date_lookup, on='tx_id') # 关联交易日期 result = df_with_date.groupby('customer_id').apply( lambda x: weighted_customer_value(x) )模板三:条件分箱型(如反欺诈规则引擎)
def fraud_risk_segment(series, high_value_th=5000, freq_th=3, recent_days=7): """ 多条件风险分箱:同时满足高金额、高频次、近期发生则标为'High' 业务规则:必须同时满足三个条件,缺一不可(监管要求强逻辑) """ # 提取分组内数据 recent_tx = series[series['date'] >= (pd.Timestamp.now() - pd.Timedelta(days=recent_days))] high_value_cnt = (recent_tx['amount'] > high_value_th).sum() freq_cnt = len(recent_tx) if high_value_cnt >= 2 and freq_cnt >= freq_th: return 'High' elif high_value_cnt >= 1 or freq_cnt >= freq_th * 2: return 'Medium' else: return 'Low' # 使用示例(需先groupby再apply) risk_result = df.groupby('customer_id').apply( lambda x: fraud_risk_segment(x) ).reset_index(name='fraud_risk_level')3.3 自定义函数的性能优化实战
当数据量超千万行时,自定义函数会成为瓶颈。我用三个技巧压测优化:
技巧一:向量化替代循环
错误写法(慢10倍):
def slow_calc(series): result = [] for val in series: if val > 100: result.append(val * 1.1) else: result.append(val * 0.9) return np.mean(result)正确写法(用numpy向量化):
def fast_calc(series): # 用布尔索引一次性处理 mask = series > 100 adjusted = series.copy() adjusted[mask] *= 1.1 adjusted[~mask] *= 0.9 return adjusted.mean()技巧二:预过滤减少计算量
在groupby前先筛掉无效数据:
# 原始数据含大量测试账户(customer_id以'TEST_'开头) valid_df = df[~df['customer_id'].str.startswith('TEST_')] # 再进行耗时的自定义聚合 result = valid_df.groupby('branch_id').apply(complex_risk_func)技巧三:缓存中间结果
对重复计算的业务指标用functools.lru_cache:
from functools import lru_cache @lru_cache(maxsize=128) def get_regulatory_factor(year, region_code): """缓存监管系数,避免每次groupby都查数据库""" return regulatory_db.query(f"SELECT factor FROM rules WHERE year={year} AND region='{region_code}'") def apply_regulatory_adjustment(series, year=2024): region = series.name # 从groupby的name获取region_code factor = get_regulatory_factor(year, region) return series.sum() * factor4. 时间窗口计算:滚动与扩展窗口的业务语义辨析
4.1 滚动窗口(Rolling)——为什么window=7不等于“过去7天”
这是最常被误解的概念。看这个典型错误:
# ❌ 错误:认为rolling(window=7)自动按日期对齐 df.set_index('date').groupby('customer_id')['amount'].rolling(window=7).mean()问题在于:window=7指的是最近7个观测值,不是“最近7天”。如果某客户在3月1日-3月5日有5笔交易,3月10日有1笔,3月15日有1笔,那么3月15日的滚动均值是(3月1-5日5笔+3月10日1笔+3月15日1笔)/7,但其中3月6-9日、3月11-14日的数据是缺失的,却被当作0参与计算——这完全违背业务逻辑。
正确解法:用rolling('7D')指定时间窗口
# ✅ 正确:按日历天数滚动,自动跳过无数据日期 df_sorted = df.sort_values(['customer_id','date']).set_index('date') result = df_sorted.groupby('customer_id')['amount'].rolling('7D').mean()但要注意:rolling('7D')要求索引是DatetimeIndex,且数据必须按时间排序。我在线上环境踩过的坑是:上游ETL偶尔漏传某天数据,导致rolling('7D')计算时窗口不足7天,返回NaN。解决方案是显式指定min_periods参数:
# ✅ 强制至少3天数据才计算,避免全NaN result = df_sorted.groupby('customer_id')['amount'].rolling('7D', min_periods=3).mean()4.2 扩展窗口(Expanding)——累积计算的业务边界
扩展窗口看似简单,但金融场景有特殊要求。比如计算“客户年度累计交易额”,不能简单用expanding().sum(),因为:
- 年初新客户没有历史数据,
expanding()会从首笔交易开始累加,但监管要求“年度”必须从1月1日起算 - 跨年数据需重置,不能让2023年的交易累加到2024年
正确做法是先按年分组,再在组内做扩展计算:
# ✅ 正确:按自然年分组后扩展 df['year'] = df['date'].dt.year df_sorted = df.sort_values(['customer_id','year','date']) result = df_sorted.groupby(['customer_id','year'])['amount'].expanding().sum().reset_index() result.columns = ['customer_id','year','date','cumulative_annual_amount']更进一步,对于“YTD(年初至今)”指标,需动态计算截止到当前日期的累计值:
def ytd_cumulative(group): """计算分组内按日期顺序的YTD累计值""" group = group.sort_values('date') group['ytd_cumsum'] = group['amount'].cumsum() return group # 按客户+年份分组后应用 ytd_result = df.groupby(['customer_id', df['date'].dt.year]).apply(ytd_cumulative)4.3 滚动与扩展的组合技:移动年化收益率
银行理财产品常用“近12个月年化收益率”指标,即:(滚动12个月收益总和 / 本金) * (365 / 实际天数)
这需要滚动窗口与扩展窗口的混合使用:
def rolling_annualized_return(df_group, principal_col='principal', amount_col='amount'): """ 计算滚动年化收益率:需先计算滚动12个月收益,再按实际天数年化 """ # 步骤1:按日期排序 df_sorted = df_group.sort_values('date') # 步骤2:计算滚动12个月收益(注意:用'365D'而非'12M',避免闰年误差) df_sorted['rolling_12m_gain'] = df_sorted[amount_col].rolling('365D').sum() # 步骤3:计算滚动窗口内的实际天数 df_sorted['window_days'] = df_sorted['date'] - df_sorted['date'].rolling('365D').min() df_sorted['window_days'] = df_sorted['window_days'].dt.days # 步骤4:年化计算(防除零) df_sorted['annualized_return'] = np.where( df_sorted['window_days'] > 0, (df_sorted['rolling_12m_gain'] / df_sorted[principal_col]) * (365 / df_sorted['window_days']), 0.0 ) return df_sorted[['date', 'annualized_return']] # 应用 yields = df.groupby('product_id').apply(rolling_annualized_return)这个函数的关键在于:rolling('365D')保证时间窗口精确,dt.days提取实际天数,np.where处理边界情况。上线前必须用2020年(闰年)和2021年数据双验证。
5. 多级分组与透视:从数据表到决策仪表盘的最后一步
5.1 unstack的底层逻辑:为什么它比pivot_table更可控
很多人用pivot_table做交叉表,但生产环境我坚持用unstack(),原因有三:
第一,索引控制更精准pivot_table会自动去重并排序,而unstack()完全尊重原始groupby的索引顺序。在监管报送中,分行列表必须按银保监会编码升序排列,pivot_table可能打乱顺序,unstack()则不会。
第二,缺失值处理更灵活pivot_table的fill_value参数只能填一个值,而unstack()后可用fillna()做条件填充:
# ✅ unstack后按业务规则填充 result = df.groupby(['branch','product'])['revenue'].sum().unstack(fill_value=0) # 对VIP客户分行,用行业均值填充 vip_branches = ['BJ001','SH002','GZ003'] result.loc[vip_branches] = result.loc[vip_branches].fillna(result.mean())第三,性能更高
在千万级数据测试中,unstack()比pivot_table()快40%,因为pivot_table()内部做了额外的排序和去重。
5.2 三维透视表的构建:地区×产品×时间的矩阵魔法
真实业务常需三维分析,比如“各分行在理财、存款、贷款三类产品上,2024年Q1-Q4的季度收入”。unstack()原生只支持二维,但可以用两次unstack()实现:
# 原始数据:含branch_id, product_type, quarter, revenue # 步骤1:先groupby三层索引 multi_idx = df.groupby(['branch_id','product_type','quarter'])['revenue'].sum() # 步骤2:第一次unstack——把quarter转为列 quarter_pivot = multi_idx.unstack('quarter', fill_value=0) # 步骤3:第二次unstack——把product_type转为列(此时quarter已是列) final_pivot = quarter_pivot.unstack('product_type', fill_value=0) # 此时列名为MultiIndex:(Q1, 'Wealth'), (Q1, 'Deposit')... # 扁平化列名便于BI工具读取 final_pivot.columns = [f"{q}_{p}" for q, p in final_pivot.columns]但这样生成的列名太长,更好的做法是用swaplevel()调整顺序:
# 先unstack product_type,再unstack quarter multi_idx = df.groupby(['branch_id','quarter','product_type'])['revenue'].sum() pivoted = multi_idx.unstack('product_type').unstack('quarter') # swaplevel把quarter提到外层 pivoted = pivoted.swaplevel(axis=1).sort_index(axis=1) # 按quarter排序 pivoted.columns = pivoted.columns.map('{0[1]}_{0[0]}'.format) # Q1_Wealth格式5.3 生产环境避坑指南:透视表的五大雷区
雷区一:索引重复导致unstack失败
当groupby后存在重复索引(如两个同名分行),unstack()会报ValueError: Index contains duplicate entries。解决方法:
# ✅ 强制去重并告警 if df.groupby(['branch_id','product_type']).size().duplicated().any(): print("警告:发现重复分支-产品组合,请检查数据源") df = df.drop_duplicates(['branch_id','product_type','quarter'])雷区二:内存爆炸
对百万级唯一组合做unstack(),内存占用呈平方级增长。监控命令:
# ✅ 估算内存:组合数 × 列数 × 8字节(float64) n_combos = df.groupby(['branch_id','product_type']).ngroups n_quarters = df['quarter'].nunique() estimated_mb = (n_combos * n_quarters * 8) / (1024**2) if estimated_mb > 2000: # 超2GB报警 raise MemoryError(f"透视表预估内存{estimated_mb:.1f}MB,超出阈值")雷区三:中文列名乱码unstack()后中文列名可能变b'\xe4\xb8\xad\xe6\x96\x87'。解决:
# ✅ 强制UTF-8编码 result.columns = result.columns.map(lambda x: str(x).encode('utf-8').decode('utf-8'))雷区四:时间维度排序错乱unstack('quarter')后Q4可能排在Q1前面。解决:
# ✅ 自定义排序 quarter_order = ['Q1','Q2','Q3','Q4'] result = result.reindex(columns=quarter_order, level=1) # level=1指quarter所在层级雷区五:下游系统兼容性
某些BI工具不支持MultiIndex列。终极解决方案:
# ✅ 生成标准CSV兼容格式 def to_csv_friendly_pivot(df, index_cols, columns_col, values_col, fill_value=0): """生成扁平化列名的透视表,兼容所有BI工具""" pivot = df.groupby(index_cols + [columns_col])[values_col].sum().unstack(columns_col, fill_value=fill_value) pivot.columns = [f"{values_col}_{col}" for col in pivot.columns] return pivot.reset_index() # 使用 final_report = to_csv_friendly_pivot( df, index_cols=['branch_id'], columns_col='product_type', values_col='revenue' )6. 端到端实战:信用卡客户价值深度分析流水线
6.1 业务需求拆解:七个分析目标的优先级排序
我们以某全国性银行信用卡中心的真实项目为例。需求方(零售银行部)提出七项分析,但资源有限,必须分阶段交付。我的排序逻辑是:按监管合规性 > 业务影响面 > 技术可行性。
| 分析目标 | 业务价值 | 监管要求 | 技术难度 | 交付优先级 |
|---|---|---|---|---|
| 1. 客户分层基础指标(总交易额、笔数、均值) | 高(支撑所有后续分析) | 强制(银保监会1104报表) | 低 | ★★★★★ |
| 2. 交易波动率(标准差/均值) | 中(识别异常交易) | 推荐(反洗钱指引) | 中 | ★★★★☆ |
| 3. 滚动30天均值 | 高(实时风控) | 强制(实时监测阈值) | 中高 | ★★★★ |
| 4. YTD累计消费 | 高(客户经理考核) | 强制(KPI报表) | 中 | ★★★☆ |
| 5. 地区×产品交叉表 | 中(营销活动) | 无 | 低 | ★★☆ |
| 6. 高价值交易识别(>5000元) | 高(反欺诈) | 强制(大额交易报备) | 低 | ★★★★ |
| 7. 客户生命周期价值预测 | 低(长期战略) | 无 | 高 | ★ |
最终确定MVP版本包含1、2、3、6四项,两周内上线。下面展示完整代码(已脱敏)。
6.2 数据准备与质量校验
import pandas as pd import numpy as np from datetime import datetime, timedelta # 加载原始数据(模拟) np.random.seed(42) dates = pd.date_range('2024-01-01', '2024-03-31', freq='D') customers = [f'C{str(i).zfill(3)}' for i in range(1, 5001)] products = ['Groceries','Dining','Travel','Retail','Utilities'] regions = ['North','South','East','West'] # 生成100万行交易数据 n_rows = 1000000 data = { 'tx_id': [f'TX{str(i).zfill(6)}' for i in range(n_rows)], 'customer_id': np.random.choice(customers, n_rows), 'region': np.random.choice(regions, n_rows), 'product_type': np.random.choice(products, n_rows), 'date': np.random.choice(dates, n_rows), 'amount': np.round(np.random.lognormal(8, 0.5, n_rows), 2), # 对数正态分布模拟交易额 'fee': np.round(np.random.uniform(0.5, 5.0, n_rows), 2) } df = pd.DataFrame(data) # ✅ 关键质量校验(生产环境必做) def data_quality_check(df): checks = { 'date_range': f"{df['date'].min()} to {df['date'].max()}", 'nulls_in_key_cols': df[['customer_id','date','amount']].isnull().sum().to_dict(), 'amount_outliers': ((df['amount'] < 1) | (df['amount'] > 100000)).sum(), 'duplicate_tx_id': df['tx_id'].duplicated().sum() } # 业务规则校验:交易额不能为负 if (df['amount'] < 0).any(): raise ValueError("发现负交易额,请检查数据源") # 时间范围校验:必须覆盖Q1完整三个月 if df['date'].min() > '2024-01-01' or df['date'].max() < '2024-03-31': print("警告:数据时间范围不完整,可能影响YTD计算") return checks print("数据质量校验结果:", data_quality_check(df))6.3 核心分析模块封装
class CreditCardAnalyzer: def __init__(self, df): self.df = df.copy() self.df['date'] = pd.to_datetime(self.df['date']) self.df['quarter'] = self.df['date'].dt.to_period('Q') def basic_metrics(self): """基础指标:总交易额、笔数、均值、中位数""" return self.df.groupby('customer_id').agg({ 'amount': ['sum', 'count', 'mean', 'median'], 'fee': ['sum'] }).round(2) def volatility_metrics(self): """波动率指标:标准差/均值,按客户计算""" def cv_ratio(series): if len(series) < 5: # 样本太少不计算 return np.nan return series.std() / series.mean() if series.mean() != 0 else 0 return self.df.groupby('customer_id')['amount'].agg(volatility=cv_ratio).round(4) def rolling_30d_avg(self): """滚动30天均值:按客户+日期计算""" # 按客户排序,确保rolling正确 sorted_df = self.df.sort_values(['customer_id','date']) sorted_df = sorted_df.set_index('date') # 滚动计算(min_periods=10保证基本可用性) rolling = sorted_df.groupby('customer_id')['amount'].rolling('30D', min_periods=10).mean() return rolling.reset_index(name='rolling_30d_avg') def high_value_transactions(self, threshold=5000): """高价值交易识别:单笔>5000元""" hv_df = self.df[self.df['amount'] > threshold].copy() return hv_df.groupby('customer_id').agg({ 'amount': ['count', 'sum'], 'date': ['min', 'max'] }).round(2) def generate_report(self): """生成最终报告""" print("开始生成Q1客户价值分析报告...") # 步骤1:基础指标 basic = self.basic_metrics() basic.columns = ['total_spend','tx_count','avg_spend','median_spend','total_fee'] # 步骤2:合并波动率 vol = self.volatility_metrics() report = basic.join(vol, on='customer_id') # 步骤3:添加高价值交易统计 hv_stats = self.high_value_transactions() hv_stats.columns = ['hv_tx_count','hv_total_spend','first_hv_date','last_hv_date'] report = report.join(hv_stats, on='customer_id') # 步骤4:计算衍生指标 report['fee_rate'] = (report['total_fee'] / report['total_spend'] * 100).round(2) report['hv_ratio'] = (report['hv_tx_count'] / report['tx_count'] * 100).round(2) # 步骤5:客户分层(按总交易额) report['tier'] = pd.qcut(report['total_spend'], q=[0,0.7,0.9,1.0], labels=['Standard','Premium','VIP'], duplicates='drop') print(f"报告生成完成:共{len(report)}名客户,VIP客户{report['tier'].value_counts()['VIP']}名") return report # 执行分析 analyzer = CreditCardAnalyzer(df) final_report = analyzer.generate_report() print("\nVIP客户Top 10:") print(final_report[final_report['tier']=='VIP'].sort_values('total_spend', ascending=False).head(10))