
1) 【一句话结论】
360安全产品通过Spark(批处理构建用户行为基线)与Flink(实时流处理检测异常)结合的大数据技术栈,结合特征工程(如登录频率、IP/设备异常、操作时间分布)和规则/机器学习模型,实现威胁检测,并通过消息队列(如Kafka)与告警系统(如ES+告警平台)实现实时告警,确保低延迟和高准确率。
2) 【原理/概念讲解】
老师讲解:
3) 【对比与适用场景】
| 技术栈 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| Spark | 分布式计算框架,支持批处理、流处理、机器学习 | 速度快(内存计算),支持复杂计算(如SQL、机器学习) | 历史数据离线分析(如构建用户行为基线)、特征工程(计算统计特征) | 对延迟敏感的实时检测不适用 |
| Flink | 实时流处理框架,支持低延迟、状态管理 | 低延迟(毫秒级),高吞吐,支持状态快照 | 实时威胁检测(如异常登录、恶意操作)、实时告警 | 需考虑状态存储和容错(如检查点) |
4) 【示例】
(Flink处理实时登录流,检测异常登录的伪代码)
from flink import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# 读取实时登录日志(JSON格式,包含用户ID、时间戳、IP、设备等)
login_stream = env.socket_text_stream("localhost", 9999)
# 定义用户基线(已通过Spark计算,存储在Redis)
user_baseline = env.get_connection("redis").table("user_login_baseline")
def detect_anomaly(login_event, baseline):
# 检查登录时间间隔、IP是否与基线一致
if (login_event['timestamp'] - baseline['last_login']) < 0 or \
login_event['ip'] not in baseline['normal_ips']:
return True
return False
# 连接用户基线数据
joined_stream = login_stream.join(user_baseline, on="user_id").where("login_event.user_id == baseline.user_id")
# 过滤异常事件
anomaly_stream = joined_stream.filter(lambda x: detect_anomaly(x[0], x[1]))
# 发送告警(通过Kafka)
anomaly_stream.map(lambda x: json.dumps({"event": x[0], "alert": "异常登录"})).write_to_kafka("alert_topic")
5) 【面试口播版答案】
(约90秒)
“面试官您好,360安全产品处理海量用户行为日志时,核心是采用Spark与Flink结合的大数据技术栈,以及特征工程和实时告警机制。具体来说,Spark用于离线分析,比如计算每个用户的正常行为基线(如登录频率、IP/设备、操作时间分布),这些基线存储后用于实时检测。Flink负责实时流处理,比如处理用户访问、登录等日志,通过规则或机器学习模型(比如基于基线的异常检测)识别异常行为。比如检测到用户在短时间内多次登录不同IP,或者登录时间偏离正常规律,就判定为异常。检测到异常后,通过消息队列(如Kafka)将告警事件发送给告警系统(如ES存储事件,Kibana可视化或企业内部平台推送),实现实时告警。这样既能处理海量数据,又能保证低延迟的威胁检测。”
6) 【追问清单】
7) 【常见坑/雷区】