51mee - AI智能招聘平台Logo
模拟面试题目大全招聘中心会员专区

设计一个实时猪舍环境监测与预警系统,需要支持多猪舍(如100个猪舍),每个猪舍部署温湿度、二氧化碳、氨气等传感器,数据采集频率为1分钟一次。系统需实时处理数据并触发告警(如温度过高、氨气浓度超标),告警信息需发送至养殖管理人员手机。请描述系统架构、数据流、关键技术选型及扩展性考虑。

牧原算法工程师难度:困难

答案

1) 【一句话结论】:采用“边缘计算+实时流处理+云平台”的三层架构,通过边缘网关预处理并缓存数据,利用流处理引擎(如Flink)实现连续超阈值检测与状态管理,最终通过安全消息队列和推送服务将告警发送至管理人员手机,确保100个猪舍的实时监测与低延迟告警。

2) 【原理/概念讲解】:系统分为四层,逐层讲解:

  • 边缘层:每个猪舍部署温湿度、CO₂、氨气传感器(假设每个猪舍3个传感器),通过低功耗网关(如LoRaWAN,适合长距离、低功耗,或4G,适合数据量大的场景)采集数据,1分钟一次。数据先本地缓存(如Redis,缓存大小根据数据量计算,如每个猪舍缓存最近100条数据,100个猪舍约3万条,Redis内存足够),再上传。预处理步骤包括校准(线性回归校准温度传感器,消除漂移)和异常值检测(3σ原则,过滤突变值,避免误告警,如温度突然跳到50℃属于异常,不传输)。
  • 实时处理层:采用Apache Flink,消费边缘网关上传的清洗后数据流。核心是状态管理:使用时间窗口(如5分钟Tumbling Window)检测温度持续超阈值(如>30℃持续5分钟),并维护状态(如当前猪舍的告警状态,避免重复告警)。规则引擎存储可配置的阈值(如温度阈值、氨气阈值),流处理引擎根据规则生成告警事件。状态管理逻辑:当温度恢复正常(<30℃),重置超阈值计数,确保仅持续超阈值时触发告警。
  • 分析层:告警事件写入Kafka(持久化,确保不丢失),历史数据存储在时序数据库(如InfluxDB,使用Shard Key按猪舍ID分片,支持按时间、猪舍查询,写入性能优化,如批量写入减少I/O)。
  • 告警与推送层:告警事件通过Kafka消费,由推送服务(如MQTT Broker,通过TLS加密传输,确保安全)将告警内容(含猪舍ID、告警类型、阈值、时间)推送到管理人员手机APP,实现即时通知。

3) 【对比与适用场景】:对比边缘计算(边缘网关+本地处理)与云端实时处理(Flink/Spark在云上):

对比项边缘计算(边缘网关+本地处理)云端实时处理(Flink/Spark在云上)
数据延迟低(毫秒级,本地处理+本地缓存,网络传输延迟低)较高(网络传输+云处理延迟,秒级)
带宽消耗低(仅上传异常数据或聚合数据,如温度持续超阈值5分钟才上传一次聚合数据)高(全量数据上传,增加网络负载)
可靠性本地故障不影响整体,但单点故障风险(单个网关故障,对应猪舍数据丢失,需冗余网关)高(云服务冗余,如AWS EMR/Flink集群,故障转移)
适用场景对延迟敏感、数据量大的实时告警(如温度持续超阈值,需立即告警)数据量小、延迟要求不高的场景,或边缘设备资源有限(如计算能力不足)
注意点需考虑边缘设备计算能力、存储成本(如Redis缓存大小,需根据数据量调整),以及网络中断时的本地缓存策略(如Redis持久化,网络恢复后重传)需考虑网络稳定性,避免数据丢失(如消息队列持久化),且云服务成本较高(如Flink集群资源费用)

4) 【示例】(伪代码,包含数据量计算与状态管理):

# 1. 边缘网关:数据采集与预处理(数据量计算示例)
# 假设每个猪舍3个传感器,100个猪舍=300个传感器,1分钟采集1次,数据量=300条/分钟
# LoRaWAN带宽:假设每个网关支持1Mbps,300条数据(每条约100字节)=30KB/分钟,远低于带宽限制
import json
import time
from redis import Redis

redis_client = Redis(host='edge_redis', db=0)

def calibrate_temperature(temp):
    # 线性回归校准(示例系数,实际为模型)
    return temp * 0.98

def collect_and_preprocess():
    while True:
        raw_data = {
            "pigshed_id": 101,
            "temperature": 31.2,
            "humidity": 65,
            "co2": 420,
            "ammonia": 18
        }
        
        calibrated_temp = calibrate_temperature(raw_data["temperature"])
        
        # 3σ异常值检测(正常温度20-28℃,σ=3)
        if abs(calibrated_temp - 25) > 3 * 3:
            continue  # 过滤异常值
        
        redis_client.rpush(f"pigshed_{raw_data['pigshed_id']}_data", json.dumps(raw_data))
        kafka_producer.send("pigshed_data", json.dumps(raw_data))
        time.sleep(60)

# 2. 流处理(Flink):连续超阈值检测与状态管理(状态管理逻辑)
from flink import Flink, TumblingWindow, KeyedState, ValueState

def process_stream(stream):
    f = Flink()
    keyed_stream = stream.key_by(lambda x: x["pigshed_id"])
    temp_state = KeyedState[ValueState[Tuple[float, int]]]  # (当前温度, 超阈值次数)
    
    def process_element(element):
        pigshed_id = element["pigshed_id"]
        temp = element["temperature"]
        
        state = temp_state.get(pigshed_id)
        if state is None:
            state = ValueState[Tuple[float, int]]((temp, 0))
            temp_state.set(pigshed_id, state)
        else:
            current_temp, count = state.value()
            
            if temp > 30:
                count += 1
            else:
                count = 0  # 低于阈值重置计数
            state.update((temp, count))
            
            if count > 0:
                alert = {
                    "pigshed_id": pigshed_id,
                    "alert_type": "temperature",
                    "value": temp,
                    "timestamp": element["timestamp"]
                }
                alert_stream.sink_kafka("alert_data", alert)
    
    keyed_stream.map(process_element).execute()

# 3. 推送服务(MQTT安全传输)
def send_alert(alert):
    mqtt_client = MQTTClient()
    mqtt_client.tls_connect("broker.mqtt.com")
    mqtt_client.publish("alert_topic", json.dumps(alert), qos=1)

5) 【面试口播版答案】:
“系统采用分层架构,分为边缘层、实时处理层、分析层和告警推送层。边缘层用传感器和低功耗网关(如LoRaWAN)采集数据,1分钟上传一次,先本地缓存并做校准和异常值检测;实时处理层用Flink消费数据流,通过5分钟时间窗口检测温度持续超阈值(如>30℃持续5分钟),并维护状态避免重复告警;分析层将告警写入Kafka,历史数据存时序数据库;最后通过TLS加密的MQTT将告警推送到手机。这样能支持100个猪舍,确保低延迟告警,且扩展性好。”

6) 【追问清单】:

  • 问题1:数据延迟如何保证?
    回答要点:边缘网关本地缓存+流处理低延迟,目标将告警延迟控制在3-5秒内(实际受网络抖动、设备负载影响,优化措施包括增加网关带宽、优化流处理并行度)。
  • 问题2:网络中断时数据如何处理?
    回答要点:边缘网关本地缓存数据(如Redis,支持持久化),网络恢复后通过重试机制(如指数退避)重传,避免数据丢失。
  • 问题3:系统如何扩展?
    回答要点:边缘网关水平扩展(增加网关数量,覆盖更多猪舍),流处理集群扩容(每个猪舍对应1个Flink实例,100个猪舍需100+实例,通过Kafka分区实现负载均衡),消息队列水平扩展(Kafka分区数与猪舍数匹配)。
  • 问题4:告警策略如何配置?
    回答要点:规则引擎支持配置阈值(如温度、氨气阈值),可动态调整,支持多级告警(轻度、重度),管理员可通过Web界面配置。
  • 问题5:容错与恢复?
    回答要点:流处理框架Checkpoint(Flink的Checkpoint机制),消息队列持久化(Kafka的日志持久化),确保数据不丢失,系统故障后快速恢复(如Flink实例重启后从Checkpoint恢复状态)。

7) 【常见坑/雷区】:

  • 坑1:忽略传感器数据校准:未校准温度传感器,导致数据漂移,误判温度过高,引发误告警。
  • 坑2:网络中断处理不足:未考虑本地缓存,数据丢失或告警延迟,影响系统可靠性。
  • 坑3:边缘设备选型不当:用4G网关但猪舍分散,导致成本高或覆盖不足;或用LoRaWAN但数据量过大,带宽不足,需增加网关数量。
  • 坑4:告警逻辑简单化:未考虑连续超阈值(如温度持续超阈值5分钟才告警),导致漏报,错过关键告警。
  • 坑5:扩展性考虑不足:未说明流处理集群规模(如100个猪舍需多少Flink实例),导致实际部署时资源不足,影响性能。
51mee.com致力于为招聘者提供最新、最全的招聘信息。AI智能解析岗位要求,聚合全网优质机会。
产品招聘中心面经会员专区简历解析Resume API
联系我们南京浅度求索科技有限公司admin@51mee.com
联系客服
51mee客服微信二维码 - 扫码添加客服获取帮助
© 2025 南京浅度求索科技有限公司. All rights reserved.
公安备案图标苏公网安备32010602012192号苏ICP备2025178433号-1