
1) 【一句话结论】
构建一个基于分布式DAG依赖解析与动态优先级队列的调度系统,通过GPU资源池动态扩容、优先级继承协议解决优先级反转,支持高并发视频转码任务按优先级、资源限制及任务依赖关系高效执行。
2) 【原理/概念讲解】
老师口吻解释核心概念:
3) 【对比与适用场景】
| 调度策略 | 定义 | 特性 | 使用场景 | 注意点 |
|---|---|---|---|---|
| 优先级队列 | 根据任务优先级排序的队列,高优先级先执行 | 高优先级任务优先,低优先级可能饥饿 | 紧急转码任务(如用户修改优先级) | 需结合时间片轮询避免饥饿 |
| DAG调度 | 基于任务依赖的有向无环图,按拓扑顺序执行 | 保证依赖关系,避免循环依赖 | 依赖关系复杂的视频处理(如分割→转码) | 需高效解析依赖图,支持大规模任务 |
| 资源限制调度 | 结合资源需求的调度,根据资源可用性分配任务 | 优先分配资源充足的节点 | GPU资源有限的系统 | 需实时监控资源状态,动态调整 |
| 并行调度 | 支持任务子任务并行执行 | 利用多线程/进程提升效率 | 视频分割与转码可并行 | 需考虑数据依赖,避免竞争 |
| 优先级继承协议 | 当高优先级任务等待低优先级任务时,低优先级任务继承高优先级优先级 | 避免优先级反转 | 高优先级任务依赖低优先级任务 | 需实现临时优先级提升逻辑 |
4) 【示例】
伪代码(任务提交与调度流程):
class VideoTask:
def __init__(self, task_id, priority, cpu_req, gpu_req, dependencies, subtasks):
self.id = task_id
self.priority = priority # 1-100
self.cpu_req = cpu_req
self.gpu_req = gpu_req
self.dependencies = dependencies # 依赖任务ID列表
self.subtasks = subtasks # 子任务列表,如['split', 'encode']
self.status = 'pending'
class TaskScheduler:
def __init__(self):
self.priority_queue = PriorityQueue()
self.task_map = {}
self.dependency_graph = {}
self.gpu_pool = GPUResourcePool()
def add_task(self, task):
if self.detect_cycle(task.dependencies):
raise ValueError("任务存在循环依赖")
self.priority_queue.put((task.priority, task.id))
self.task_map[task.id] = task
self.update_dependency_graph(task)
def detect_cycle(self, deps):
visited = set()
rec_stack = set()
for dep in deps:
if dep in rec_stack:
return True
if dep not in visited:
if self.detect_cycle_helper(dep, visited, rec_stack):
return True
return False
def update_dependency_graph(self, task):
for dep in task.dependencies:
self.dependency_graph.setdefault(dep, []).append(task.id)
def schedule(self):
while not self.priority_queue.empty():
_, task_id = self.priority_queue.get()
task = self.task_map[task_id]
if self.check_dependencies(task):
if self.check_resources(task):
self.execute_task(task)
else:
self.priority_queue.put((task.priority, task_id))
else:
self.priority_queue.put((task.priority, task_id))
def check_dependencies(self, task):
for dep in task.dependencies:
if dep not in self.task_map or self.task_map[dep].status != 'completed':
return False
return True
def check_resources(self, task):
if not self.gpu_pool.has_capacity(task.gpu_req):
self.gpu_pool.expand()
return self.gpu_pool.has_capacity(task.gpu_req) and self.cpu_pool.has_capacity(task.cpu_req)
def execute_task(self, task):
task.status = 'running'
with ThreadPoolExecutor() as executor:
futures = []
for subtask in task.subtasks:
if subtask == 'split':
future = executor.submit(self.split_video, task.id)
else: # encode
future = executor.submit(self.encode_video, task.id)
futures.append(future)
for future in futures:
future.result()
task.status = 'completed'
def split_video(self, task_id):
try:
result = video_split_service.split(task_id, timeout=30)
self.task_map[task_id].subtasks['split'] = 'completed'
except TimeoutError:
self.task_map[task_id].status = 'timeout'
self.retry_task(task_id)
def encode_video(self, task_id):
for _ in range(3):
try:
result = video_encode_service.encode(task_id)
self.task_map[task_id].subtasks['encode'] = 'completed'
break
except Exception as e:
if _ == 2:
self.task_map[task_id].status = 'failed'
self.retry_task(task_id)
else:
time.sleep(5)
def retry_task(self, task_id):
task = self.task_map[task_id]
task.priority -= 1
self.priority_queue.put((task.priority, task_id))
class GPUResourcePool:
def __init__(self):
self.capacity = 0
self.available = 0
def has_capacity(self, req):
return self.available >= req
def expand(self):
self.capacity += 1
self.available += 1
# 示例调用
scheduler = TaskScheduler()
task = VideoTask(
task_id='video1',
priority=50,
cpu_req=2,
gpu_req=1,
dependencies=['video0'],
subtasks=['split', 'encode']
)
scheduler.add_task(task)
scheduler.schedule()
5) 【面试口播版答案】
面试官您好,针对万兴视频编辑的高并发视频转码任务,我设计了一个分布式调度系统。核心是构建任务模型(包含优先级、资源需求、依赖关系),通过优先级队列和DAG依赖图实现调度,资源池动态扩容,并解决优先级反转。比如用户提交任务后,系统检测依赖,若满足则启动并行处理(分割和转码),超时重试,同时根据任务队列长度自动增加GPU节点,确保资源充足。具体来说,任务按优先级排序,依赖关系用DAG保证顺序,GPU资源池根据任务队列长度动态扩容,优先级继承协议避免低优先级任务因高优先级任务占用资源而饥饿。这样既能支持数千并发任务,又能保证任务按优先级和依赖关系高效执行。
6) 【追问清单】
7) 【常见坑/雷区】