
1) 【一句话结论】采用MQTT协议实现射频传感器数据的实时采集,通过流处理框架(如Apache Flink)完成滤波、去噪等预处理,并利用事务性数据库(如Cassandra或带有ACID特性的数据库)保证数据一致性,确保系统低延迟、高吞吐且数据可靠。
2) 【原理/概念讲解】
老师:咱们先讲几个核心概念。
3) 【对比与适用场景】
| 特性 | 流处理(实时处理) | 批处理(批量处理) |
|---|---|---|
| 定义 | 实时处理数据流,数据到达即处理 | 批量处理历史数据,定期执行 |
| 延迟 | 毫秒级(低延迟) | 分钟级/小时级(高延迟) |
| 适用场景 | 实时监控(信号强度波动)、实时告警 | 历史数据分析(月度温度统计)、报表生成 |
| 注意点 | 需处理状态管理(如Flink状态) | 适合数据量大、离线计算场景 |
4) 【示例】
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
client.subscribe("sensor/rf")
def on_message(client, msg):
data = msg.payload.decode()
temp, hum, signal = parse_data(data) # 解析温度、湿度、信号强度
send_to_streaming_system(temp, hum, signal) # 发送至流处理系统
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("mqtt.broker.com", 1883)
client.loop_forever()
DataStream<RawSensorData> rawStream = env.addSource(mqttSourceFunction);
DataStream<ProcessedData> processedStream = rawStream
.map(new SensorDataMapper()) // 解析原始数据
.keyBy(SensorData::getSensorId) // 按传感器ID分组
.process(new MovingAverageProcessor(10)); // 滤波(移动平均)
processedStream.addSink(new DatabaseSink()); // 存储至数据库
CREATE TABLE rf_sensor_data (
sensor_id text PRIMARY KEY,
temperature double,
humidity double,
signal_strength int,
timestamp timestamp
);
5) 【面试口播版答案】
“面试官您好,针对射频数据采集模块的设计,我核心思路是采用MQTT协议实现实时数据采集,通过流处理框架(如Flink)完成滤波、去噪等预处理,并利用事务性数据库保证数据一致性。具体来说,数据采集方面,我们选择MQTT是因为它轻量级、支持发布订阅,适合资源受限的射频传感器,比Modbus更高效。数据处理流程上,我们采用流处理,因为需要实时监控信号强度等数据,流处理能实现毫秒级延迟,而批处理更适合离线分析。数据一致性方面,我们使用事务性数据库(如Cassandra),通过ACID事务保证数据写入的一致性,同时结合Flink的事务性状态后端,确保处理结果不会丢失或重复。整体架构分为采集层(MQTT客户端)、处理层(Flink流处理)、存储层(事务性数据库),各层通过消息队列解耦,保证系统高可用。”
6) 【追问清单】
7) 【常见坑/雷区】