
教育数据ETL管道需分阶段(抽取-清洗-加载),通过增量抽取(如CDC)降低延迟,用统计方法(如IQR)处理缺失/异常值,以主键+时间戳保证多源数据一致性,并建立量化监控机制(如延迟、质量指标)。
老师口吻:构建ETL管道的核心是“分阶段处理”,每个阶段解决不同问题。
| 方式 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| 全量抽取 | 每次抽取所有数据 | 一次性加载,数据完整 | 初始数据加载(数据量小,如系统上线初期) | 延迟长(数小时),资源消耗大 |
| 增量抽取(CDC) | 抽取变化数据(如数据库binlog、消息队列变更) | 实时/准实时(秒级-分钟级),资源少 | 线上系统(变化频繁,如作业提交、直播互动) | 需维护变更日志(如数据库binlog),确保数据不丢失 |
| 方法 | 处理逻辑 | 适用场景 | 注意点 |
|---|---|---|---|
| 时间序列填充(前序有效值) | 用前一个有效值填充缺失值(如课程ID) | 缺失值因系统故障导致,数据连续性重要(如课程ID中断) | 避免删除导致信息丢失,保留业务逻辑 |
| 删除记录 | 直接删除缺失值记录 | 缺失比例高(如>30%),数据量小 | 丢失信息,影响分析结果 |
| 均值/中位数填充 | 用统计值填充(如学生平均作业时长) | 缺失比例低(如<10%),数据分布稳定 | 可能引入偏差,需验证业务合理性 |
(伪代码,展示增量抽取、清洗、加载流程)
# 1. 增量抽取(Kafka CDC,假设从数据库binlog写入Kafka)
def extract_live_data():
messages = kafka_consumer.consume(topic="live_course_binlog", start_time="2024-01-01")
live_records = [parse_message(msg) for msg in messages if is_valid(msg)]
return live_records
# 2. 清洗:处理缺失值(课程ID)和异常值(学生人数)
def clean_data(records):
records = fill_missing_course_id(records, method="prev")
records = filter_outliers(records, column="student_count", method="iqr")
return records
# 3. 加载到Hive分区表(按事件时间分区)
def load_to_warehouse(cleaned_records):
hive_loader.write_to_hive(
table="live_course_data",
partition_column="event_time",
data=cleaned_records
)
(约90秒,自然表达)
“构建教育数据ETL管道,我们分三步走:第一步抽取,用增量方式(比如从数据库binlog或Kafka消费变化数据),避免全量抽取的延迟;第二步清洗,处理缺失值(比如课程ID用前一个有效值填充)和异常值(比如学生人数超过箱线图范围则过滤);第三步加载,按时间戳分区到数据仓库。数据延迟方面,实时数据用Kafka(秒级延迟),批量任务处理历史数据;数据一致性用学生ID+更新时间,确保多源数据唯一,冲突时优先更新最新数据。监控方面,比如抽取延迟(P99)、清洗错误率、加载成功率,及时发现问题。”
问:如何量化数据延迟?
问:缺失值处理中,为什么当缺失比例超过20%时采用人工干预?
问:多源数据同步时,若两个系统数据冲突(如学生ID不一致),如何解决?
问:大数据量下,如何优化ETL性能?
问:监控数据一致性,具体怎么做?