
推荐系统数据管道是连接用户行为采集与模型训练的链路,通过实时流处理(低延迟)和离线批处理(高吞吐)技术,将用户行为转化为模型可用的特征,关键技术包括流处理、特征工程、分布式存储,核心工程挑战是数据一致性、延迟控制、系统扩展性及模型迭代效率。
数据管道流程分为用户行为采集、数据清洗、特征工程、数据存储、模型训练五大环节,各环节关键技术及作用:
简短类比:数据管道就像“数据加工流水线”,用户行为是“原材料”,清洗是“质检”,特征工程是“加工成零件”,存储是“入库”,训练是“组装成产品”,最终用于推荐服务。
| 环节 | 实时流处理(流处理) | 离线批处理(批处理) |
|---|---|---|
| 定义 | 对数据流进行实时计算,低延迟(秒级),用于实时响应 | 对历史数据批量计算,延迟(小时/天),用于历史分析 |
| 关键技术 | 消息队列(Kafka)、流计算框架(Flink/Spark Streaming)、状态管理(Flink State Backend) | 批处理框架(Spark Batch/Hadoop MapReduce)、数据仓库(Hive)、分布式文件系统(HDFS) |
| 特性 | 低延迟、高吞吐、实时性、容错性(Exactly-Once语义) | 高吞吐、适合复杂计算、成本较低、支持复杂SQL |
| 使用场景 | 用户实时行为(如点击、购买)的实时特征更新、实时推荐、A/B测试 | 用户历史行为分析、离线模型训练、冷启动用户特征构建、长期趋势分析 |
| 注意点 | 数据丢失(需Kafka持久化+Flink Checkpoint保证Exactly-Once)、延迟控制(需优化计算逻辑)、系统扩展性(流处理框架并行度设置) | 数据量巨大(需分片、增量计算)、计算资源需求(需集群资源)、任务调度复杂(需YARN资源管理) |
from kafka import KafkaConsumer
import hdfs
hdfs_client = hdfs.Connection('hdfs://hdfs:50070')
consumer = KafkaConsumer('user_click_topic', bootstrap_servers='kafka:9092', group_id='user_feature_group')
for msg in consumer:
click_data = json.loads(msg.value)
if click_data.get('item_id') and click_data.get('user_id'):
user_features = {
'user_id': click_data['user_id'],
'liked_categories': [click_data['item_category']],
'timestamp': click_data['timestamp']
}
with hdfs_client.create(f'/user/feature/user_features/{click_data["user_id"]}.json', overwrite=True) as f:
f.write(json.dumps(user_features).encode('utf-8'))
# 触发离线训练任务
trigger_training_job()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("OfflineTraining").getOrCreate()
user_actions = spark.read.parquet("hdfs://hdfs:9000/user_actions")
cleaned_actions = user_actions.filter(
(user_actions.action_type.isin(['click', 'buy'])) &
(user_actions.valid == 1)
)
user_profile = cleaned_actions.groupBy("user_id").agg(
collect_list("item_category").alias("liked_categories")
)
item_features = cleaned_actions.groupBy("item_id").agg(
collect_list("item_attr").alias("item_attrs")
)
train_data = user_profile.join(item_features, on="user_id")
train_data.write.parquet("hdfs://hdfs:9000/train_data")
“数据管道从用户行为采集到模型训练,核心是通过实时流处理和离线批处理技术,将用户行为转化为模型可用的特征。首先,用户行为采集通过前端埋点(如JavaScript事件)或服务端日志,收集点击、购买等行为,数据格式为JSON或Protobuf。接着进行数据清洗,过滤无效数据。然后做特征工程,比如构建用户画像(用户点击的物品类别列表)和物品特征。存储方面,实时行为用Kafka,离线历史用HDFS。实时处理用Flink保证低延迟,离线用Spark处理历史数据。工程挑战包括数据一致性(实时与离线数据同步)、延迟控制(秒级更新)、系统扩展性(处理千万级用户)、模型迭代速度(快速重新训练)。关键技术有Kafka+Flink(流处理)、特征工程(用户/物品画像)、分布式存储(HDFS/Hive)等。”