51mee - AI智能招聘平台Logo
模拟面试题目大全招聘中心会员专区

描述一个从360安全产品日志系统(如安全卫士日志)到数据仓库的ETL流程,包括数据抽取、转换、加载的步骤,以及如何保证数据质量?

360大数据分析工程师难度:中等

答案

1) 【一句话结论】:从360安全产品日志系统(存储在MongoDB,每天约10亿条日志,更新频次高)到数据仓库的ETL流程,采用增量抽取(利用时间戳索引加速,避免全表扫描)、分布式转换(Spark按时间分片处理数据倾斜)、分区加载(Hive按天分区提升查询效率),并通过数据质量校验(字段完整性、时间戳有效性)与实时监控(Prometheus记录错误率、任务耗时),确保数据仓库数据准确、及时、一致。

2) 【原理/概念讲解】:针对日志系统(如MongoDB存储,数据量大、更新频繁),ETL流程分为三步:

  • 数据抽取(Extract):从源系统(如日志API)获取增量数据。关键在于源系统(MongoDB)需建立时间戳索引(如{timestamp: 1}),利用索引快速查询自上次抽取时间戳之后的新增记录,避免全表扫描。类比:像用时间戳作为“钥匙”,打开仓库的“新增货物”区域,只取新增部分,效率高。
  • 数据转换(Transform):对抽取的数据进行清洗(过滤缺失字段、异常时间戳)、格式化(时间戳统一为标准格式)、字段转换(事件类型映射为ID)、去重(按时间戳排序后去重)。为处理数据倾斜(如某天日志量异常大),采用Spark的repartition操作,将数据按时间分区(如按天)分片到不同计算节点,确保每个节点处理均衡数据量。类比:加工车间按批次分片处理,避免某台机器过载。
  • 数据加载(Load):将转换后的数据加载到目标数据仓库(如Hive表),采用增量加载(追加新数据),目标表按时间分区(如partition_date字段),提高查询效率。类比:把加工好的产品按批次放入成品仓库,按时间顺序存放,方便后续查询。

数据质量指数据仓库数据的可靠性,包含:准确性(事件类型编码正确,如“install”映射为1)、完整性(用户ID、时间戳等关键字段不缺失)、一致性(时间戳顺序正确,无乱序)、及时性(ETL任务在24小时内完成,确保分析数据及时)。

3) 【对比与适用场景】:以“增量抽取”与“全量抽取”为例,对比如下:

对比项全量抽取(Full Extract)增量抽取(Incremental Extract)
定义每次抽取所有历史数据仅抽取自上次抽取以来新增/变化的数据
特性数据量固定,处理简单数据量随时间增长,需维护增量标识(如时间戳、序列号)
使用场景数据量小、更新频率低数据量大、更新频繁(如日志系统,每天10亿条)
注意点需定期全量同步,占用资源需维护增量标识,处理数据冲突(如重复记录)

4) 【示例】(伪代码,结合MongoDB索引与Spark分片):

# 1. 数据抽取:从安全卫士日志API获取增量数据(利用MongoDB时间戳索引)
import requests
import json
from pymongo import MongoClient

def extract_incremental_logs(last_processed_time):
    client = MongoClient('mongodb://mongo-server:27017/')
    db = client['safeweb_logs']
    collection = db['raw_logs']
    new_logs = collection.find(
        {"timestamp": {"$gt": last_processed_time}},
        projection={"_id": 0, "timestamp": 1, "event_type": 1, "user_id": 1, "device_info": 1, "error_msg": 1}
    )
    return list(new_logs)

# 2. 数据转换:清洗、格式化、分片处理(Spark处理倾斜)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("log_transform").getOrCreate()

def transform_logs(logs_df):
    df = logs_df.filter(
        (logs_df.event_type.isNotNull()) &
        (logs_df.timestamp.isNotNull())
    )
    df = df.withColumn("timestamp", spark.sql("CAST(timestamp AS TIMESTAMP)"))
    event_map = {"install":1, "uninstall":2, "error":3}
    df = df.withColumn("event_type_id", spark.sql("CASE WHEN event_type IN ({}) THEN CAST(value AS INT) ELSE 0 END".format(','.join(map(str, event_map.keys())))))
    df = df.repartition("date_format(timestamp, 'yyyy-MM-dd')")  # 按天分片处理倾斜
    return df

# 3. 数据加载:加载到Hive表(按天分区)
from pyhive import hive
def load_to_warehouse(df, today_date):
    conn = hive.Connection(host='hive-server', port=10000, username='data_engineer')
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS safeweb_logs (
            event_type_id INT,
            timestamp TIMESTAMP,
            user_id STRING,
            device_info STRING,
            error_msg STRING,
            partition_date STRING
        ) PARTITIONED BY (partition_date STRING)
    """)
    cursor.execute(f"ALTER TABLE safeweb_logs ADD PARTITION (partition_date='{today_date}')")
    cursor.execute("INSERT INTO safeweb_logs SELECT event_type_id, timestamp, user_id, device_info, error_msg, '{today_date}' FROM df")
    conn.commit()
    cursor.close()
    conn.close()

# 主流程
last_processed_time = "2024-05-20 00:00:00"
logs = extract_incremental_logs(last_processed_time)
df = spark.createDataFrame(logs)
transformed_df = transform_logs(df)
load_to_warehouse(transformed_df, "2024-05-20")

5) 【面试口播版答案】:面试官您好,我来描述一下从360安全产品日志系统到数据仓库的ETL流程。首先,我们从360安全产品的日志系统(假设日志存储在MongoDB,每天产生约10亿条日志,更新频率为每小时一次)获取数据,采用增量抽取方式,关键是通过源系统(MongoDB)的时间戳索引,快速查询自上次抽取时间戳之后的新增日志,避免全量扫描,提升抽取效率。接着是数据转换,包括清洗(过滤缺失字段、异常时间戳的日志)、格式化(将时间戳统一为标准格式)、字段转换(比如将事件类型“install”“uninstall”映射为数字ID,方便后续分析),以及用Spark的repartition按天分片处理数据倾斜,确保每个节点处理均衡数据量。然后是数据加载,将转换后的数据加载到数据仓库的Hive表中,采用按天分区(提高查询效率)。为了保证数据质量,我们在每个步骤加入校验:比如抽取阶段检查API返回码是否正常;转换阶段检查数据字段是否完整,时间戳是否有效;加载阶段检查数据量是否匹配,并记录日志。同时,我们会定期监控ETL任务的执行状态,比如使用Prometheus记录任务耗时(要求在24小时内完成)、错误率(要求低于0.1%),一旦发现异常会触发告警,确保数据仓库数据准确、及时、一致。在实际项目中,我们曾用时间戳索引把抽取时间从1小时缩短到5分钟,用Spark分片处理数据倾斜,确保任务高效完成。

6) 【追问清单】:

  • 问题1:如何优化源系统抽取效率?(回答要点:在MongoDB中为日志表建立时间戳索引,利用索引快速查询增量数据,避免全表扫描,提升抽取速度。)
  • 问题2:数据转换中如何处理数据倾斜?(回答要点:在Spark转换步骤中采用repartition按时间分区(如按天)分片,将数据分配到不同计算节点,确保每个节点处理均衡数据量,避免单节点过载。)
  • 问题3:ETL工具选择(Airflow+Spark)的依据?(回答要点:日志数据量大(10亿条/天),任务复杂(清洗、转换),需分布式处理,因此选择Spark进行数据转换(高效处理大规模数据),用Airflow调度任务(管理依赖关系、监控任务状态)。)
  • 问题4:如何应对源系统数据结构变更(如新增字段)?(回答要点:在转换阶段采用动态字段映射(如使用正则表达式匹配字段,或配置字段映射表),当源系统新增字段时,动态更新映射规则,避免增量标识失效。)
  • 问题5:数据质量监控指标如何设定?(回答要点:基于历史数据统计错误率分布(如错误率通常低于0.1%),设定阈值(如错误率超过0.1%触发告警),并记录任务耗时(如ETL任务应在24小时内完成),确保数据及时性。)

7) 【常见坑/雷区】:

  • 增量标识维护不当导致数据重复:若增量标识(如时间戳)记录错误,可能导致重复记录,影响分析结果(如统计安装次数时重复计算)。需定期校验增量标识的准确性。
  • 数据倾斜未处理导致任务卡顿:若未按时间分片处理数据倾斜,某天日志量异常大的数据会被集中到一个节点,导致任务耗时过长,影响数据及时性。需通过分片策略均衡数据。
  • ETL工具选择不当(如用全量抽取处理日志):若选择全量抽取,每天10亿条日志会导致资源占用过高,任务耗时过长,无法满足及时性要求。需根据数据量选择增量抽取+分布式处理。
  • 数据质量检查不全面:仅检查部分字段(如时间戳),遗漏关键数据(如用户ID缺失),导致后续分析错误(如无法关联用户行为)。需全面检查字段完整性、数据有效性。
  • 加载后未验证数据:直接加载后未进行数据校验,导致数据仓库数据错误(如加载了格式错误的日志,导致后续分析结果异常)。需通过抽样检查、与源系统数据对比等方式验证数据量与字段分布。
51mee.com致力于为招聘者提供最新、最全的招聘信息。AI智能解析岗位要求,聚合全网优质机会。
产品招聘中心面经会员专区简历解析Resume API
联系我们南京浅度求索科技有限公司admin@51mee.com
联系客服
51mee客服微信二维码 - 扫码添加客服获取帮助
© 2025 南京浅度求索科技有限公司. All rights reserved.
公安备案图标苏公网安备32010602012192号苏ICP备2025178433号-1