
在双11红包大促中,我负责的消息推送模块通过引入Kafka消息队列异步解耦、动态令牌桶限流控制请求速率、以及基于失败率的熔断机制,成功应对百万级并发。压力测试显示,模拟100万并发请求时,平均响应时间从500ms降至50ms,失败率从5%降至0.1%,活动期间用户红包通知延迟低于1秒,用户满意度提升显著。
高并发场景下,用户同时点击领取红包时,消息推送接口瞬间承受海量请求(如百万级),导致接口响应超时甚至服务崩溃。为解决此问题,需理解以下核心机制:
| 方案 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| 消息队列(异步处理) | 将请求异步写入队列,消费者异步处理 | 解耦、削峰、可水平扩展 | 高并发推送、通知等场景(如红包、订单确认) | 需考虑消息丢失(设置重试机制)、延迟(调整消费者数量和批处理大小),避免队列积压导致延迟过高 |
| 限流(令牌桶) | 控制请求进入速率 | 限制流量,保护系统 | 防止流量过大导致系统崩溃(如秒杀、大促) | 需动态调整阈值(根据系统负载变化),避免误判(突发流量时允许少量超额),防止限流后用户请求被拒绝 |
| 熔断 | 服务失败率超阈值时暂时拒绝请求 | 防雪崩效应 | 依赖服务不稳定时(如第三方推送SDK故障) | 需合理设置阈值(如失败率50%),避免误触发(失败率波动时逐步调整),恢复策略需平滑(按比例增加请求量) |
伪代码(消息推送异步流程):
# 用户点击领取红包,将推送任务写入Kafka队列
def handle_receive_redpack(user_id, redpack_id):
# 限流:检查令牌桶,不足则拒绝或排队
if not token_bucket.check():
return "请求过多,请稍后重试"
# 将推送任务写入Kafka(高吞吐,支持百万级消息)
kafka_producer.send("push_task", value={"user_id": user_id, "redpack_id": redpack_id})
return "领取成功,通知中"
# 推送服务(消费者)消费队列消息,调用推送SDK
def consume_push_task():
while True:
msg = kafka_consumer.poll(timeout_ms=100)
for record in msg:
data = record.value
# 生成推送内容(如微信消息)
push_content = f"您领取了{data['redpack_id']}红包!"
# 调用推送SDK(如微信小程序推送)
wechat_push.send(data['user_id'], push_content)
限流逻辑(令牌桶,动态调整):
class TokenBucket:
def __init__(self, capacity, refill_rate, monitor_interval=1):
self.capacity = capacity # 令牌桶容量
self.refill_rate = refill_rate # 每秒补充的令牌数(如1000个/秒)
self.tokens = capacity
self.last_refill = time.time()
self.monitor_interval = monitor_interval # 监控周期(秒)
def check(self):
now = time.time()
# 动态调整:根据系统负载(如CPU使用率)调整令牌补充速率
if now - self.last_refill >= self.monitor_interval:
load = get_system_load() # 获取系统负载(如CPU > 70%时降低补充速率)
if load > 0.7: # 负载过高时,补充速率减半
self.refill_rate = self.refill_rate * 0.5
else:
self.refill_rate = self.refill_rate # 正常补充
self.last_refill = now
self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.refill_rate)
return self.tokens > 0
熔断逻辑(基于失败率,动态恢复):
class CircuitBreaker:
def __init__(self, threshold=0.5, recovery_rate=0.1, recovery_step=0.1):
self.threshold = threshold # 失败率阈值(如50%)
self.state = "CLOSED" # 状态:CLOSED、OPEN、HALF_OPEN
self.success_count = 0
self.failure_count = 0
self.recovery_rate = recovery_rate # 恢复速率(如每秒10%的请求尝试)
self.recovery_step = recovery_step # 恢复步长(如每分钟增加10%的请求量)
def request(self):
if self.state == "OPEN":
# 熔断状态下,允许少量请求尝试
if random.random() < self.recovery_rate: # 每秒10%的请求尝试
return self._execute()
elif self.state == "HALF_OPEN":
# 恢复中,允许请求,并统计结果
return self._execute()
else:
return self._execute()
def _execute(self):
try:
# 调用推送SDK(模拟)
wechat_push.send(...)
self.success_count += 1
return True
except Exception as e:
self.failure_count += 1
return False
def update_state(self):
total = self.success_count + self.failure_count
if total == 0:
return
failure_rate = self.failure_count / total
if failure_rate > self.threshold and self.state == "CLOSED":
self.state = "OPEN"
elif failure_rate < self.threshold and self.state == "HALF_OPEN":
# 逐步恢复,按步长增加请求量
self.recovery_rate += self.recovery_step
if self.recovery_rate > 1:
self.recovery_rate = 1
self.state = "CLOSED"
(约90秒)
“我参与过双11红包大促,负责的消息推送模块在大促时遇到百万级并发。当时问题表现为用户点击领取后,红包通知延迟超过1秒甚至不推送,导致用户流失。解决方案是引入Kafka消息队列异步处理推送任务,配合动态令牌桶限流控制请求速率,同时设置基于失败率的熔断机制。具体来说,通过压力测试,模拟100万并发请求时,平均响应时间从500ms降至50ms,失败率从5%降至0.1%。活动期间,用户红包通知延迟低于1秒,用户满意度提升显著,未出现服务崩溃。”