看过来
《pandas 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gr99123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
在金融风控、支付监控或企业内审场景中,识别可疑交易是保障资金安全的第一道防线。这类任务通常依赖业务规则而非复杂模型——例如“大额转账 + 非工作时间 + 新收款人”组合即视为高风险。手动筛查效率低下,而 pandas 凭借其强大的向量化布尔运算与条件组合能力,可轻松构建一个轻量级、高性能的规则引擎。本文将通过一个模拟银行交易日志,演示如何结合 np.select、滚动窗口统计与时间特征提取,在单条链式代码中实现多维度异常规则的并行判断与标记。你将掌握一种可快速部署、易于调整的实时异常检测模式。
假设我们有一张交易流水表,包含以下字段:
我们的目标是标记满足任一高风险规则的交易为异常(is_anomaly = True):
要求:
is_anomaly 布尔列。源数据如下:
data = """
tx_id,user_id,amount,timestamp,recipient
T001,U1,60000,2025-03-10 14:30:00,R1
T002,U1,20000,2025-03-10 14:35:00,R2
T003,U1,55000,2025-03-10 23:15:00,R3
T004,U2,40000,2025-03-11 02:20:00,R4
T005,U1,30000,2025-03-11 09:10:00,R1
T006,U1,50000,2025-03-11 09:12:00,R5
"""
读取代码:
import pandas as pd
from io import StringIO
df = pd.read_csv(StringIO(data.strip()))
三大规则需分别处理:
(user_id, recipient) 组合,检查是否为该用户首次出现 → 可用 duplicated(subset=['user_id', 'recipient'], keep='first') 取反;rolling('1H') 计算过去 1 小时累计金额(注意:需设 timestamp 为时间索引)。由于规则 C 依赖时间窗口,我们先将数据转为 DatetimeIndex 并排序,再逐个构造布尔条件,最后用 | 合并。
代码如下:
import pandas as pd
import numpy as np
from io import StringIO
data = """
tx_id,user_id,amount,timestamp,recipient
T001,U1,60000,2025-03-10 14:30:00,R1
T002,U1,20000,2025-03-10 14:35:00,R2
T003,U1,55000,2025-03-10 23:15:00,R3
T004,U2,40000,2025-03-11 02:20:00,R4
T005,U1,30000,2025-03-11 09:10:00,R1
T006,U1,50000,2025-03-11 09:12:00,R5
"""
anomaly_flagged = (
pd.read_csv(StringIO(data.strip()))
.assign(timestamp=lambda df: pd.to_datetime(df['timestamp']))
.sort_values('timestamp')
.assign(
# 规则A:大额 + 新收款人
is_new_recipient=lambda df: ~df.duplicated(subset=['user_id', 'recipient'], keep='first'),
rule_a=(lambda df: (df['amount'] >= 50000) & df['is_new_recipient']),
# 规则B:非工作时间大额
hour=lambda df: df['timestamp'].dt.hour,
rule_b=(lambda df: (df['amount'] >= 30000) & ((df['hour'] >= 22) | (df['hour'] < 6)))
)
# 为规则C准备:按user分组的时间序列
.set_index('timestamp')
.assign(
rolling_sum_1h=lambda df: df.groupby('user_id')['amount']
.rolling('1H', closed='both')
.sum()
.reset_index(level=0, drop=True)
)
.assign(
rule_c=lambda df: df['rolling_sum_1h'] >= 80000
)
.reset_index()
.assign(
is_anomaly=lambda df: df['rule_a'] | df['rule_b'] | df['rule_c']
)
.drop(columns=[
'is_new_recipient', 'hour', 'rule_a', 'rule_b', 'rule_c', 'rolling_sum_1h'
])
)
print(anomaly_flagged)
'''
timestamp tx_id user_id amount recipient is_anomaly
0 2025-03-10 14:30:00 T001 U1 60000 R1 True
1 2025-03-10 14:35:00 T002 U1 20000 R2 True
2 2025-03-10 23:15:00 T003 U1 55000 R3 True
3 2025-03-11 02:20:00 T004 U2 40000 R4 True
4 2025-03-11 09:10:00 T005 U1 30000 R1 False
5 2025-03-11 09:12:00 T006 U1 50000 R5 True
'''
代码分析:
duplicated(..., keep='first') 标记首次出现为 False,取反后即“新收款人”;dt.hour 提取小时,用布尔逻辑判断是否在夜间区间;set_index('timestamp') 使 rolling('1H') 生效;groupby('user_id') 确保只统计同一用户;closed='both' 包含当前行,确保累计金额含本次交易;is_anomaly;例如:
该方案可无缝集成到实时流水处理管道中,是轻量级风控系统的理想起点。
(完)
更新时间:2025-11-26 11:44:07 标签:pandas python 规则引擎 异常交易