
1) 【一句话结论】:针对结构化、半结构化、非结构化数据,采用分层ETL架构(数据仓库+数据湖),结合流处理与批处理,通过数据血缘追踪和动态校验规则,确保数据一致性并优化处理效率。
2) 【原理/概念讲解】:ETL流程包含抽取、转换、加载三步,针对不同数据类型设计差异化处理。结构化数据(如交易表)用SQL从数据库按时间分区抽取,通过聚合、清洗转换后加载到数据仓库事实表;半结构化数据(如JSON风控报告)用解析库解析为结构化,补充关联字段并校验时间戳(与交易数据一致)后加载;非结构化数据(如资管文档)用NLP分词提取特征,存储到数据湖,再通过ETL抽取关键信息。数据湖用于存储原始非结构化数据,数据仓库用于存储结构化事实表和维度表,通过中间层(数据湖)统一存储,便于后续分析。数据质量校验在转换步骤加入(如交易金额非负、风控评分范围),确保数据一致性。
3) 【对比与适用场景】:
| 数据类型 | 定义 | 特性 | 处理方式 | 工具示例 | 使用场景 | 注意点 |
|---|---|---|---|---|---|---|
| 结构化 | 固定字段(如交易ID、金额) | 字段固定,查询高效 | SQL抽取+转换(聚合、清洗) | MySQL、Spark SQL | 交易数据 | 需建立索引,分区按时间(如按天)优化加载;避免全表扫描 |
| 半结构化 | 自由字段(如JSON带标签) | 字段可变但结构有规律 | 解析+结构化(补充关联字段) | Jackson、Gson | 风控报告 | 字段解析需校验(如交易ID存在性);时间戳对齐(与交易数据时间一致) |
| 非结构化 | 无固定格式(如文本) | 格式自由,处理复杂 | NLP分词+特征提取 | Jieba、NLTK | 资管文档 | 流处理(如Flink)平衡延迟与吞吐量;存储压缩(如Parquet)减少I/O;延迟要求高时用流,低时用批 |
4) 【示例】:伪代码展示结构化(批处理)、半结构化(混合处理,实时校验)、非结构化(流处理)的ETL流程,包含数据一致性校验(时间戳匹配)。
# 1. 结构化数据ETL(交易数据,批处理)
def extract_trading_data():
sql = "SELECT transaction_id, amount, create_time FROM trading_table PARTITION (date='2023-10-01')"
return db.query(sql)
def transform_trading_data(data):
valid = [row for row in data if row['amount'] >= 0]
total = sum(row['amount'] for row in valid)
return {'total_amount': total, 'valid_transactions': len(valid)}
def load_trading_data(transformed_data):
db.insert('trading_fact', transformed_data)
# 2. 半结构化数据ETL(风控报告,混合处理:实时流+批校验)
def extract_risk_stream():
for report in kafka_consumer():
yield report
def transform_risk_data(report):
if 'transaction_id' not in report or 'time' not in report:
raise ValueError("风控报告缺少必要字段")
if report['time'] < min(trading_time):
return None # 超时数据丢弃
return {
'report_id': report['id'],
'risk_score': report['score'],
'transaction_id': report['transaction_id']
}
def load_risk_data(transformed_data):
db.insert('risk_fact', transformed_data)
# 3. 非结构化数据ETL(资管文档,流处理)
def extract_asset_stream():
for doc in kafka_consumer():
yield doc['content']
def transform_asset_data(text):
words = jieba.cut(text)
keywords = [w for w in words if len(w) > 2]
return {'keywords': keywords, 'doc_id': doc['id']}
def load_asset_data(transformed_data):
db.insert('asset_text', transformed_data)
5) 【面试口播版答案】:面试官您好,针对不同结构的数据,我会设计分层ETL流程。首先,结构化交易数据用SQL从数据库按日期分区抽取,通过聚合和金额非负校验转换后加载到数据仓库事实表;半结构化风控数据用实时流处理从Kafka消费,解析JSON并补充交易ID,同时校验时间戳与交易数据一致,然后加载;非结构化资管文档用流处理(如Flink)消费文本,用jieba分词提取关键词,存储到数据湖,再通过ETL抽取关键信息。通过数据湖存储原始数据,数据仓库存储结构化事实表,中间加入数据血缘追踪和动态校验规则,确保数据一致性,同时用流处理(非结构化)和批处理(结构化)优化效率,比如结构化数据分区减少I/O,非结构化流处理平衡延迟与吞吐量。
6) 【追问清单】:
7) 【常见坑/雷区】: