
1) 【一句话结论】:从360安全产品日志系统(存储在MongoDB,每天约10亿条日志,更新频次高)到数据仓库的ETL流程,采用增量抽取(利用时间戳索引加速,避免全表扫描)、分布式转换(Spark按时间分片处理数据倾斜)、分区加载(Hive按天分区提升查询效率),并通过数据质量校验(字段完整性、时间戳有效性)与实时监控(Prometheus记录错误率、任务耗时),确保数据仓库数据准确、及时、一致。
2) 【原理/概念讲解】:针对日志系统(如MongoDB存储,数据量大、更新频繁),ETL流程分为三步:
{timestamp: 1}),利用索引快速查询自上次抽取时间戳之后的新增记录,避免全表扫描。类比:像用时间戳作为“钥匙”,打开仓库的“新增货物”区域,只取新增部分,效率高。partition_date字段),提高查询效率。类比:把加工好的产品按批次放入成品仓库,按时间顺序存放,方便后续查询。数据质量指数据仓库数据的可靠性,包含:准确性(事件类型编码正确,如“install”映射为1)、完整性(用户ID、时间戳等关键字段不缺失)、一致性(时间戳顺序正确,无乱序)、及时性(ETL任务在24小时内完成,确保分析数据及时)。
3) 【对比与适用场景】:以“增量抽取”与“全量抽取”为例,对比如下:
| 对比项 | 全量抽取(Full Extract) | 增量抽取(Incremental Extract) |
|---|---|---|
| 定义 | 每次抽取所有历史数据 | 仅抽取自上次抽取以来新增/变化的数据 |
| 特性 | 数据量固定,处理简单 | 数据量随时间增长,需维护增量标识(如时间戳、序列号) |
| 使用场景 | 数据量小、更新频率低 | 数据量大、更新频繁(如日志系统,每天10亿条) |
| 注意点 | 需定期全量同步,占用资源 | 需维护增量标识,处理数据冲突(如重复记录) |
4) 【示例】(伪代码,结合MongoDB索引与Spark分片):
# 1. 数据抽取:从安全卫士日志API获取增量数据(利用MongoDB时间戳索引)
import requests
import json
from pymongo import MongoClient
def extract_incremental_logs(last_processed_time):
client = MongoClient('mongodb://mongo-server:27017/')
db = client['safeweb_logs']
collection = db['raw_logs']
new_logs = collection.find(
{"timestamp": {"$gt": last_processed_time}},
projection={"_id": 0, "timestamp": 1, "event_type": 1, "user_id": 1, "device_info": 1, "error_msg": 1}
)
return list(new_logs)
# 2. 数据转换:清洗、格式化、分片处理(Spark处理倾斜)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("log_transform").getOrCreate()
def transform_logs(logs_df):
df = logs_df.filter(
(logs_df.event_type.isNotNull()) &
(logs_df.timestamp.isNotNull())
)
df = df.withColumn("timestamp", spark.sql("CAST(timestamp AS TIMESTAMP)"))
event_map = {"install":1, "uninstall":2, "error":3}
df = df.withColumn("event_type_id", spark.sql("CASE WHEN event_type IN ({}) THEN CAST(value AS INT) ELSE 0 END".format(','.join(map(str, event_map.keys())))))
df = df.repartition("date_format(timestamp, 'yyyy-MM-dd')") # 按天分片处理倾斜
return df
# 3. 数据加载:加载到Hive表(按天分区)
from pyhive import hive
def load_to_warehouse(df, today_date):
conn = hive.Connection(host='hive-server', port=10000, username='data_engineer')
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS safeweb_logs (
event_type_id INT,
timestamp TIMESTAMP,
user_id STRING,
device_info STRING,
error_msg STRING,
partition_date STRING
) PARTITIONED BY (partition_date STRING)
""")
cursor.execute(f"ALTER TABLE safeweb_logs ADD PARTITION (partition_date='{today_date}')")
cursor.execute("INSERT INTO safeweb_logs SELECT event_type_id, timestamp, user_id, device_info, error_msg, '{today_date}' FROM df")
conn.commit()
cursor.close()
conn.close()
# 主流程
last_processed_time = "2024-05-20 00:00:00"
logs = extract_incremental_logs(last_processed_time)
df = spark.createDataFrame(logs)
transformed_df = transform_logs(df)
load_to_warehouse(transformed_df, "2024-05-20")
5) 【面试口播版答案】:面试官您好,我来描述一下从360安全产品日志系统到数据仓库的ETL流程。首先,我们从360安全产品的日志系统(假设日志存储在MongoDB,每天产生约10亿条日志,更新频率为每小时一次)获取数据,采用增量抽取方式,关键是通过源系统(MongoDB)的时间戳索引,快速查询自上次抽取时间戳之后的新增日志,避免全量扫描,提升抽取效率。接着是数据转换,包括清洗(过滤缺失字段、异常时间戳的日志)、格式化(将时间戳统一为标准格式)、字段转换(比如将事件类型“install”“uninstall”映射为数字ID,方便后续分析),以及用Spark的repartition按天分片处理数据倾斜,确保每个节点处理均衡数据量。然后是数据加载,将转换后的数据加载到数据仓库的Hive表中,采用按天分区(提高查询效率)。为了保证数据质量,我们在每个步骤加入校验:比如抽取阶段检查API返回码是否正常;转换阶段检查数据字段是否完整,时间戳是否有效;加载阶段检查数据量是否匹配,并记录日志。同时,我们会定期监控ETL任务的执行状态,比如使用Prometheus记录任务耗时(要求在24小时内完成)、错误率(要求低于0.1%),一旦发现异常会触发告警,确保数据仓库数据准确、及时、一致。在实际项目中,我们曾用时间戳索引把抽取时间从1小时缩短到5分钟,用Spark分片处理数据倾斜,确保任务高效完成。
6) 【追问清单】:
7) 【常见坑/雷区】: