51mee - AI智能招聘平台Logo
模拟面试题目大全招聘中心会员专区

设计一个高并发视频处理任务调度系统,用于处理万兴视频编辑中的视频转码任务。系统需要支持数千个并发任务,并确保任务按优先级调度,同时考虑资源限制(如CPU、GPU)和任务依赖(如先完成视频分割再进行转码)。请描述系统的整体架构、核心组件设计以及关键的技术选型。

万兴科技算法工程化难度:困难

答案

1) 【一句话结论】
构建一个基于分布式DAG依赖解析与动态优先级队列的调度系统,通过GPU资源池动态扩容、优先级继承协议解决优先级反转,支持高并发视频转码任务按优先级、资源限制及任务依赖关系高效执行。

2) 【原理/概念讲解】
老师口吻解释核心概念:

  • 任务模型:视频转码任务拆分为子任务(如视频分割、转码),每个任务包含:优先级(用户设置,1-100,1最高)、资源需求(CPU核数、GPU显存)、依赖关系(如分割任务完成后才能启动转码,用有向无环图DAG表示)。依赖图需检测循环依赖,若存在则报错。
  • 调度核心:优先级队列(最大堆,高优先级先执行)与DAG拓扑排序结合。调度器按优先级排序,同时检查依赖是否满足,满足则执行;若依赖未完成,任务重新入队。
  • 资源管理:GPU资源池化(NVIDIA MIG隔离),动态扩容策略:根据任务队列长度(如队列长度超过阈值)和GPU使用率(如使用率>80%),自动增加GPU节点(如增加1个GPU服务器,启动新的GPU资源池实例)。CPU资源按需分配,避免浪费。
  • 优先级反转解决方案:采用优先级继承协议(Priority Inheritance Protocol, PIP)。当高优先级任务等待低优先级任务时,低优先级任务继承高优先级任务的优先级,避免低优先级任务因高优先级任务占用资源而饥饿。具体实现:低优先级任务等待高优先级任务时,临时提升自身优先级,直到高优先级任务释放资源。
  • 类比:任务调度像“工厂生产调度”,任务依赖是“工序顺序”,优先级是“紧急订单”,资源是“机器设备”,调度器按优先级和工序安排生产,同时动态调整机器数量(资源池扩容),确保订单高效完成。

3) 【对比与适用场景】

调度策略定义特性使用场景注意点
优先级队列根据任务优先级排序的队列,高优先级先执行高优先级任务优先,低优先级可能饥饿紧急转码任务(如用户修改优先级)需结合时间片轮询避免饥饿
DAG调度基于任务依赖的有向无环图,按拓扑顺序执行保证依赖关系,避免循环依赖依赖关系复杂的视频处理(如分割→转码)需高效解析依赖图,支持大规模任务
资源限制调度结合资源需求的调度,根据资源可用性分配任务优先分配资源充足的节点GPU资源有限的系统需实时监控资源状态,动态调整
并行调度支持任务子任务并行执行利用多线程/进程提升效率视频分割与转码可并行需考虑数据依赖,避免竞争
优先级继承协议当高优先级任务等待低优先级任务时,低优先级任务继承高优先级优先级避免优先级反转高优先级任务依赖低优先级任务需实现临时优先级提升逻辑

4) 【示例】
伪代码(任务提交与调度流程):

class VideoTask:
    def __init__(self, task_id, priority, cpu_req, gpu_req, dependencies, subtasks):
        self.id = task_id
        self.priority = priority  # 1-100
        self.cpu_req = cpu_req
        self.gpu_req = gpu_req
        self.dependencies = dependencies  # 依赖任务ID列表
        self.subtasks = subtasks  # 子任务列表,如['split', 'encode']
        self.status = 'pending'

class TaskScheduler:
    def __init__(self):
        self.priority_queue = PriorityQueue()
        self.task_map = {}
        self.dependency_graph = {}
        self.gpu_pool = GPUResourcePool()

    def add_task(self, task):
        if self.detect_cycle(task.dependencies):
            raise ValueError("任务存在循环依赖")
        self.priority_queue.put((task.priority, task.id))
        self.task_map[task.id] = task
        self.update_dependency_graph(task)

    def detect_cycle(self, deps):
        visited = set()
        rec_stack = set()
        for dep in deps:
            if dep in rec_stack:
                return True
            if dep not in visited:
                if self.detect_cycle_helper(dep, visited, rec_stack):
                    return True
        return False

    def update_dependency_graph(self, task):
        for dep in task.dependencies:
            self.dependency_graph.setdefault(dep, []).append(task.id)

    def schedule(self):
        while not self.priority_queue.empty():
            _, task_id = self.priority_queue.get()
            task = self.task_map[task_id]
            if self.check_dependencies(task):
                if self.check_resources(task):
                    self.execute_task(task)
                else:
                    self.priority_queue.put((task.priority, task_id))
            else:
                self.priority_queue.put((task.priority, task_id))

    def check_dependencies(self, task):
        for dep in task.dependencies:
            if dep not in self.task_map or self.task_map[dep].status != 'completed':
                return False
        return True

    def check_resources(self, task):
        if not self.gpu_pool.has_capacity(task.gpu_req):
            self.gpu_pool.expand()
        return self.gpu_pool.has_capacity(task.gpu_req) and self.cpu_pool.has_capacity(task.cpu_req)

    def execute_task(self, task):
        task.status = 'running'
        with ThreadPoolExecutor() as executor:
            futures = []
            for subtask in task.subtasks:
                if subtask == 'split':
                    future = executor.submit(self.split_video, task.id)
                else:  # encode
                    future = executor.submit(self.encode_video, task.id)
                futures.append(future)
            for future in futures:
                future.result()
        task.status = 'completed'

    def split_video(self, task_id):
        try:
            result = video_split_service.split(task_id, timeout=30)
            self.task_map[task_id].subtasks['split'] = 'completed'
        except TimeoutError:
            self.task_map[task_id].status = 'timeout'
            self.retry_task(task_id)

    def encode_video(self, task_id):
        for _ in range(3):
            try:
                result = video_encode_service.encode(task_id)
                self.task_map[task_id].subtasks['encode'] = 'completed'
                break
            except Exception as e:
                if _ == 2:
                    self.task_map[task_id].status = 'failed'
                    self.retry_task(task_id)
                else:
                    time.sleep(5)

    def retry_task(self, task_id):
        task = self.task_map[task_id]
        task.priority -= 1
        self.priority_queue.put((task.priority, task_id))

class GPUResourcePool:
    def __init__(self):
        self.capacity = 0
        self.available = 0

    def has_capacity(self, req):
        return self.available >= req

    def expand(self):
        self.capacity += 1
        self.available += 1

# 示例调用
scheduler = TaskScheduler()
task = VideoTask(
    task_id='video1',
    priority=50,
    cpu_req=2,
    gpu_req=1,
    dependencies=['video0'],
    subtasks=['split', 'encode']
)
scheduler.add_task(task)
scheduler.schedule()

5) 【面试口播版答案】
面试官您好,针对万兴视频编辑的高并发视频转码任务,我设计了一个分布式调度系统。核心是构建任务模型(包含优先级、资源需求、依赖关系),通过优先级队列和DAG依赖图实现调度,资源池动态扩容,并解决优先级反转。比如用户提交任务后,系统检测依赖,若满足则启动并行处理(分割和转码),超时重试,同时根据任务队列长度自动增加GPU节点,确保资源充足。具体来说,任务按优先级排序,依赖关系用DAG保证顺序,GPU资源池根据任务队列长度动态扩容,优先级继承协议避免低优先级任务因高优先级任务占用资源而饥饿。这样既能支持数千并发任务,又能保证任务按优先级和依赖关系高效执行。

6) 【追问清单】

  • 问:如何处理任务优先级的动态调整?
    回答要点:用户可通过API实时修改任务优先级(如将优先级从50提升至90),调度器接收到更新后,立即调整优先级队列位置,确保高优先级任务优先执行。例如,用户修改后,任务重新入队,按新优先级排序。
  • 问:系统如何处理循环依赖?
    回答要点:在任务提交时,调度器会检测依赖图是否存在环。若检测到环,系统会报错并拒绝任务,避免死锁。例如,若视频分割任务依赖转码任务,则存在循环依赖,系统会提示用户修正依赖关系。
  • 问:资源池的动态扩容策略具体如何实现?
    回答要点:当GPU任务队列长度超过阈值(如100个任务)或GPU使用率超过80%时,系统自动增加GPU节点(如启动新的GPU服务器,分配新的GPU资源)。CPU资源按需分配,避免浪费。例如,队列长则增加GPU节点,确保资源充足。
  • 问:优先级继承协议如何具体实现?
    回答要点:当高优先级任务等待低优先级任务时,低优先级任务临时继承高优先级任务的优先级。例如,高优先级任务需要GPU资源,低优先级任务占用GPU,此时低优先级任务优先级提升,直到高优先级任务释放资源。
  • 问:系统如何监控任务状态和资源使用?
    回答要点:使用Prometheus收集任务状态(运行中、完成、失败、超时)、资源使用率(CPU/GPU),通过Grafana Dashboard展示,支持告警(如任务积压超过阈值、GPU资源不足)。

7) 【常见坑/雷区】

  • 忽略循环依赖检测:若依赖图中存在环,可能导致死锁,需在任务提交时检测并报错。
  • 资源分配公平性:若所有任务按需分配资源,可能导致低优先级任务占用过多资源,需结合优先级和时间片轮询。
  • 优先级队列饥饿问题:低优先级任务长期无法执行,需设置时间片或轮询机制(如优先级队列结合时间片轮询)。
  • 消息队列延迟影响调度:任务提交后,消息队列延迟导致调度延迟,需优化消息队列配置(如增加分区数、减少延迟)。
  • 分布式一致性:多个调度器节点间任务状态同步,未考虑数据一致性问题,需使用分布式锁或消息队列确保状态同步。
51mee.com致力于为招聘者提供最新、最全的招聘信息。AI智能解析岗位要求,聚合全网优质机会。
产品招聘中心面经会员专区简历解析Resume API
联系我们南京浅度求索科技有限公司admin@51mee.com
联系客服
51mee客服微信二维码 - 扫码添加客服获取帮助
© 2025 南京浅度求索科技有限公司. All rights reserved.
公安备案图标苏公网安备32010602012192号苏ICP备2025178433号-1