
1) 【一句话结论】
针对证券市场Tick级行情数据,系统以交易所流式API为采集入口,通过Kafka解耦缓冲,Flink结合RocksDB状态管理处理数据,Redis Cluster缓存高频数据,实现毫秒级延迟、99.99%高可用及交易-清算数据一致性,并采用流削峰策略应对交易高峰。
2) 【原理/概念讲解】
老师口吻解释关键组件:
3) 【对比与适用场景】
| 组件 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| 交易所流式API | 交易所提供的WebSocket实时数据推送接口 | 低延迟(毫秒级),实时推送 | 数据采集端,直接获取原始Tick数据 | 需与交易所对接,延迟受交易所限制 |
| Kafka | 分布式消息队列 | 高吞吐(百万级TPS)、持久化、多副本、高可用 | 实时数据管道,解耦采集与处理 | 需管理分区、副本因子,避免数据丢失;网络分区时需考虑重试 |
| RabbitMQ | 点对点/发布订阅消息队列 | 基于消息确认,延迟比Kafka高 | 小规模系统,简单消息传递 | 不适合高吞吐实时场景,延迟约10-50ms |
| Flink | 流处理框架 | 事件时间、状态管理、低延迟(毫秒级)、容错 | 实时计算,需状态持久化 | 需配置检查点(如1秒间隔),RocksDB持久化状态;状态大小影响检查点时间 |
| Spark Streaming | 微批处理框架 | 大规模数据,对延迟容忍(秒级) | 大规模数据,对延迟容忍 | 延迟比Flink高,适合离线处理或准实时 |
| Redis Cluster | 分布式缓存系统 | 高并发读写、数据分片、高可用 | 缓存高频数据,减少后端压力 | 需预热热点key,配置限流防雪崩;网络延迟影响响应时间 |
| 单机Redis | 单机缓存系统 | 低延迟,但扩展性差 | 小规模场景 | 高峰时易崩溃,不适合高并发 |
4) 【示例】
import json
from kafka import KafkaProducer
import websocket
producer = KafkaProducer(bootstrap_servers='kafka:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
def on_message(ws, message):
tick = json.loads(message)
# 批量生产,减少网络调用
producer.send('tick_data', tick, partition=0)
# 设置批量发送间隔,如5ms
producer.flush()
from pyflink import StreamExecutionEnvironment
from pyflink.table import *
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(8) # 根据数据量动态调整,如高峰时增加至16
t_env = StreamTableEnvironment.create(env)
# 读取Kafka主题
source = t_env.from_stream(
env.from_collection([("tick", "topic1")], type_info=TypeInfo.of(StringType(), StringType())),
'tick_data'
)
cleared = t_env.from_stream(
env.from_collection([("clear", "topic2")], type_info=TypeInfo.of(StringType(), StringType())),
'clear_data'
)
joined = source.join(cleared).where("tick.trade_id = clear.trade_id")
windowed = joined.window(TumblingEventTimeWindow.of(Time.seconds(1)))
result = windowed.group_by().select(
'trade_id', 'price', 'volume',
'sum(volume) as total_volume'
)
# 写入InfluxDB
result.to_append_stream().sink_to_influxdb('influxdb:8086', 'tick_data')
# 状态持久化配置(RocksDB)
env.get_config().set_value("statebackend.rocksdb.path", "/tmp/flink-state")
env.get_config().set_value("statebackend.rocksdb.checkpoint-interval", "1000") # 1秒检查点
env.get_config().set_value("statebackend.rocksdb.memory", "1g") # 内存配置
from rediscluster import RedisCluster
redis_client = RedisCluster(startup_nodes=[{'host': 'redis1', 'port': '6379'}, ...], max_connections=1000)
def get_cached_data(key):
# 限流(防雪崩)
if redis_client.get('rate_limit:tick') is None:
redis_client.set('rate_limit:tick', 1, ex=1) # 1秒内限1次
# 热点key预热
if not redis_client.exists(key):
hot_keys = ['tick:000001', 'tick:600519']
for k in hot_keys:
redis_client.set(k, 'preheated', ex=60) # 预热60秒
return redis_client.get(key)
5) 【面试口播版答案】
“面试官您好,我设计的系统核心是:采集端用交易所流式API(如WebSocket)实时推送Tick数据,通过批量写入Kafka减少网络开销,确保采集延迟在毫秒级;处理端用Flink,配置RocksDB持久化状态,支持事件时间语义,保证状态恢复可靠且延迟低;缓存用Redis Cluster存储高频行情数据,做热点key预热和限流防雪崩,避免缓存失效时冲击后端;存储用时序数据库按时间分区存储数据。高可用方面,Kafka和Flink部署多副本(如3副本),故障时自动切换,满足99.99%可用。数据一致性通过交易ID关联交易与清算数据,若延迟导致数据不匹配,启动补偿任务重试关联或回滚数据,保证最终一致性。应对9:30-15:00交易高峰,用流削峰策略(调整Flink并行度、Redis缓存压力),确保系统在高峰时仍维持毫秒级延迟。”
6) 【追问清单】
7) 【常见坑/雷区】