
1) 【一句话结论】:采用流式实时数仓架构,以Kafka作为数据采集缓冲层,Flink作为实时计算引擎(支持事件时间与状态计算),ClickHouse作为列式存储(按地域+时间分区),通过Kafka事务+ Flink Exactly-Once(ATLE)保证数据一致性,统一UTC时区处理全球数据,实现交易量、客户流失率等指标的实时监控与查询。
2) 【原理/概念讲解】:老师口吻解释各层核心逻辑:
3) 【对比与适用场景】:
| 层级 | 方案 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|---|
| 数据采集 | Kafka | 分布式消息队列,基于日志持久化 | 高吞吐(百万级TPS)、高容错(副本)、支持事务(0.11+) | 业务系统实时数据流(交易、日志) | 分区数=ceil(TPS/(目标吞吐量/分区数)),副本数=3,避免积压 |
| 数据处理 | Flink | 分布式流处理引擎 | 低延迟(亚秒)、状态计算、窗口聚合、事件时间 | 实时计算指标(聚合、过滤) | 状态管理:本地磁盘(小状态)或分布式存储(大状态);watermark延迟容忍窗口(如10秒) |
| 数据存储 | ClickHouse | 列式数据库 | 高效分析查询(聚合)、实时写入、表分区 | 实时指标存储与查询 | 列式存储适合聚合,按地域/时区分区优化查询;不适合随机写(插入单条慢) |
| 数据查询 | ClickHouse SQL | SQL接口 | 实时查询(亚秒)、复杂聚合 | 业务系统查询指标 | 分区策略影响性能(按区域分区后,聚合查询更快) |
4) 【示例】:
from kafka import KafkaProducer
import json
from datetime import datetime, timezone
producer = KafkaProducer(bootstrap_servers='kafka:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
def process_data(data):
local_dt = datetime.strptime(data['timestamp'], "%Y-%m-%d %H:%M:%S")
utc_dt = local_dt.replace(tzinfo=timezone.utc)
data['timestamp'] = utc_dt.isoformat()
return data
data = {"transaction_id": 1, "amount": 100, "customer_id": 101, "timestamp": "2024-01-01 10:00:00", "region": "Asia", "user_event": "login"}
processed = process_data(data)
producer.send('global_events', value=processed)
DataStream<Transaction> transactionStream = env
.socketTextStream("localhost", 9999)
.map(new SimpleStringToTransactionMapper());
DataStream<UserEvent> userEventStream = env
.socketTextStream("localhost", 9998)
.map(new SimpleStringToUserEventMapper());
DataStream<User> activeUsers = transactionStream
.keyBy(t -> t.customer_id)
.flatMap(new UserActivityMapper())
.union(userEventStream
.keyBy(e -> e.customer_id)
.flatMap(new UserActivityMapper()));
DataStream<User> activeCount = activeUsers
.keyBy(u -> u.customer_id)
.window(TumblingEventTimeWindow.of(Time.days(7)))
.reduce(new ActiveDaysReducer());
DataStream<User> activeUsersStream = activeCount
.filter(u -> u.active_days >= 1)
.map(new ActiveUserMapper());
DataStream<ChurnRate> churnRate = activeUsersStream
.keyBy(u -> u.region)
.window(TumblingEventTimeWindow.of(Time.hours(1)))
.reduce(new ChurnRateReducer());
churnRate.addSink(new ClickHouseSink("clickhouse:8123", "real_time_churn_rate", "region"));
CREATE TABLE real_time_churn_rate (
region String,
hour UInt32,
churn_rate Float64,
PRIMARY KEY (region, hour) WITH TTL (1 hour)
) ENGINE = MergeTree
ORDER BY (region, hour);
SELECT region, hour, churn_rate
FROM real_time_churn_rate
WHERE hour >= now() - interval 1 hour;
5) 【面试口播版答案】:面试官您好,针对实时监控全球业务指标的需求,我设计的实时数仓架构分为四层:数据采集、处理、存储和查询。首先,数据采集层采用Kafka作为消息队列,接收各业务系统的实时数据流(如交易、用户行为、CRM数据),通过生产者写入Kafka主题,消费者(Flink)订阅。处理层用Flink计算指标,比如按小时窗口聚合交易量,同时处理数据清洗(过滤无效交易、填充缺失客户ID),并统一时区为UTC(将本地时区数据转换为UTC时间)。存储层用ClickHouse列式数据库,按地域(如Asia、Europe)或业务线分区存储数据,优化查询性能。为保证实时性和一致性,Kafka通过0.11+事务机制确保生产者写入不丢失,Flink结合Exactly-Once语义(ATLE)保证数据不重复,针对客户流失率等复杂指标,基于用户行为序列计算(如连续7天活跃的用户为活跃用户,流失率=1-当前活跃用户数/历史总用户数),最终实现全球业务指标的实时监控与查询。
6) 【追问清单】:
7) 【常见坑/雷区】: