
1) 【一句话结论】
为某制造企业建设实时数据分析平台,通过优化多源数据接入、数据质量治理及计算资源利用(如Spark并行度调整、内存缓存),解决数据源多样、质量差、资源不足问题,使分析效率提升30%,设备故障预警模型推动生产效率提升15%。
2) 【原理/概念讲解】
数据中台的核心是统一管理分散业务数据,为上层应用提供数据服务,类似“数据超市”——用户直接获取加工后的数据,无需自己处理原始数据。ETL(抽取、转换、加载)是数据从源系统到数据仓库的流程,像“数据搬运工”,需处理数据格式不统一、质量差等问题。数据质量是数据可信的基础,需通过规则检查(如完整性、准确性)保障,比如IoT设备数据可能存在缺失或异常值,需清洗确保数据可靠。
3) 【对比与适用场景】
数据湖与数据仓库对比:
| 特性 | 数据湖(如HDFS+Hive) | 传统数据仓库(如星型模型) |
|---|---|---|
| 数据形态 | 结构化/半结构化/非结构化混合 | 结构化为主 |
| 存储方式 | 分布式文件系统 | 关系型数据库 |
| 使用场景 | 大规模数据探索、机器学习 | 交互式查询、报表 |
| 注意点 | 需数据治理,避免数据混乱 | 扩展性有限,成本高 |
4) 【示例】
假设数据源包括:ERP(结构化生产数据)、IoT设备(流数据,设备状态)、社交媒体(非结构化用户评论)。计算资源不足表现为集群节点少(10个节点)、内存限制(每个节点8GB RAM),导致Spark任务处理延迟。解决方案:构建分层处理架构,分批处理与流处理结合。伪代码示例(Spark处理批数据,优化资源):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL").getOrCreate()
# 1. 数据抽取(多源并行,考虑资源限制)
# ERP结构化数据(JDBC)
erp_df = spark.read.jdbc("jdbc:mysql://host:3306/erp", "production", "user:pwd")
# IoT流数据(Kafka,调整并行度适应资源)
iot_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9092").option("subscribe", "iot").option("max partitions per topic", 2).load().selectExpr("CAST(value AS STRING)")
# 社交媒体非结构化(HDFS,分块读取)
social_df = spark.read.format("json").load("hdfs://namenode:8020/social_data").selectExpr("text", "extract_keywords(text) as keywords")
# 2. 数据转换(资源优化:分区、缓存)
from pyspark.sql.functions import from_unixtime, to_timestamp
# 分区优化(按时间分区,减少扫描量)
erp_df = erp_df.repartition("date")
iot_df = iot_df.repartition("timestamp")
social_df = social_df.repartition("publish_time")
# 内存缓存(常用数据)
spark.catalog.cacheTable("processed_erp")
spark.catalog.cacheTable("processed_iot")
# 3. 数据清洗(数据质量处理)
# 缺失值处理:用中位数填充
def median(col):
return (avg(col) + stddev(col)) / 2
iot_df = iot_df.withColumn("temperature", when(col("temperature").isNull(), median(col("temperature"))).otherwise(col("temperature")))
# 异常值过滤:IQR方法
def filter_outliers(df, col_name):
q1 = df.approxQuantile(col_name, [0.25], 0.001)[0]
q3 = df.approxQuantile(col_name, [0.75], 0.001)[0]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
return df.filter((col(col_name) >= lower_bound) & (col(col_name) <= upper_bound))
iot_df = filter_outliers(iot_df, "temperature")
# 4. 数据加载(写入数据仓库)
spark.sql("""
INSERT INTO production_fact
SELECT
product_id,
to_timestamp(date, "yyyy-MM-dd") as timestamp,
production_qty,
device_id,
comment_id,
status
FROM processed_data
""")
5) 【面试口播版答案】
面试官好,我分享一个为某制造企业建设实时数据分析平台的项目。项目背景是企业在生产、设备、销售数据分散,导致决策效率低。我的角色是技术负责人,负责数据架构设计和核心模块开发。主要挑战有三个:一是数据源多样(ERP、IoT、社交媒体),数据格式不统一;二是数据质量差,比如IoT设备数据有缺失和异常值;三是计算资源有限,集群节点少(仅10个节点)、内存限制(每个节点8GB),导致处理延迟。解决方案是构建统一数据中台,分三步:1. 数据接入层,用Kafka处理流数据(调整并行度为2,适应资源),Flume收集日志,API网关接入结构化数据;2. 数据处理层,用Spark进行ETL,包括数据清洗(用中位数填充缺失值,IQR过滤异常值),转换(时间戳统一、字段标准化),并采用分区、内存缓存优化资源;3. 数据服务层,用Flink实时计算,提供实时分析接口。最终成果是分析效率提升30%,比如原来查询生产报表需要5分钟,现在1分钟;同时发现设备故障预警模型,提前识别设备异常,优化后生产效率提升15%。
6) 【追问清单】
7) 【常见坑/雷区】