
农业大数据平台的ETL设计需分源处理多源数据(传感器流、农户结构化、历史批量),通过数据清洗(标记异常值)、转换(时间序列聚合)、加载(实时/历史分离),结合Flink实现多变量实时异常检测,保障数据质量与检测时效性。
ETL是数据集成的核心流程,包含Extract(抽取)、Transform(转换)、**Load(加载)**三个阶段,用于整合多源数据。针对不同数据源特性优化处理逻辑:
| 数据源类型 | ETL处理方式 | 延迟 | 适用场景 | 注意点 |
|---|---|---|---|---|
| 传感器流数据 | 实时ETL(Flink) | 毫秒级 | 实时异常检测、实时预警 | 需高并发处理,资源消耗大,需优化并行度 |
| 农户上报结构化数据 | 批量ETL(Spark) | 分钟级 | 历史种植计划分析、报表 | 适合低频、批量处理,数据量小 |
| 历史批量数据 | 批ETL(Hive) | 小时级 | 年度数据分析、趋势预测 | 需数据缓冲,延迟较高,适合离线分析 |
def clean_sensor_data(json_data):
data = json.loads(json_data)
# 数值型缺失值用历史均值填充
for key in ['temperature', 'humidity', 'soil_moisture']:
if data[key] is None:
data[key] = data[key].mean() # 历史均值填充
# 3σ原则检测异常值(标记,不填充)
for key in ['temperature', 'humidity', 'soil_moisture']:
mean_val = data[key].mean()
std_val = data[key].std()
if abs(data[key] - mean_val) > 3 * std_val:
data[key] = None # 标记异常值(保留原值,后续分析)
return data
// Flink Streaming API,多变量异常检测(结合温度、湿度、土壤湿度)
DataStream<SensorData> stream = env.socketTextStream("localhost", 9999);
stream
.map(new SensorDataMapper())
.keyBy(SensorData::getSensorId)
.window(TumblingProcessingTimeWindow.of(Time.minutes(30))) // 30分钟滑动窗口(农业异常持续时间)
.aggregate(new MultiVarAggregator()) // 聚合多变量统计量(均值、标准差)
.filter(new MultiVarOutlierDetector()) // 检测多变量异常(如温度过高+湿度异常)
.print();
“面试官您好,针对农业大数据平台的ETL设计,核心是分源处理多源数据,结合实时计算实现异常检测。首先,传感器数据(流式、每分钟采集一次)通过Flink实时ETL,清洗用3σ原则检测异常值,标记异常而非填充,避免偏差;农户上报的种植计划(结构化、每周上报)通过Spark批量ETL,用插值处理缺失值;历史数据通过Hive批ETL加载。数据转换阶段,将时间序列数据按30分钟滑动窗口聚合,生成分析数据。数据加载时,实时数据通过Flink加载到实时数据库,历史数据加载到Hive。然后,利用Flink的流处理,对实时数据聚合后,用多变量统计模型(结合温度、湿度、土壤湿度)检测异常,比如异常组合(温度过高+湿度异常),及时预警。这样既保证数据质量,又能实时检测异常,支持农业决策。”