
1) 【一句话结论】:采用“边缘计算+实时流处理+云平台”的三层架构,通过边缘网关预处理并缓存数据,利用流处理引擎(如Flink)实现连续超阈值检测与状态管理,最终通过安全消息队列和推送服务将告警发送至管理人员手机,确保100个猪舍的实时监测与低延迟告警。
2) 【原理/概念讲解】:系统分为四层,逐层讲解:
3) 【对比与适用场景】:对比边缘计算(边缘网关+本地处理)与云端实时处理(Flink/Spark在云上):
| 对比项 | 边缘计算(边缘网关+本地处理) | 云端实时处理(Flink/Spark在云上) |
|---|---|---|
| 数据延迟 | 低(毫秒级,本地处理+本地缓存,网络传输延迟低) | 较高(网络传输+云处理延迟,秒级) |
| 带宽消耗 | 低(仅上传异常数据或聚合数据,如温度持续超阈值5分钟才上传一次聚合数据) | 高(全量数据上传,增加网络负载) |
| 可靠性 | 本地故障不影响整体,但单点故障风险(单个网关故障,对应猪舍数据丢失,需冗余网关) | 高(云服务冗余,如AWS EMR/Flink集群,故障转移) |
| 适用场景 | 对延迟敏感、数据量大的实时告警(如温度持续超阈值,需立即告警) | 数据量小、延迟要求不高的场景,或边缘设备资源有限(如计算能力不足) |
| 注意点 | 需考虑边缘设备计算能力、存储成本(如Redis缓存大小,需根据数据量调整),以及网络中断时的本地缓存策略(如Redis持久化,网络恢复后重传) | 需考虑网络稳定性,避免数据丢失(如消息队列持久化),且云服务成本较高(如Flink集群资源费用) |
4) 【示例】(伪代码,包含数据量计算与状态管理):
# 1. 边缘网关:数据采集与预处理(数据量计算示例)
# 假设每个猪舍3个传感器,100个猪舍=300个传感器,1分钟采集1次,数据量=300条/分钟
# LoRaWAN带宽:假设每个网关支持1Mbps,300条数据(每条约100字节)=30KB/分钟,远低于带宽限制
import json
import time
from redis import Redis
redis_client = Redis(host='edge_redis', db=0)
def calibrate_temperature(temp):
# 线性回归校准(示例系数,实际为模型)
return temp * 0.98
def collect_and_preprocess():
while True:
raw_data = {
"pigshed_id": 101,
"temperature": 31.2,
"humidity": 65,
"co2": 420,
"ammonia": 18
}
calibrated_temp = calibrate_temperature(raw_data["temperature"])
# 3σ异常值检测(正常温度20-28℃,σ=3)
if abs(calibrated_temp - 25) > 3 * 3:
continue # 过滤异常值
redis_client.rpush(f"pigshed_{raw_data['pigshed_id']}_data", json.dumps(raw_data))
kafka_producer.send("pigshed_data", json.dumps(raw_data))
time.sleep(60)
# 2. 流处理(Flink):连续超阈值检测与状态管理(状态管理逻辑)
from flink import Flink, TumblingWindow, KeyedState, ValueState
def process_stream(stream):
f = Flink()
keyed_stream = stream.key_by(lambda x: x["pigshed_id"])
temp_state = KeyedState[ValueState[Tuple[float, int]]] # (当前温度, 超阈值次数)
def process_element(element):
pigshed_id = element["pigshed_id"]
temp = element["temperature"]
state = temp_state.get(pigshed_id)
if state is None:
state = ValueState[Tuple[float, int]]((temp, 0))
temp_state.set(pigshed_id, state)
else:
current_temp, count = state.value()
if temp > 30:
count += 1
else:
count = 0 # 低于阈值重置计数
state.update((temp, count))
if count > 0:
alert = {
"pigshed_id": pigshed_id,
"alert_type": "temperature",
"value": temp,
"timestamp": element["timestamp"]
}
alert_stream.sink_kafka("alert_data", alert)
keyed_stream.map(process_element).execute()
# 3. 推送服务(MQTT安全传输)
def send_alert(alert):
mqtt_client = MQTTClient()
mqtt_client.tls_connect("broker.mqtt.com")
mqtt_client.publish("alert_topic", json.dumps(alert), qos=1)
5) 【面试口播版答案】:
“系统采用分层架构,分为边缘层、实时处理层、分析层和告警推送层。边缘层用传感器和低功耗网关(如LoRaWAN)采集数据,1分钟上传一次,先本地缓存并做校准和异常值检测;实时处理层用Flink消费数据流,通过5分钟时间窗口检测温度持续超阈值(如>30℃持续5分钟),并维护状态避免重复告警;分析层将告警写入Kafka,历史数据存时序数据库;最后通过TLS加密的MQTT将告警推送到手机。这样能支持100个猪舍,确保低延迟告警,且扩展性好。”
6) 【追问清单】:
7) 【常见坑/雷区】: