
采用“流式采集+实时计算(Flink加权平均处理)+分层存储(InfluxDB+ClickHouse)”的分布式架构,通过数据清洗(基于业务统计的3σ异常值检测)、动态资源扩展(Kafka分区、Flink并行度)、Exactly-once语义与多副本同步保障数据一致性,实现高频交易数据的实时处理、高可用与低延迟。
老师:构建实时数仓需解决数据实时性、质量、容错三大问题。
replication.factor=3,分区数按交易量动态调整,如每个主题100分区)采集,解耦生产者与消费者。env.set_stream_time_characteristic("event_time")),Watermark阈值5秒(延迟容忍窗口)。计算逻辑:状态管理(缓存股票实时价格、成交量),1秒滑动窗口聚合(计算加权平均价格:SUM(price*volume)/SUM(volume),而非简单平均),用于实时指数计算。partition by symbol, toYYYYMMDD(timestamp),支持复杂SQL分析历史数据,列式存储优化查询性能)。max.message.age.ms=10000)、Flink状态快照(每5分钟保存一次,checkpoint.interval=300s),确保数据不丢失且计算不中断。(类比:Kafka是“智能中转站”,过滤异常数据后分发;Flink是“实时加工厂”,按时间窗口高效处理加权平均;InfluxDB是“实时数据仓库”,快速查询最新行情;ClickHouse是“分析工具库”,支持复杂历史数据分析。)
| 模块/组件 | 技术选型 | 定义/特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| 数据采集 | Apache Kafka | 分布式消息队列,高吞吐、持久化、多副本 | 高频交易数据采集(订单、成交),解耦生产者与消费者 | 配置分区数(根据交易量,如每个主题100分区)、副本因子(3,保障高可用),避免数据丢失 |
| 数据清洗 | 自定义逻辑(Flink UDF) | 格式验证、异常值检测(3σ原则,基于历史数据统计) | 确保数据质量,过滤无效数据 | 异常值处理逻辑(丢弃或标记,如price < 0或volume > mean(volume)*3则丢弃) |
| 实时计算 | Apache Flink | 流处理引擎,支持状态管理、事件时间、窗口计算 | 实时指数计算(加权平均)、行情指标 | 配置并行度(集群资源调整,如env.set_parallelism(24)),Watermark阈值(5秒),避免数据乱序 |
| 实时存储 | InfluxDB (时序) | 时间序列存储,低延迟查询,时间分区 | 存储实时聚合数据(如每秒交易量) | 按时间分区(如按小时),压缩算法(ZSTD),减少存储成本 |
| 汇总存储 | ClickHouse (宽表) | 分布式列式数据库,支持复杂SQL,多维度分区 | 存储汇总数据(日度指数、历史行情) | 按股票代码+时间分区(如partition by symbol, toYYYYMMDD(timestamp)),支持多维度分析 |
| 高可用保障 | 多副本+集群 | Kafka多副本(3)、Flink集群(3节点)、存储多副本 | 系统故障时数据不丢失、计算不中断 | 监控集群状态,故障节点自动恢复 |
伪代码展示数据清洗、加权平均计算与存储:
from pyflink import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.window import Tumble
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(24) # 集群3节点,每个节点8任务,总并行度24
env.set_stream_time_characteristic("event_time")
table_env = StreamTableEnvironment.create(env)
# 1. 读取Kafka并清洗数据
table_env.connect(
Kafka()
.setBootstrapServers("kafka:9092")
.setTopic("trade_stream")
.setProperties("group.id", "trade_index")
.start_from_latest()
).in_schema(schema).to_table(table_env.from_path("raw_trades"))
schema = TableSchema()
schema.add("symbol", "VARCHAR")
schema.add("price", "DECIMAL(18,2)")
schema.add("volume", "BIGINT")
schema.add("timestamp", "TIMESTAMP")
# 2. 定义清洗逻辑(基于3σ原则)
def detect_abnormal(row):
price = row["price"]
volume = row["volume"]
# 假设历史数据统计:price_mean=10, price_std=2;volume_mean=1e6, volume_std=2e5
if price < 0 or volume < 0:
return None
if abs(price - 10) > 3*2 or volume > 1e6 + 3*2e5: # 3σ
return {"symbol": row["symbol"], "price": row["price"], "volume": row["volume"], "timestamp": row["timestamp"], "is_abnormal": True}
return row
# 3. 应用清洗逻辑
cleaned = table_env.from_path("raw_trades").select(
detect_abnormal("raw_trades")
).filter("is_abnormal is null")
cleaned.create_temporary_view("cleaned_trades")
# 4. 实时计算:1秒滑动窗口加权平均
result = table_env.sql_query("""
SELECT
symbol,
SUM(price * volume) AS total_price_volume,
SUM(volume) AS total_volume,
SUM(price * volume) / SUM(volume) AS weighted_avg_price,
CURRENT_TIMESTAMP() AS now
FROM cleaned_trades
GROUP BY symbol, TUMBLE(window, INTERVAL '1' SECOND)
""")
# 5. 写入InfluxDB
result.to_append_stream().write_to_influxdb("influxdb:8086", "index_data", "measurement")
各位面试官好,针对中证数据构建实时处理高频交易数仓的需求,我设计的系统整体架构是采用“流式采集+实时计算(Flink加权平均处理)+分层存储”的分布式方案。具体来说:
数据采集层用Apache Kafka作为消息队列,首先对高频交易数据进行清洗(如格式验证、异常值检测,基于历史数据统计的3σ原则,比如价格或成交量超出历史均值±3倍标准差则丢弃),然后通过Kafka(配置副本因子3,分区数根据交易量动态调整,如每个主题100分区)采集数据。
实时计算层选用Apache Flink,配置并行度为24(集群3节点,每个节点8任务),支持事件时间处理,设置Watermark阈值5秒,处理数据乱序。计算逻辑包括状态管理(缓存股票实时价格、成交量)和1秒滑动窗口聚合(计算加权平均价格:SUM(price*volume)/SUM(volume)),用于实时指数计算。
存储层分为两部分:实时聚合数据存入InfluxDB(按时间分区,支持低延迟查询最新行情,压缩算法ZSTD);汇总数据存入ClickHouse(按股票代码+时间分区,支持复杂SQL分析历史数据)。
为保证高可用,Kafka多副本部署,Flink集群3节点,存储多节点副本同步;低延迟通过流处理减少数据落地延迟;容错性方面,Kafka消息重试、Flink状态快照(每5分钟保存一次)和任务重分配,确保数据不丢失且计算不中断。这样既能满足实时行情和指数计算的需求,又能保障系统稳定运行。