
1) 【一句话结论】
亿级用户实时消息系统通过“消息队列(Kafka)解耦并缓冲流量、分布式存储(MySQL+Redis)保障数据持久与低延迟、长连接WebSocket推送实现实时交互”,结合Kafka事务消息(本地事务+全局事务)确保数据最终一致性,通过动态缓存预热、重试机制处理边界场景,满足延迟<1秒、消息不丢失、高可用及峰值流量应对。
2) 【原理/概念讲解】
老师口吻解释关键组件:
id(主键)、from_user、to_user、content、created_at、status(0=待推送,1=已推送)。关键索引:复合索引to_user, created_at(按接收用户和时间排序,便于查询未推送消息)、from_user, created_at(按发送用户和时间排序,便于查询历史消息)。查询优化:使用覆盖索引(索引包含查询所需所有字段),避免回表;批量操作(如批量插入/更新)减少I/O。user_online:{user_id})、WebSocket连接ID(ws_conn:{user_id})、热点用户信息(hot_users)。动态预热:根据实时流量(如Kafka消息速率、WebSocket连接数)动态调整预热频率,例如当Kafka消息速率超过阈值时,增加Redis预热频率(如每分钟预热1000个热点用户),避免缓存雪崩。3) 【对比与适用场景】
对比Kafka事务消息与双写事务:
| 机制 | 定义 | 特性 | 适用场景 | 注意点 |
|---|---|---|---|---|
| Kafka事务消息 | Kafka内部事务,保证消息在Kafka和消费端(如数据库)的顺序与原子性 | 高吞吐、低延迟,适合大规模消息系统(如亿级用户实时消息) | 实时消息、日志系统 | 需Kafka 2.1+,消费端支持事务 |
| 双写事务(MySQL+Kafka) | 生产者先执行数据库事务插入,再写入Kafka,用数据库事务保证 | 强一致性,适合对数据一致性要求极高但吞吐稍低场景 | 金融交易、核心业务 | 依赖数据库事务,吞吐受数据库影响 |
4) 【示例】
CREATE TABLE messages (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
from_user BIGINT NOT NULL,
to_user BIGINT NOT NULL,
content TEXT NOT NULL,
created_at DATETIME NOT NULL,
status TINYINT DEFAULT 0 COMMENT '0=待推送,1=已推送',
INDEX idx_to_user_created_at (to_user, created_at),
INDEX idx_from_user_created_at (from_user, created_at)
);
# Kafka消费者配置
consumer:
group-id: chat-message-consumer
partitions: 1000 # 按用户ID哈希分区
concurrency: 2 # 每分区2个消费者,总消费者数=1000*2=2000
auto-offset-reset: earliest
# 生产者发送消息(事务消息)
def send_message(to_user_id, content):
# 启动Kafka事务
kafka_producer.begin_transaction()
try:
# 1. 写入Kafka(本地事务)
kafka_producer.send(
topic="chat_message",
key=f"user_{to_user_id}",
value=json.dumps({"from": "user_A", "to": to_user_id, "content": content})
)
# 2. 写入MySQL(全局事务)
db.execute_transaction(
f"INSERT INTO messages (from_user, to_user, content, created_at) "
f"VALUES ('user_A', {to_user_id}, '{content}', NOW())"
)
# 提交事务
kafka_producer.commit_transaction()
except Exception as e:
# 回滚事务
kafka_producer.abort_transaction()
raise e
def dynamic_preheat():
# 获取当前Kafka消息速率(每秒消息数)
kafka_rate = get_kafka_message_rate()
# 获取当前WebSocket连接数
ws_conn_count = get_ws_connection_count()
# 动态调整预热频率(如消息速率>10000/s时,预热频率增加)
if kafka_rate > 10000:
hot_users = db.query_hot_users(limit=1000) # 查询活跃用户
for user in hot_users:
redis.set(f"user_{user.id}", user.to_dict(), ex=3600) # 缓存1小时
5) 【面试口播版答案】
面试官您好,针对亿级用户实时消息系统,核心设计是构建一个高可用、低延迟的分布式架构。系统分为消息生产、处理、存储和推送四大模块。消息生产端通过WebSocket长连接接收用户输入,发送到消息队列(如Kafka),保证消息不丢失且解耦。消息处理服务从队列消费消息,通过Kafka事务消息(本地事务+全局事务)确保消息先写入Kafka再写入MySQL,实现数据最终一致性。同时,将用户信息缓存到Redis,提升读取速度。推送服务通过长连接WebSocket与客户端保持连接,实时推送消息。为了应对峰值流量,消息队列作为缓冲层,设置容量上限(如10万条/秒)并配合限流(令牌桶算法),同时部署多副本服务。高可用方面,数据库主从复制(主节点写,从节点读,故障自动切换),消息队列多节点集群(Kafka),推送服务集群(负载均衡),故障时自动切换,确保服务不中断。延迟控制通过异步处理和直接推送,确保延迟通常控制在1秒内(极端网络拥堵时采用消息分片推送,降低单次延迟)。边界场景如WebSocket断开,系统会通过心跳检测自动重连;Redis缓存击穿时,根据实时流量动态预热(如消息速率高时增加预热频率),提前加载热点用户数据。总结来说,通过解耦、缓存、分布式存储和负载均衡,满足亿级用户实时消息的需求。
6) 【追问清单】
7) 【常见坑/雷区】