Skip to content

ChenQiaoling00/HydroJobSche

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

HydroJobSche

HydroJobSche (Hydro Job Scheduler) 是一个支持在运行时暂停和恢复 PyTorch 深度学习训练任务的轻量级调度框架。通过在 operator 级别插入钩子(hooks),实现细粒度的训练控制,适用于 GPU 资源的抢占式调度场景。

🌟 核心特性

  • 细粒度控制:在每个 PyTorch operator(Conv2d、Linear 等)级别实现暂停/恢复
  • 信号驱动:使用 Unix 信号(SIGUSR1/SIGUSR2)进行进程间通信
  • 非侵入式设计:只需简单包装模型,无需修改训练代码
  • 自动钩子注册:递归地为模型的所有叶子模块注册前向/反向传播钩子
  • 轻量级实现:最小化运行时开销,适合生产环境

📦 安装

依赖要求

  • Python 3.7+
  • PyTorch 1.8+
  • CUDA(可选,用于 GPU 训练)

安装步骤

# 克隆仓库
cd HydroJobSche

# 安装依赖
pip install torch torchvision

# 或从源码安装
pip install -e .

🚀 快速开始

基本用法

只需三步即可为你的 PyTorch 训练任务添加暂停/恢复功能:

import torch
from HydroJobSche import od_execution_wrapper

# 1. 定义你的模型
model = YourModel()

# 2. 使用 od_execution_wrapper 包装模型
model = od_execution_wrapper(model)

# 3. 正常训练(无需修改训练代码)
for epoch in range(epochs):
    for data, target in train_loader:
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

完整训练示例

查看 train.py 获取完整的 MNIST 训练示例:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from HydroJobSche import od_execution_wrapper

# 定义模型
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

# 包装模型
model = Net().to(device)
model = od_execution_wrapper(model)

# 训练循环
optimizer = optim.Adadelta(model.parameters(), lr=1.0)
for epoch in range(epochs):
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

🎮 暂停和恢复控制

HydroJobSche 提供三种方式控制训练任务:

方式一:使用命令行工具

# 1. 获取训练进程的 PID
ps aux | grep train.py

# 2. 暂停训练
python client.py --pid <PID> --action pause

# 3. 恢复训练
python client.py --pid <PID> --action resume

方式二:在代码中控制

import os
from client import send_signal

# 获取当前进程 PID
pid = os.getpid()

# 暂停当前进程
send_signal(pid, "pause")

# 恢复当前进程
send_signal(pid, "resume")

方式三:使用系统信号

# 暂停训练进程
kill -SIGUSR1 <PID>

# 恢复训练进程
kill -SIGUSR2 <PID>

🔧 运行示例

本地运行

# 基本训练(CPU)
python train.py --batch-size 64 --epochs 10 --iterations 1000

# GPU 训练
python train.py --batch-size 128 --epochs 10 --lr 1.0

# 快速测试
python train.py --dry-run

集群运行(SLURM)

# 使用 SLURM 调度器
srun --preempt -p gpu_partition \
     --cpus-per-task=8 \
     --gres=gpu:1 \
     --nodes=1 \
     python train.py --iterations 200

# 使用 nsys 性能分析
nsys profile --force-overwrite=true -o profile_output python train.py

📚 API 文档

od_execution_wrapper(model)

包装 PyTorch 模型以支持暂停/恢复功能。

参数:

  • model (torch.nn.Module): 需要包装的 PyTorch 模型

返回:

  • Engine: 包装后的模型引擎,支持暂停/恢复操作

示例:

from HydroJobSche import od_execution_wrapper
model = od_execution_wrapper(model)

Engine 类

包装后的模型引擎,提供以下方法:

方法 说明
__call__(*args, **kwargs) 调用模型前向传播(与原模型用法相同)
forward(*args, **kwargs) 显式调用前向传播
pause() 暂停训练执行
resume() 恢复训练执行
train() 设置模型为训练模式

信号处理

信号 作用 触发方式
SIGUSR1 暂停训练 kill -SIGUSR1 <PID>
SIGUSR2 恢复训练 kill -SIGUSR2 <PID>

信号处理器在调用 od_execution_wrapper() 时自动注册。

🏗️ 架构设计

核心组件

1. OpHook 系统

HydroJobSche/ophooks/
├── _base_ophook.py         # BaseOpHook 抽象基类
└── _od_execution_ophook.py # ExecutionOpHook 实现
  • BaseOpHook: 定义钩子接口(前向/反向传播的前后钩子)
  • ExecutionOpHook: 实现暂停/恢复逻辑的具体钩子

2. 钩子注册机制

  • 递归遍历模型的所有子模块
  • 仅为包含参数或缓冲区的叶子模块注册钩子
  • 在前向和反向传播的前后各插入检查点

3. 暂停/恢复实现

class ExecutionOpHook:
    def __init__(self):
        self.state = 1  # 0=暂停, 1=运行
    
    def sleep_while_paused(self):
        while self.state == 0:
            time.sleep(0.001)  # 轮询等待,1ms 检查一次

执行流程

训练循环
  │
  ├── Forward Pass
  │   ├── [pre_fwd_exec]  ← 检查是否暂停
  │   ├── Operator 执行
  │   └── [post_fwd_exec] ← 检查是否暂停
  │
  ├── Loss 计算
  │
  ├── Backward Pass
  │   ├── [pre_bwd_exec]  ← 检查是否暂停
  │   ├── Operator 梯度计算
  │   └── [post_bwd_exec] ← 检查是否暂停
  │
  └── Optimizer 更新

⚠️ 性能开销

1. 运行时开销

开销类型 描述 影响
钩子调用 每个 operator 额外 4 次函数调用 轻微
状态检查 每个钩子检查暂停标志 极小
轮询开销 暂停时每 1ms 轮询一次 仅在暂停时有影响

2. 内存开销

  • 为每个叶子模块注册钩子函数
  • 钩子闭包保存对 ExecutionOpHook 的引用

3. 响应延迟

  • 暂停响应时间:最坏情况等待当前 operator 执行完成(通常在微秒到毫秒级别)
  • 恢复响应时间:立即恢复执行

性能建议

场景 建议
✅ Operator 数量中等的模型 推荐使用
✅ GPU 集群抢占式调度 推荐使用
✅ 多任务资源共享环境 推荐使用
⚠️ 大量小 operator 的模型 累积开销可能较大
⚠️ 性能极度敏感的场景 不推荐

📝 应用场景

✅ 适用场景

  1. GPU 集群的抢占式调度

    • 高优先级任务到来时快速释放 GPU 资源
    • 低优先级任务暂停,等待资源释放后恢复
  2. 资源共享环境

    • 多个训练任务动态调度 GPU
    • 避免资源冲突,提高利用率
  3. 调试和性能分析

    • 在特定点暂停训练进行检查
    • 配合 NVTX 进行性能分析
  4. 远程实验管理

    • 远程控制训练任务的执行
    • 灵活调整训练计划

❌ 不适用场景

  • 单机单卡训练(无需暂停/恢复功能)
  • 对性能极度敏感的生产环境
  • 超大规模分布式训练(开销可能过大)

🔍 调试和监控

启用详细日志

编辑 HydroJobSche/ophooks/_od_execution_ophook.py,取消注释打印语句:

def pre_fwd_exec(self, module: torch.nn.Module, *args):
    self.sleep_while_paused()
    print(f'FWD PRE {module.__class__.__name__}')  # 取消注释

性能分析(NVIDIA Nsys)

# 生成性能分析报告
nsys profile --force-overwrite=true -o profile_output python train.py

# 查看报告
nsys-ui profile_output.qdrep

NVTX 标记

代码中已集成 NVTX 标记,用于在 Nsys 中可视化暂停/恢复事件:

def nvtx_mark(title):
    import torch.cuda.nvtx as nvtx
    nvtx.mark(title)

# 使用示例
nvtx_mark("pause")
send_signal(pid, "pause")

📖 高级示例

定时暂停和恢复

使用线程实现定时控制:

import threading
import time
import os
from client import send_signal

def pause_after_delay(delay_seconds):
    time.sleep(delay_seconds)
    pid = os.getpid()
    send_signal(pid, "pause")
    print(f"[HydroJobSche] Training paused after {delay_seconds}s")

def resume_after_delay(delay_seconds):
    time.sleep(delay_seconds)
    pid = os.getpid()
    send_signal(pid, "resume")
    print(f"[HydroJobSche] Training resumed after {delay_seconds}s")

# 10 秒后暂停,15 秒后恢复
t1 = threading.Thread(target=pause_after_delay, args=(10,))
t2 = threading.Thread(target=resume_after_delay, args=(15,))

t1.start()
t2.start()

# 开始训练
model = od_execution_wrapper(model)
# ...

集成到调度器

import signal
import os

class TrainingScheduler:
    def __init__(self, training_pid):
        self.training_pid = training_pid
    
    def preempt(self):
        """抢占训练任务"""
        os.kill(self.training_pid, signal.SIGUSR1)
        print(f"Task {self.training_pid} preempted")
    
    def resume(self):
        """恢复训练任务"""
        os.kill(self.training_pid, signal.SIGUSR2)
        print(f"Task {self.training_pid} resumed")

# 使用示例
scheduler = TrainingScheduler(training_pid=12345)
scheduler.preempt()  # 暂停任务
# ... 执行高优先级任务 ...
scheduler.resume()   # 恢复任务

限制

  1. 使用python signal本身会带来额外开销,主要体现在延迟上,从而导致暂停恢复不够及时。解决方法是使用cpp实现对应的信号触发sleep/resume,参考:https://github.com/yzs981130/cudaw

  2. 只限制launch,所以已经launch的kernel无法暂停,只能等待跑完。解决方法是可以通过kernel实现中添加同步信号量解决。

📄 许可证

本项目采用 MIT 许可证。

🤝 贡献

欢迎提交 Issue 和 Pull Request!

📧 联系方式

如有问题或建议,请通过 Issue 联系。


HydroJobSche - 让 PyTorch 训练任务调度更灵活 🚀

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages