
采用分布式流处理架构,以Kafka(数据采集+异常过滤)、Flink(实时计算+状态管理)、Redis集群(结果发布+高并发)为核心,通过水平扩展、流控及成分股平滑过渡算法,确保交易高峰下低延迟(亚秒级)和高可用,同时处理异常交易数据与成分股定期调整。
老师口吻解释关键环节:
| 组件 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| Kafka | 分布式消息队列 | 高吞吐、持久化、容错(副本机制),支持过滤(如异常数据过滤) | 数据采集(交易所数据接入)、日志收集 | 分区数需根据流量动态调整,避免消息堆积;需管理存储空间 |
| Flink | 流处理引擎 | 低延迟(亚秒级)、状态管理、Exactly-Once语义、支持窗口计算 | 实时计算(指数、风控)、成分股调整处理 | 并行度配置需匹配CPU核心数(如8核任务分配8并行子任务),避免资源浪费 |
| Redis集群 | 内存数据库(主从复制) | 高并发读写、缓存、持久化(RDB/AOF) | 结果发布、实时查询 | 集群模式需主从复制,确保高可用;内存有限需合理设置数据过期策略 |
| 数据清洗模块 | 异常数据处理逻辑 | 过滤/修正异常数据(如价格/成交量异常、交易时间异常) | 避免指数计算错误 | 需定义异常阈值(如价格>1000元视为异常),修正逻辑(如用前一个有效值填充) |
from kafka import KafkaConsumer
import redis
import json
consumer = KafkaConsumer('trade_topic', bootstrap_servers='kafka:9092', group_id='cleaner')
r = redis.Redis(host='redis:6379')
for msg in consumer:
trade = json.loads(msg.value)
if trade['price'] > 1000 or trade['volume'] < 0: # 过滤异常
continue
r.publish('cleaned_trade', json.dumps(trade))
DataStream<Trade> cleaned = env.readStream("kafka", "cleaned_trade", "group.id=cleaner")
.map(new MapFunction<String, Trade>() {
@Override
public Trade map(String v) throws Exception {
return new Trade(...);
}
});
DataStream<IndexResult> index = cleaned
.keyBy(Trade::getSecId)
.timeWindow(Time.seconds(1))
.process(new ProcessWindowFunction<Trade, IndexResult, String, TimeWindow>() {
double sum = 0, vol = 0;
@Override
public void process(String key, Context ctx, Iterable<Trade> ts, Collector<IndexResult> out) throws Exception {
for (Trade t : ts) {
sum += t.price * t.volume;
vol += t.volume;
}
double idx = sum / vol;
out.collect(new IndexResult(key, idx, ctx.window().end()));
}
});
DataStream<IndexResult> smoothed = index
.process(new ProcessWindowFunction<IndexResult, IndexResult, String, TimeWindow>() {
double lastIdx = 0;
@Override
public void process(String key, Context ctx, Iterable<IndexResult> irs, Collector<IndexResult> out) throws Exception {
for (IndexResult ir : irs) {
double newIdx = 0.7 * lastIdx + 0.3 * ir.getIndex();
out.collect(new IndexResult(key, newIdx, ir.getTimestamp()));
lastIdx = newIdx;
}
}
});
smoothed.addSink(new RedisSink("redis://redis-master:6379,redis-slave1:6379", "index"));
import redis
r = redis.Redis(host='redis-master', port=6379)
def publish(idx):
r.publish('index_topic', json.dumps(idx))
“面试官您好,我设计的系统采用分布式流处理架构,核心是数据采集、异常清洗、实时计算、平滑发布四部分。首先,数据采集用Kafka接收交易所逐笔成交数据,但先通过数据清洗模块过滤异常价格/成交量(如价格>1000元或成交量负数),避免影响计算。然后,用Flink处理清洗后的数据,计算中证500指数(加权平均,基于成分股市值),支持状态管理和Exactly-Once语义,低延迟(亚秒级)。计算结果存入Redis集群(主从复制),通过Redis Pub/Sub发布实时指数。针对交易高峰,通过增加Kafka分区数(每个分区处理更高吞吐)、Flink并行度(每个任务分配8个并行子任务,匹配8核CPU)、Redis集群节点(3主3从)来应对性能压力,同时处理成分股定期调整(采用指数平滑算法,如0.7旧指数+0.3新计算指数,平滑过渡新成分股权重)。这样确保系统在交易高峰期稳定运行,延迟低且数据准确,满足实时发布需求。”