PyTorch使用多GPU并行训练及其原理和注意事项

2022-10-17 09:55:48

1. 常见的多GPU使用方法

在这里插入图片描述

  • 模型并行(model parallel) -> 大型网络(对速度没有提升)
    当模型需要的显卡很大,一张GPU的显存放不下时,使用这种方式就可以训练一个大型的网络
  • 数据并行(data parallel)-> 加速训练速度
    可以将整个模型放到一张GPU时,我们可以将每一个模型放到每一个GPU上,让它们同时进行训练(正向传播+反向传播)

2. 训练速度与GPU数量之间的关系

性能实测:数据来源霹雳巴拉WZ

  • PyTorch 1.7
  • CUDA:10.1
  • Model:ResNet-34
  • Dataset:flower_photos(非常小的一个数据集)
  • BatchSize:16
  • Optimizer:SGD
  • GPU:Tesla V100(上一代卡皇)

在这里插入图片描述

从上图我们可以看到,训练速度和GPU数量并不是简单的倍乘关系。随着GPU数量的增加,提速效果越来越差,这时因为不同GPU之间需要进行通信,会有性能的损耗。

3. 重点

  1. 数据集如何在不同设备间分配
  2. 误差梯度如何在不同设备之间通信
  3. Batch Normalization如何在不同设备之间同步

3.1 数据集分配

3.2 误差通信

在使用多GPU训练时,每张GPU在进行完一个step后会产生梯度,我们需要将所有GPU的梯度求平均(并不是每张GPU各自学各自的,那样就没有意义了),这样才能将每张GPU的训练结果结合在一起。

3.3 BN同步

在这里插入图片描述

假设BS=2,feature1和feature2是网络中的某一层经过卷积操作后得到特征图,因为BS=2,所以有两个feature。在正向传播过程中,BN会求特征矩阵每一个channel的均值和方差,再对每个通道上的数据进行“减均值除标准差”的操作,这样就得到经过BN的特征图了。


在这里插入图片描述

当我们使用多GPU训练时,每个GPU都会计算各自的均值和方差。这里我们同样假设每个GPU上数据的BS=2,那么每个BN层求的均值 μ i \mu_iμi和方差 σ i 2 \sigma^2_iσi2都是针对两个特征图而言的。

之前我们说过,BS越大,BN求的均值和方差越接近全体样本,准确率越高。

所以如果我们使用多GPU训练,我们就应该考虑“我们是不是应该去求每一个BN层在所有设备上的均值和方差”。这样我们所求得的均值和方差是更加有意义的。

如果我们不考虑多GPU之间BN的参数关系,那么我们所求得的BN层的均值和方差都是针对输入的两个样本(BS=2)求解的。

如果我们考虑到另外的设备呢?

GPU1的BN层得到两个特征图1和2,GPU2的BN层得到两个特征图3和4。如果我们求的BN参数是特征图12+34,那么我们的BN就变相等于在BS=4的情况下求得的均值和方差。这样对我们最终训练的结果是有一定帮助的。

霹雳巴拉WZ说,如果不使用同步的BN(即普通的nn.BatchNorm),那么得到的结果和使用单GPU的结果基本上是一致的。
当然,使用不同BN的多GPU也对模型的训练速度有很大的帮助
如果使用了同步的BN后,最终结果一般会有将近一个点的提升
所有同步的BN确实的有一定的作用

如果你的GPU显存很大,本来在一张GPU上就可以很大的BS,那么使用同步的BN也不会有很大的作用。

同步BN主要用在:网络比较大,一张GPU它的BS不能设置很大的情况下,同步的BN对准确率的帮助比较大。

注意:使用了具有同步BN的方法,多GPU的并行速度会下降。可能会降低30%的速度。

  • 想要更快的速度 -> 不使用同步的BN
  • 更高的精度 -> 使用同步的BN

4. PyTorch实现多卡并行计算的方式

分为两种:

  1. DataParallel
    PyTorch官方很久之前给出的一种方案
  2. DistributedDataParallel
    更新一代的多卡训练方法

DDP不仅仅局限于单机多卡的情况,还适用于多级多卡的场景。

  1. PyTorch关于DP的文档
  2. [PyTorch关于DDP的文档](DistributedDataParallel — PyTorch 1.11.0 documentation)

在这里插入图片描述

  • DP是一个单进程、多线程并且仅仅只能工作在单一的设备中(单节点,不适用于多机的情况)
  • DDP是一个多进程的,可以工作在单机或者多机的场景中
  • DP通常要慢于DDP(即便在单一的设备上)

这里所说的单机和多机并不是单GPU和多GPU,而是单个服务器和多个服务器的意思


DP和DDP都可以用在单机的情况下,单DDP可以用在多机的情况下,而且即便在单机下(一台机器有多个GPU),DDP的速度也要比DP的速度快。

[PyTorch关于单机多卡和多级多卡的训练教程](PyTorch Distributed Overview — PyTorch Tutorials 1.11.0+cu102 documentation)

5. PyTorch中使用多GPU训练的常用启动方式

  1. torch.distributed.launch:代码量少,启动速度快

    python-m torch.distributed.launch# -m: run library module as a script# --help: 可以通过`torch.distributed.launch --help`这样真的方式去查看使用方法

    在PyTorch官方实现的Faster R-CNN源码中,多GPU训练就是使用distributed.launch进行启动的,因此后面将的主要基于distributed.launch来启动

  2. torch.multiprocessing:代码量更多,但拥有更好的控制和灵活性


注意事项:

  • 在使用torch.distributed.launch的方法进行训练。一旦训练开始后,手动强制终止训练程序(ctrl + c),会有小概率出现进程没有杀掉的情况。
    此时程序还会占用GPU的显存以及资源。所以需要我们将这些进程kill -9

5.1 单机单卡的训练脚本

5.1.1 main

"""
    单机单卡的训练脚本 —— 训练ResNet34/101
"""import osimport mathimport argparseimport torchimport torch.optimas optimfrom torch.utils.tensorboardimport SummaryWriterfrom torchvisionimport transformsimport torch.optim.lr_scheduleras lr_schedulerfrom modelimport resnet34, resnet101from my_datasetimport MyDataSetfrom utilsimport read_split_datafrom multi_train_utils.train_eval_utilsimport train_one_epoch, evaluatedefmain(args):# 检查机器的配置(是否有GPU,没有GPU则为CPU)
    device= torch.device(args.deviceif torch.cuda.is_available()else"cpu")print(args)# 打印传入的参数print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
    tb_writer= SummaryWriter()# 创建Tensorborad对象if os.path.exists("./weights")isFalse:# 检查保存权值文件的文件夹是否存在,不存在则创建该文件夹
        os.makedirs("./weights")

    train_info, val_info, num_classes= read_split_data(args.data_path)
    train_images_path, train_images_label= train_info
    val_images_path, val_images_label= val_info# check num_classesassert args.num_classes== num_classes,"dataset num_classes: {}, input {}".format(args.num_classes,
                                                                                       num_classes)

    data_transform={"train": transforms.Compose([transforms.RandomResizedCrop(224),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])]),"val": transforms.Compose([transforms.Resize(256),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])])}# 实例化训练数据集
    train_data_set= MyDataSet(images_path=train_images_path,
                               images_class=train_images_label,
                               transform=data_transform["train"])# 实例化验证数据集
    val_data_set= MyDataSet(images_path=val_images_path,
                             images_class=val_images_label,
                             transform=data_transform["val"])

    batch_size= args.batch_size# 根据BS的数量和训练设备CPU核心数来定义num_worker的大小
    nw=min([os.cpu_count(), batch_sizeif batch_size>1else0,8])# number of workersprint('Using {} dataloader workers every process'.format(nw))# 读取训练数据
    train_loader= torch.utils.data.DataLoader(train_data_set,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               pin_memory=True,
                                               num_workers=nw,
                                               collate_fn=train_data_set.collate_fn)# 读取验证数据
    val_loader= torch.utils.data.DataLoader(val_data_set,
                                             batch_size=batch_size,
                                             shuffle=False,
                                             pin_memory=True,
                                             num_workers=nw,
                                             collate_fn=val_data_set.collate_fn)# 定义模型对象并添加到所属设备中
    model= resnet34(num_classes=args.num_classes).to(device)# 如果存在预训练权重则载入if args.weights!="":if os.path.exists(args.weights):# 先使用torch.load加载指定文件中的权重
            weights_dict= torch.load(args.weights, map_location=device)# 只加载key和value元素相等的键值对
            load_weights_dict={k: vfor k, vin weights_dict.items()if model.state_dict()[k].numel()== v.numel()}# 1. 模型加载字典(不严格加载);2. 打印print(model.load_state_dict(load_weights_dict, strict=False))else:raise FileNotFoundError("not found weights file: {}".format(args.weights))# 是否冻结权重if args.freeze_layers:for name, parain model.named_parameters():# name: 层的名字; para: 对应的参数# 除最后的全连接层外,其他权重全部冻结if"fc"notin name:# 除了fc层外,所有层的参数都没有梯度(不进行反向传播,即不进行参数更新)
                para.requires_grad_(False)# 将带有梯度的参数传入pg这个list中
    pg=[pfor pin model.parameters()if p.requires_grad]# 定义参数优化器,第一个参数即为需要更新的参数,也就是上一行pg列表中的结果
    optimizer= optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)# 定义学习率变化函数,参考:Scheduler https://arxiv.org/pdf/1812.01187.pdf -> 其实就是一个余弦函数[0, pi]
    lf=lambda x:((1+ math.cos(x* math.pi/ args.epochs))/2)*(1- args.lrf)+ args.lrf# cosine# scheduler即为调整学习率变化的对象,将优化器和学习率变化曲线传给它就可以实现学习率的规律性变化
    scheduler= lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)# 开始迭代训练for epochinrange(args.epochs):# train"""
            可以看到,这里的train阶段只是返回平均loss,没有预测概率
        """
        mean_loss= train_one_epoch(model=model,
                                    optimizer=optimizer,
                                    data_loader=train_loader,
                                    device=device,
                                    epoch=epoch)# epoch进行了一个,学习率变化器需要更新一下(使得optimizer中的学习率进行变化)
        scheduler.step()# validate
        sum_num= evaluate(model=model,
                           data_loader=val_loader,
                           device=device)
        acc= sum_num/len(val_data_set)# top-1准确率 = 预测正确数 / 验证样本数# 打印该epoch下的准确率print("[epoch {}] accuracy: {}".format(epoch,round(acc,3)))# 将train和validation得到的数据添加到tensorboard中# optimizer.param_groups[0]["lr"]即为对应epoch的学习率
        tags=["loss","accuracy","learning_rate"]
        tb_writer.add_scalar(tags[0], mean_loss, epoch)
        tb_writer.add_scalar(tags[1], acc, epoch)
        tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)# 保存对应epoch的模型(这里没有选择最佳模型,可以通过acc达到只保留准确率高的权值文件)
        torch.save(model.state_dict(),"./weights/model-{}.pth".format(epoch))if __name__=='__main__':
    parser= argparse.ArgumentParser()
    parser.add_argument('--num_classes',type=int, default=5)
    parser.add_argument('--epochs',type=int, default=30)
    parser.add_argument('--batch-size',type=int, default=16)
    parser.add_argument('--lr',type=float, default=0.001)# lrf为倍率因子,即学习率最终降低到初始学习率lr的多少倍。# 最终学习率为lr * lrf
    parser.add_argument('--lrf',type=float, default=0.1)# 数据集所在根目录# http://download.tensorflow.org/example_images/flower_photos.tgz
    parser.add_argument('--data-path',type=str,
                        default="/home/w180662/my_project/my_github/data_set/flower_data/flower_photos")# resnet34 官方权重下载地址# https://download.pytorch.org/models/resnet34-333f7ec4.pth
    parser.add_argument('--weights',type=str, default='resNet34.pth',help='initial weights path')# 为""表示不使用预训练模型
    parser.add_argument('--freeze-layers',type=bool, default=False)
    parser.add_argument('--device', default='cuda',help='device id (i.e. 0 or 0,1 or cpu)')

    opt= parser.parse_args()

    main(opt)

5.1.2 train_eval_utils

import sysfrom tqdmimport tqdmimport torchfrom multi_train_utils.distributed_utilsimport reduce_value, is_main_processdeftrain_one_epoch(model, optimizer, data_loader, device, epoch):
    model.train()# 声明模型的状态
    loss_function= torch.nn.CrossEntropyLoss()# 定义损失函数
    mean_loss= torch.zeros(1).to(device)# 生成shape为[1, ]的全零矩阵,并添加到对应的设备中 -> 用于存储后续计算得到mean_loss# 清空优化器中的梯度残留
    optimizer.zero_grad()# 在进程0中打印训练进度if is_main_process():# 判断是否为主进程# 使用tqdm库包装一下data_loader这个变量,一会儿在遍历data_loader时就会打印一个进度条
        data_loader= tqdm(data_loader,file=sys.stdout)# 迭代data_loader,获取step和对应的数据"""
        在data_loader中就定义了BS的大小, 所以这里step就是$step = 所有图片的数量 / BS$
        step即为BS的数量, 每走过一次step即为处理了BS张图片
        
        第1个step对应第1个bs
        第2个step对应第2个bs
        第3个step对应第3个bs
        ...
    """for step, datainenumerate(data_loader):
        images, labels= data# data包含两部分数据:1. 预处理后的图片;2. 对应的真实标签# 通过正向传播获取图片的预测结果
        pred= model(images.to(device))# 根据预测结果与真实标签计算loss
        loss= loss_function(pred, labels.to(device))# 对loss进行反向传播
        loss.backward()# 对loss进行求平均处理"""
            reduce_value:
                def reduce_value(value, average=True):
                    world_size = get_world_size()
                    if world_size < 2:  # 单GPU的情况
                        return value  # 原值返回

                    with torch.no_grad():
                        dist.all_reduce(value)
                        if average:
                            value /= world_size  # 需要除以GPU数量后再返回

                        return value
        """
        loss= reduce_value(loss, average=True)# 对历史损失求平均
        mean_loss=(mean_loss* step+ loss.detach())/(step+1)# update mean losses# 在进程0中打印平均lossif is_main_process():# 判断是否为主进程
            data_loader.desc="[epoch {}] mean loss {}".format(epoch,round(mean_loss.item(),3))# 判断loss是否为有限数据(不能是infty)ifnot torch.isfinite(loss):print('WARNING: non-finite loss, ending training ', loss)
            sys.exit(1)# 如果loss为无穷,则退出训练

        optimizer.step()# 参数优化器进行参数更新
        optimizer.zero_grad()# 参数优化器更新完参数后,需要将梯度清空# 等待所有进程计算完毕if device!= torch.device("cpu"):
        torch.cuda.synchronize(device)# 等待CUDA设备上所有流中的所有内核完成。# 返回计算求得的平均lossreturn mean_loss.item()"""
    这里使用了@torch.no_grad()这个装饰器对该方法进行修改,也可以使用 with torch.no_grad: 这个上下文管理器
"""@torch.no_grad()defevaluate(model, data_loader, device):
    model.eval()# 声明模型状态 -> 1. 关闭BN; 2. Dropout# 用于存储预测正确的样本个数
    sum_num= torch.zeros(1).to(device)# 在进程0中打印验证进度if is_main_process():
        data_loader= tqdm(data_loader,file=sys.stdout)# 在主进程中包装dataloaderfor step, datainenumerate(data_loader):
        images, labels= data
        pred= model(images.to(device))# 获取预测概率
        pred= torch.max(pred, dim=1)[1]# 获取预测概率最的max"""
        torch.eq(tensor, tensor/value)
        对两个张量Tensor进行逐元素的比较,若相同位置的两个元素相同,则返回True;若不同,返回False。
        """
        sum_num+= torch.eq(pred, labels.to(device)).sum()# 计算所有预测正确的数量# 等待所有进程计算完毕if device!= torch.device("cpu"):
        torch.cuda.synchronize(device)

    sum_num= reduce_value(sum_num, average=False)# 统计所有预测正确的数量return sum_num.item()

5.2 分布式训练

5.2.1 main

from cgiimport testimport osimport mathimport tempfileimport argparseimport torchimport torch.optimas optimimport torch.optim.lr_scheduleras lr_schedulerfrom torch.utils.tensorboardimport SummaryWriterfrom torchvisionimport transformsfrom modelimport resnet34from my_datasetimport MyDataSetfrom utilsimport read_split_data, plot_data_loader_imagefrom multi_train_utils.distributed_utilsimport init_distributed_mode, dist, cleanupfrom multi_train_utils.train_eval_utilsimport train_one_epoch, evaluatedefmain(args):if torch.cuda.is_available()isFalse:# 没有GPU设备会直接报错raise EnvironmentError("not find GPU device for training.")# 初始化各进程环境 -> args容器中多了几个参数:1. args.rank; 2. args.world_size; 3. args.gpu
    init_distributed_mode(args=args)# 将args中新增的DDP参数赋值到全局变量中
    rank= args.rank
    device= torch.device(args.device)
    batch_size= args.batch_size
    weights_path= args.weights"""
        当我们在使用多GPU并行训练时,梯度一般是将多块GPU的梯度求平均。
            在原本的单卡上,我们的每学习一个step,梯度前进1m(这里是为了方便理解),如果学习两部,则梯度前进2m
            假设我们的GPU数量为2。那么我们看起来是学习了一步,但因为有2块GPU,所以是两块一起运算,那么就是一次性
            算了两个step,但更新时我们对梯度进行了平均,所以这2个step的值只更新了依次,意味着梯度只前进了1m
            学习率变相地降低了,所以我们需要扩大学习率
    """
    args.lr*= args.world_size# 学习率要根据并行GPU的数量进行倍增 -> 这里是简单粗暴的增大学习率"""
        使用DDP时,一般的写入操作、打印操作都是放在第一个进程(主进程)中操作的(没有必要在每一个进程中执行相同的操作)
    """if rank==0:# 在第一个进程中打印信息,并实例化tensorboard(只在第一个进程中打印参数)print(args)print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
        tb_writer= SummaryWriter()if os.path.exists("./weights")isFalse:
            os.makedirs("./weights")

    train_info, val_info, num_classes= read_split_data(args.data_path)
    train_images_path, train_images_label= train_info
    val_images_path
  • 作者:Le0v1n
  • 原文链接:https://blog.csdn.net/weixin_44878336/article/details/125412625
    更新时间:2022-10-17 09:55:48