
1) 【一句话结论】
采用分层架构,通过数据采集层适配多格式数据并解耦,处理层用流处理引擎实时转换,存储层结合时序数据库(存原始IoT数据)与关系型数据库(存元数据),结合消息队列和事务机制保证数据一致性与实时同步。
2) 【原理/概念讲解】
老师解释各层职责:
3) 【对比与适用场景】
| 架构组件 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| 消息队列(如Kafka) | 分布式消息系统,提供高吞吐、低延迟异步通信 | 基于发布-订阅模式,持久化消息,支持多消费者 | 数据采集层与处理层解耦,缓冲数据,处理高并发 | 需考虑消息积压,确保消息不丢失 |
| 流处理引擎(如Flink) | 实时计算框架,支持流数据处理与批处理 | 毫秒级低延迟,支持状态管理、容错处理 | 实时数据清洗、转换、聚合(如设备状态实时监控) | 需合理配置并行度,避免资源浪费 |
| 时序数据库(如InfluxDB) | 专为时间序列数据设计的数据库 | 高吞吐、高查询性能,支持时间切片、聚合 | 存储IoT设备实时数据(如传感器读数、机械位置) | 适合高频率数据,不适合复杂关联查询 |
| 关系型数据库(如MySQL) | 传统关系型数据库,支持结构化数据存储 | 事务支持(ACID),支持复杂查询 | 存储设备元数据(如设备ID、供应商、数据格式定义) | 适合数据关联查询,写入延迟较高 |
4) 【示例】
伪代码示例(数据采集适配器+处理层):
# 数据采集适配器(JSON格式)
def json_adapter(data):
device_id = data['device_id']
ts = data['timestamp']
value = data['value']
return {'device_id': device_id, 'ts': ts, 'value': value, 'format': 'json'}
# Kafka生产者
def send_to_kafka(data, topic='iot_data'):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
producer.send(topic, value=data.encode('utf-8'))
producer.flush()
# Flink处理逻辑(伪代码)
def process_stream(stream):
for record in stream:
data = json.loads(record.value.decode('utf-8'))
# 写入InfluxDB(时序数据库)
influx_client.write(data['device_id'], data['ts'], data['value'])
# 写入MySQL(关系型数据库)
mysql_client.insert(device_id=data['device_id'], ts=data['ts'], value=data['value'])
# 确认消息处理
stream.commit()
5) 【面试口播版答案】
“面试官您好,针对中铁建项目管理系统整合不同供应商IC设备数据的需求,我设计的数据库架构分为三层:数据采集层、处理层和存储层。首先,数据采集层通过适配器解析JSON、Protobuf、二进制等不同格式的数据,将标准化数据发送到消息队列(如Kafka),实现设备与处理层的解耦,避免直接耦合导致系统扩展性问题。处理层采用流处理引擎(如Flink),实时处理数据,进行数据清洗、格式转换和聚合(比如设备状态实时监控),然后将数据写入存储层。存储层结合时序数据库(如InfluxDB)存储IoT设备的原始时间序列数据(如传感器温度、机械位置),支持高频率时间序列查询;关系型数据库(如MySQL)存储设备元数据(设备ID、供应商信息、数据格式定义),用于数据关联和复杂查询。为保证数据实时同步,消息队列作为缓冲层,削峰填谷,确保数据采集与处理层之间的延迟在毫秒级(考虑网络延迟、设备采集延迟等实际因素);为保证数据一致性,处理层在写入时序数据库和关系型数据库时采用事务机制,并确认消息已成功处理,确保数据不丢失且同步。这样既能处理多格式数据,又能保证实时同步和数据一致性。”
6) 【追问清单】
7) 【常见坑/雷区】