
1) 【一句话结论】
处理TB级历史交易数据并支持复杂分析,采用“湖仓一体+双流处理+冷热分离”架构。通过CDC捕获批量历史数据,Kafka+Spark Streaming处理实时增量数据,结合列式存储分区表,并实现数据脱敏、审计日志,确保数据安全与合规,支持客户信用评分、贷款趋势等复杂分析。
2) 【原理/概念讲解】
老师口吻解释关键概念:
类比:数据湖是“原始数据仓库”,存储所有流水;数据仓库是“分析用的处理库”,聚合数据;湖仓一体是“同时保留原始流水和分析库”,流处理是“实时补充新流水”,冷热分离是“按访问频率优化存储成本”,共同支持历史与实时分析,并确保合规与安全。
3) 【对比与适用场景】
| 架构类型 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| 数据仓库 | 预处理后的结构化聚合数据集合,用于OLAP分析 | 结构化存储,预计算聚合,支持复杂查询 | 日常报表、客户信用评分、贷款趋势分析 | 扩展性有限,延迟较高,适合稳定业务 |
| 数据湖 | 原始/半结构化数据存储,支持多种格式 | 原始数据,可扩展,支持全量存储 | 全量数据备份、探索性分析、数据挖掘 | 查询效率低,数据治理复杂,需额外处理 |
| 湖仓一体 | 结合数据湖与数据仓库,存储原始数据并构建分析视图 | 原始数据+分析视图,列式存储,分区索引 | TB级历史数据存储+复杂分析(如信用评分) | 需统一管理,数据治理复杂,需平衡存储与计算 |
| 流处理架构(实时) | 通过Kafka+Spark Streaming捕获实时数据变更 | 实时处理,低延迟,支持增量更新 | 实时查询(如实时信用评分)、实时监控 | 系统稳定性要求高,需处理消息丢失 |
| 冷热分离架构 | 按数据访问频率分离存储,热数据在数据仓库,冷数据在对象存储 | 存储成本优化,保持查询性能 | TB级历史数据存储,降低长期存储成本 | 需维护数据迁移策略,冷数据访问延迟较高 |
4) 【示例】
伪代码展示双流ETL流程(批量+实时)及冷热数据分离:
# 批量ETL(处理历史数据,热数据存储)
def batch_etl():
last_time = get_last_batch_time()
new_records = transaction_system.get_records_since(last_time)
cleaned = [validate_record(r) for r in new_records]
transformed = aggregate_by_month(cleaned)
# 加载到数据仓库(热数据)
for rec in transformed:
db.execute(f"""
INSERT INTO monthly_agg (customer_id, month, total_amount, cnt)
VALUES ({rec['customer_id']}, '{rec['month']}', {rec['amount']}, {rec['cnt']})
PARTITION BY (month)
ON CONFLICT (customer_id, month) DO UPDATE SET ...
"""
)
# 历史数据迁移(冷数据)
cold_records = transformed[config['cold_data_threshold']:] # 超过阈值的历史数据
s3_client.upload_fileobj(
BytesIO(json.dumps(cold_records).encode('utf-8')),
bucket_name,
f"cold_data/{rec['customer_id']}/{rec['month']}.json",
ExtraArgs={'ServerSideEncryption': 'AES256'}
)
# 实时ETL(处理增量数据,低延迟)
def real_time_etl():
kafka_consumer = KafkaConsumer('transaction-changes', bootstrap_servers='kafka:9092')
for msg in kafka_consumer:
record = json.loads(msg.value)
if not validate_record(record):
continue
transformed = {
'customer_id': record['customer_id'],
'month': record['date'].strftime('%Y-%m'),
'amount': record['amount'],
'type': record['type']
}
# 加载实时表(热数据)
db.execute(f"""
INSERT INTO real_time_tx (customer_id, month, amount, type)
VALUES ({transformed['customer_id']}, '{transformed['month']}', {transformed['amount']}, '{transformed['type']}')
PARTITION BY (month)
"""
)
# 审计日志记录
audit_log.append({
'user': 'system',
'action': 'insert',
'data': transformed,
'timestamp': datetime.now()
})
# 数据脱敏与加密
def anonymize_data(data):
# 客户姓名脱敏(替换为随机字符串)
data['name'] = f"ANON_{hash(data['name'])}"
return data
# 审计日志存储
def save_audit_log():
with open('audit.log', 'a') as f:
json.dump(audit_log, f)
5) 【面试口播版答案】
面试官您好,处理TB级历史交易数据并支持复杂分析,我们采用“湖仓一体+双流处理+冷热分离”架构。具体来说,通过CDC捕获批量历史数据,用Kafka+Spark Streaming处理实时增量数据,两者结合后加载到列式存储的分区表中(热数据在数据仓库,冷数据迁移到对象存储,通过压缩降低成本)。数据仓库通过时间分区(如按月)和索引优化,支持客户信用评分(分析客户交易行为模式)和贷款趋势分析(按行业、时间维度聚合数据),实时流处理保证低延迟查询,同时实现数据脱敏(如客户姓名替换为随机标识)、存储加密(KMS加密)和审计日志,满足金融行业合规要求。
6) 【追问清单】
7) 【常见坑/雷区】