PyTorch多机多GPU并行训练方法及问题整理

2022-10-04 14:07:44

       在单机多gpu可以满足的情况下, 绝对不建议使用多机多gpu进行训练, 多台机器之间传输数据的时间非常慢, 如果机器只是千兆网卡, 再加上别的一些损耗, 网络的传输速度跟不上, 会导致训练速度实际很慢。

1 初始化

       初始化操作一般在程序刚开始的时候进行。

       在进行多机多gpu运行训练的时候,需要先使用torch.distributed.init_process_group()进行初始化。torch.distributed.init_process_group()包含四个常用的参数

backend:后端,实际上是多个机器之间交换数据的协议

init_method:机器之间交换数据,需要指定一个主节点,而这个参数就是指定主节点的

world_size:介绍都是说是进程,实际就是机器的个数,例如两台机器一起训练的话,设置数值为2

rank:区分主节点和从节点,主节点为0,剩余的为了1-(N-1),N为要使用的机器的数量,即                   world_size。

1.1 初始化backend

       pytorch的官方教程提供的后端有:gloo、mpi、nccl,详细可参考PyTorch多GPU并行训练方法及问题

       根据官网的介绍, 如果是使用cpu的分布式计算, 建议使用gloo, 因为表中可以看到gloo对cpu的支持是最好的, 然后如果使用gpu进行分布式计算, 建议使用nccl, 实际测试中我也感觉到, 当使用gpu的时候,nccl的效率是高于gloo的。 根据博客和官网的态度, 好像都不怎么推荐在多gpu的时候使用mpi。

       对于后端选择好了之后,我们需要设置一下网络接口,因为多个主机之间肯定是使用网络进行交换,那肯定就涉及到ip之类的,对于 nccl 和 gloo 一般会自己寻找网络接口,但是某些时候,可能会需要自己手动设置。设置的方法也比较简单,在Python的代码中,使用了下面的代码进行设置就行:

import os
#以下二选一,第一个是使用gloo后端需要设置的,第二个是使用nccl需要设置的

os.environ['GLOO_SOCKET_IFNAME'] = 'eth0'

os.environ['NCCL_SOCKET_IFNAME'] = 'eth0'

       我们如何知道自己的网络接口,打开命令行,然后输入 ifconfig ,然后找到那个带自己ip地址的就是了,一般就是em0 ,eth0 ,esp2s0 之类的,具体的根据自己的填写。如果没有装 ipconfig ,输入命令会报错,根据报错提示安装一个就行了。

1.2 初始化init_method

       初始化init_method的方法有两种, 一种是使用TCP进行初始化, 另外一种是使用共享文件系统进行初始化

1.2.1 使用TCP初始化

代码如下:

import torch.distributed as dist

dist.init_process_group(backend,init_method='tcp://10.1.1.20:23456',
                        rank = rank,world_size = world_size)

       注意这里的使用格式为tcp://ip:端口号,首先 ip 地址是你的主节点的 ip 地址,也就是 rank 参数为0的那个主机的 ip 地址,然后再选择一个空闲的端口号,这样就可以初始化 init_method 了。

1.2.2 使用共享文件系统初始化

       有些人并不推荐这种方法, 因为这个方法好像比TCP初始化要没法, 搞不好和你硬盘的格式还有关系, 特别是window的硬盘格式和Ubuntu的还不一样, 代码如下:

import torch.distributed as dist

dist.init_process_group(backend,init_method = 'file:///mnt/nfs/sharedfile',
                        rank=rank,world_size=world_size)

       根据官网介绍, 要注意提供的共享文件一开始应该是不存在的, 但是这个方法又不会在自己执行结束删除文件, 所以下次再进行初始化的时候, 需要手动删除上次的文件, 所以比较麻烦, 而且官网给了一堆警告, 再次说明了这个方法不如TCP初始化的简单。

1.3 初始化rank 和 world_size

       需要确保不同机器的 rank 值不同,但主机的 rank 必须为0,而且使用 init_method 的 ip 一定是 rank 为 0 的主机,其次world_size是你的主机数量, 你不能随便设置这个数值, 你的参与训练的主机数量达不到world_size的设置值时, 代码是不会执行的。

1.4 初始化中一些需要注意的地方

       首先是代码的统一性,所有的节点上面的代码,建议完全一样,不然有可能会出现一些问题,其次,这些初始化的参数强烈建议通过 argparse 模块(命令行参数的形式)输入,不建议写死在代码中,也不建议使用pycharm 之类的 IDE 进行代码的运行,强烈建议使用命令行直接运行。

其次是运行代码的命令方面的问题,例如使用下面的命令运行代码distributed.py:

python distributed.py -bk nccl -im tcp://10.10.10.1:12345 -rn 1 -ws 2

       一定要注意的是,只能修改 rank 的值,其他的值一律不能修改,否则程序就卡死了,初始化到这里也就结束了。

2 数据的处理——DataLoader

       其实数据的处理和正常的代码的数据处理非常类似,但是因为多机多卡涉及到了效率问题,所以这里才会使用 torch.utils.data.distributed.DistributedSampler来规避数据传输的问题,首先看下面的代码:

print("Initialize Dataloaders...")
# Define the transform for the data.Notice,we must resize to 224x224 with this dataset and # model

transform = transforms.Compose(
      [transforms.Resize(224),
       transforms.ToTensor(),
       transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))])


# Initialize Datasets. STL10 will automatically download if not present
trainset = datasets.STL10(root='./data', split='train', download=True, transform=transform)
valset = datasets.STL10(root='./data', split='test', download=True, transform=transform)


#Create DistributedSampler to handle distributing the dataset across nodes when training
#This can only be called after torch.distributed.init_process_group is called
#这一句就是和平时使用有点不一样的地方
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)

# Create the Dataloaders to feed data to the training and validation steps
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=workers, pin_memory=False)

       单独看这段代码,和平时写的类似,唯一不一样的其实就是这里先将 trainset 送到了DistributedSampler 中创造了一个train_sampler ,然后在构造train_loader 的时候,参数传入了一个sampler = train_sampler 。使用这些的意图是,让不同节点的机器加载自己本地的数据进行训练,也就是说进行多机多卡训练的时候,不再是从主节点分发数据到各个从节点,而是各个从节点自己从自己的硬盘上读取数据。

       当然了,如果直接让各个节点自己读取自己的数据,特别是在训练的时候,经常是要打乱数据集进行训练的,这样就会导致不同的节点加载的数据混乱,所以这个时候使用DistributedSampler 来创造一个sampler 提供给DataLoader,sampler的作用自定义一个数据的编号,然后让DataLoader按照这个编号来提取数据放入到模型中训练,其中sampler 参数和shuffle参数不能同时指定,如果这个时候还想要可以随机的输入数据,我们可以在DistributedSampler 中指定shuffle参数,具体的可以参考官网的 api,拉到最后就是DistributedSampler

3 模型的处理

注意要提前把模型加载到gpu ,然后才可以加载到DistributedDataParallel

model = model.cuda()

model = nn.parallel.DistributedDataParallel(model)

4 模型的保存于加载

这里引用pytorch官方教程的一段代码:

def demo_checkpoint(rank,world_size):
    setup(rank,world_size)

    #setup devices for this process,rank 1 uses GPUs [0,1,2,3] and 
    #rank 2 uses GPUs [4,5,6,7]
    n = torch.cuda.device_count() // world_size
    device_ids = list(range(rank*n,(rank+1)*n))


    model = ToyModel().to(device_ids[0])
    # output_device defaults to device_ids[0]
    ddp_model = DDP(model, device_ids=device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    rank0_devices = [x - rank * len(device_ids) for x in device_ids]
    device_pairs = zip(rank0_devices, device_ids)
    map_location = {'cuda:%d' % x: 'cuda:%d' % y for x, y in device_pairs}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Use a barrier() to make sure that all processes have finished reading the
    # checkpoint
    dist.barrier()

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

        看上面的代码,最重要的实际是这句 dist.barrier(), 这个是来自 torch.distributed.barrier(),根据pytorch的官网介绍,这个函数的功能是同步所有的进程,直到整组(也就是所有节点的所有GPU)到达这个函数的时候,才会执行后面的代码,看上面的代码,可以看到,在保存模型的时候,是只找 rank 为0的点保存模型,然后再加载模型的时候,首先得让所有的节点同步一下,然后给所有的节点加载上模型,然后在进行下一步的时候,还要同步一下,保证所有的节点都读完了模型,官网说不这样做会导致一些问题。

       保存模型的时候,是保存哪些节点上面的模型,pytorch推荐的是 rank = 0 的节点,然后也有人会保存所有节点的模型,然后进行计算。

  • 作者:L_bloomer
  • 原文链接:https://blog.csdn.net/qq_44289607/article/details/123332574
    更新时间:2022-10-04 14:07:44