
针对交易所、清算机构、券商系统的多源异构金融数据,设计分阶段(数据抽取、清洗、转换、加载)的ELT流程,结合业务规则(如交易连续性、市场波动)处理缺失值与异常值,通过标准化流程、数据血缘追踪及质量监控,保障数据质量满足金融分析需求。
老师口吻:多源数据整合中的**数据清洗(Data Cleaning)**是处理数据不一致、错误的核心步骤,本质是“修复数据缺陷、统一格式、保证质量”。具体包含:
金融场景下,数据清洗需额外考虑业务逻辑(如交易记录的完整性、清算机构的数量统计规则)。类比:整理杂乱的金融数据仓库,如同整理房间,按业务规则(如交易连续性)分类整理,确保每条记录符合业务逻辑,避免逻辑错误。
| 对比项 | 缺失值处理(Missing Value Handling) | 异常值处理(Outlier Handling) |
|---|---|---|
| 定义 | 数据中缺失的值,可能因采集、传输或业务逻辑(如交易所未成交标记为无效) | 数据中偏离正常范围的极端值,如价格突变、成交量异常 |
| 常用方法 | 删除(整行/列)、插补(前向填充、均值/中位数/模型插补) | 统计方法(3σ、IQR)、规则(业务规则,如市场波动阈值) |
| 使用场景 | 缺失比例低且不影响分析(如少量记录),或业务逻辑明确(如交易所价格前值填充) | 数据分布异常(如厚尾分布),需保留有效数据(如极端市场事件) |
| 注意点 | 避免删除导致信息丢失,插补需符合业务逻辑(如交易所价格用前值填充,而非均值) | 避免误删有效数据(如3σ可能误判市场波动为异常),结合业务规则判断 |
(伪代码:处理多源金融数据的清洗流程)
def multi_source_data_cleaning():
import pandas as pd
# 1. 数据抽取(读取各数据源)
df_exchange = pd.read_csv('exchange_trade.csv')
df_clearing = pd.read_json('clearing_records.json')
df_broker = pd.read_sql("SELECT * FROM broker_trades", 'db_connection')
# 2. 统一时间戳格式(处理数据源差异)
df_exchange['trade_time'] = pd.to_datetime(df_exchange['trade_time'], format='%Y-%m-%d %H:%M:%S')
df_clearing['trade_time'] = pd.to_datetime(df_clearing['trade_time'], format='%Y%m%d%H%M%S')
df_broker['trade_time'] = pd.to_datetime(df_broker['trade_time'], format='%Y-%m-%d %H:%M:%S')
# 3. 字段命名标准化(处理数据源差异)
df_exchange.rename(columns={'stock_id': 'symbol', 'trade_price': 'price'}, inplace=True)
df_clearing.rename(columns={'trade_qty': 'quantity'}, inplace=True)
df_broker.rename(columns={'trade_status': 'status'}, inplace=True)
# 4. 按时间对齐数据
df_all = pd.concat([df_exchange, df_clearing, df_broker], ignore_index=True)
df_all.sort_values('trade_time', inplace=True)
# 5. 处理缺失值(结合业务规则)
# 交易所价格缺失:前值填充(交易连续性)
df_all['price'] = df_all.groupby('symbol')['price'].apply(lambda x: x.fillna(method='ffill'))
# 清算机构数量缺失:均值插补(统计规律)
df_all['quantity'] = df_all.groupby('symbol')['quantity'].apply(lambda x: x.fillna(x.mean()))
# 交易状态缺失:标记为“未成交”(业务逻辑)
df_all['status'] = df_all['status'].fillna('未成交')
# 6. 异常值处理(结合业务规则+统计)
price_mean, price_std = df_all['price'].mean(), df_all['price'].std()
# 市场波动阈值(±5%)
df_all['is_price_outlier'] = (df_all['price'] < price_mean * 0.95) | (df_all['price'] > price_mean * 1.05)
# 用中位数修正异常值(避免极端值影响)
df_all.loc[df_all['is_price_outlier'], 'price'] = df_all['price'].median()
# 7. 数据加载(存储清洗后数据)
df_all.to_csv('cleaned_financial_data.csv', index=False)
return df_all
(60-120秒,自然表达)
“针对交易所、清算机构、券商系统的多源异构金融数据,我会设计一个分阶段的ELT流程。首先,处理数据源差异:比如时间戳格式,交易所是‘YYYY-MM-DD HH:MM:SS’,清算机构是‘YYYYMMDDHHMMSS’,通过正则表达式统一为标准时间;字段命名也不一致,通过重命名标准化。然后,按时间对齐所有数据。处理缺失值时,考虑业务逻辑:交易所的价格缺失用前值填充(保证交易连续性),清算机构的数量用均值插补(基于统计规律);交易状态缺失标记为‘未成交’。对于异常值,比如价格突变超过市场波动阈值(±5%),用中位数修正(避免误删有效数据)。最后,通过数据质量监控(检查缺失率、异常率),确保数据质量,存储清洗后的数据,为后续分析提供可靠基础。”
数据量很大时,如何保证处理效率?
如何处理数据格式不一致的具体例子?
异常值检测方法是否唯一?
数据清洗流程是否标准化?
如何评估数据质量?