Pytorch Lightning框架:使用笔记【LightningModule、LightningDataModule、Trainer、ModelCheckpoint】

2022-10-25 10:46:59

在这里插入图片描述
pytorch是有缺陷的,例如要用半精度训练、BatchNorm参数同步、单机多卡训练,则要安排一下Apex,Apex安装也是很烦啊,我个人经历是各种报错,安装好了程序还是各种报错,而pl则不同,这些全部都安排,而且只要设置一下参数就可以了。另外,根据我训练的模型,4张卡的训练速度大概提升3倍,训练效果(图像生成)好很多,真香。另外,还有一个特色,就是你的超参数全部保存到模型中,如果你要调巨多参数,那就不需要再对每个训练的模型进行参数标记了,而且恢复模型时可以直接恢复超参数,可以大大减小代码量和工作量,这点真是太香了。

Pytorch Lightning官方手册
Pytorch Lightning源码:GitHub地址
Pytorch Lightning使用案例:Pytorch-Lightning-Template项目 GitHub地址

一、Pytorch Lightning 的流程

Pytorch Lightning框架应用的流程很简单,生产流水线,有一个固定的顺序:

  • 初始化 definit(self)
  • 训练training_step(self, batch, batch_idx)
  • 校验validation_step(self, batch, batch_idx)
  • 测试 test_step(self, batch, batch_idx)

就完事了,总统是实现这三个函数的重写。

当然,除了这三个主要的,还有一些其他的函数,为了方便我们实现其他的一些功能,因此更为完整的流程是:

  • 在training_step 后面都紧跟着其相应的 training_step_end(self,batch_parts)和training_epoch_end(self, training_step_outputs) 函数;
  • validation_step 后面都紧跟着其相应的 validation_step_end(self,batch_parts)和validation_epoch_end(self, training_step_outputs) 函数;
  • test_step 后面都紧跟着其相应的 test_step_end(self,batch_parts)和 test_epoch_end(self, training_step_outputs) 函数;

这里以训练为例:

def training_step(self, batch, batch_idx):
    x, y = batch
    y_hat = self.model(x)
    loss = F.cross_entropy(y_hat, y)
    pred = ...
    return {'loss': loss, 'pred': pred}

def training_step_end(self, batch_parts):
    '''
    当gpus=0 or 1时,这里的batch_parts即为traing_step的返回值(已验证)
    当gpus>1时,这里的batch_parts为list,list中每个为training_step返回值,list[i]为i号gpu的返回值(这里未验证)
    '''
    gpu_0_prediction = batch_parts[0]['pred']
    gpu_1_prediction = batch_parts[1]['pred']

    # do something with both outputs
    return (batch_parts[0]['loss'] + batch_parts[1]['loss']) / 2

def training_epoch_end(self, training_step_outputs):
    '''
    当gpu=0 or 1时,training_step_outputs为list,长度为steps的数量(不包括validation的步数,当你训练时,你会发现返回list<训练时的steps数,这是因为训练时显示的steps数据还包括了validation的,若将limit_val_batches=0.,即关闭validation,则显示的steps会与training_step_outputs的长度相同)。list中的每个值为字典类型,字典中会存有`training_step_end()`返回的键值,键名为`training_step()`函数返回的变量名,另外还有该值是在哪台设备上(哪张GPU上),例如{device='cuda:0'}
    '''
    for out in training_step_outputs:
       # do something with preds

1、Train

训练主要是重写def training_setp(self, batch, batch_idx)函数,并返回要反向传播的loss即可,其中batch 即为从 train_dataloader 采样的一个batch的数据,batch_idx即为目前batch的索引。

def training_setp(self, batch, batch_idx):
    image, label = batch
    pred = self.forward(iamge)
    loss = ...
    # 一定要返回loss
    return loss

2、Validation(设置校验的频率)

2.1 每训练n个epochs 校验一次

默认为每1个epoch校验一次,即自动调用validation_step()函数:check_val_every_n_epoch

trainer = Trainer(check_val_every_n_epoch=1)

2.2 单个epoch内校验频率

当一个epoch 比较大时,就需要在单个epoch 内进行多次校验,这时就需要对校验的调动频率进行修改, 传入val_check_interval的参数为float型时表示百分比,为int时表示batch:

# 每训练单个epoch的 25% 调用校验函数一次,注意:要传入float型数
trainer = Trainer(val_check_interval=0.25)
# 当然也可以是单个epoch训练完多少个batch后调用一次校验函数,但是一定是传入int型
trainer = Trainer(val_check_interval=100) # 每训练100个batch校验一次

校验和训练是一样的,重写def validation_step(self, batch, batch_idx)函数,不需要返回值:

def validation_step(self, batch, batch_idx):
    image, label = batch
    pred = self.forward(iamge)
    loss = ...
    # 标记该loss,用于保存模型时监控该量
    self.log('val_loss', loss)

3、test

pytoch_lightning框架中,test 在训练过程中是不调用的,也就是说是不相关,在训练过程中只进行trainingvalidation,因此如果需要在训练过中保存validation的一些信息,就要放到validation中。

关于测试,测试是在训练完成之后的,因此这里假设已经训练完成:

# 获取恢复了权重和超参数等的模型
model = MODEL.load_from_checkpoint(checkpoint_path='my_model_path/heiheihei.ckpt')
# 修改测试时需要的参数,例如预测的步数等
model.pred_step = 1000
# 定义trainer, 其中limit_test_batches表示取测试集中的0.05的数据来做测试
trainer = pl.Trainer(gpus=1, precision=16, limit_test_batches=0.05)
# 测试,自动调用test_step(), 其中dm为数据集,放在下面讲
trainer.test(model=dck, datamodule=dm)

二、PL 的数据集

数据集有两种实现方法:

  • 直接调用第三方公开数据集(如:MNIST等数据集)
  • 自定义数据集(自己去继承torch.utils.data.dataset.Dataset,自定义类)

1、直接在Module中调用DataLoader

直接实现是指在 Model 中重写def train_dataloader(self)等函数来返回dataloader

1.1 使用现有的公平数据集

可以用现有的,例如MNIST等数据集

from torch.utils.dataimport DataLoader, random_splitimport pytorch_lightningas plclassMyExampleModel(pl.LightningModule):def__init__(self, args):super().__init__()
        dataset= MNIST(os.getcwd(), download=True, transform=transforms.ToTensor())
        train_dataset, val_dataset, test_dataset= random_split(dataset,[50000,5000,5000])

        self.train_dataset= train_dataset
        self.val_dataset= val_dataset
        self.test_dataset= test_dataset...deftrain_dataloader(self):return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=False, num_workers=0)defval_dataloader(self):return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False)deftest_dataloader(self):return DataLoader(self.test_dataset, batch_size=1, shuffle=True)

这样就完成了数据集和dataloader的编程了

1.2 自定义dataset

自己完成dataset的编写

# -*- coding: utf-8 -*-'''
@Description: Define the format of data used in the model.
'''import sysimport pathlibimport torchfrom torch.utils.dataimport Datasetfrom utilsimport sort_batch_by_len, source2ids

abs_path= pathlib.Path(__file__).parent.absolute()
sys.path.append(sys.path.append(abs_path))classSampleDataset(Dataset):"""
    The class represents a sample set for training.
    """def__init__(self, data_pairs, vocab):
        self.src_texts=[data_pair[0]for data_pairin data_pairs]
        self.tgt_texts=[data_pair[1]for data_pairin data_pairs]
        self.vocab= vocab
        self._len=len(data_pairs)# Keep track of how many data points.def__len__(self):return self._lendef__getitem__(self, index):# print("\nself.src_texts[{0}] = {1}".format(index, self.src_texts[index]))
        src_ids, oovs= source2ids(self.src_texts[index], self.vocab)# 将当前文本self.src_texts[index]转为ids,oovs为超出词典范围的词汇文本
        item={'x':[self.vocab.SOS]+ src_ids+[self.vocab.EOS],'y':[self.vocab.SOS]+[self.vocab[i]for iin self.tgt_texts[index]]+[self.vocab.EOS],'x_len':len(self.src_texts[index]),'y_len':len(self.tgt_texts[index]),'oovs': oovs,'len_oovs':len(oovs)}return item

2、自定义DataModule类(继承LightningDataModule)来调用DataLoader

这种方法是继承pl.LightningDataModule 来提供训练、校验、测试的数据。

from torch.utils.dataimport DataLoader, random_splitimport pytorch_lightningas plclassMyDataModule(pl.LightningDataModule):def__init__(self):super().__init__()defprepare_data(self):# 在该函数里一般实现数据集的下载等,只有cuda:0 会执行该函数# download, split, etc...# only called on 1 GPU/TPU in distributedpassdefsetup(self, stage):# make assignments here (val/train/test split)# called on every process in DDP# 实现数据集的定义,每张GPU都会执行该函数, stage 用于标记是用于什么阶段if stage=='fit'or stageisNone:
            self.train_dataset= MyDataset(self.train_file_path, self.train_file_num, transform=None)
            self.val_dataset= MyDataset(self.val_file_path, self.val_file_num, transform=None)if stage=='test'or stageisNone:
            self.test_dataset= MyDataset(self.test_file_path, self.test_file_num, transform=None)deftrain_dataloader(self):return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=False, num_workers=0)defval_dataloader(self):return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False)deftest_dataloader(self):return DataLoader(self.test_dataset, batch_size=1, shuffle=True)

三、PL的使用

# Training & Testif __name__=="__main__":
    parser= argparse.ArgumentParser()
    parser.add_argument("-is_train", default='yes',type=str, choices=['yes','no'])
    args= parser.parse_args()

    seed_everything(42)# 定义数据集
    data_module= GLUEDataModule(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", task_name="cola")print("args.is_train = ", args.is_train)if args.is_train=='yes':# 训练# 定义数据集为训练校验(train+validation)阶段
        data_module.setup("fit")# 初始化`ModelCheckpoint`回调,并设置要监控的量
        checkpoint_callback= ModelCheckpoint(monitor='val_loss', dirpath="saved_module", filename='sample-cola-{epoch:02d}-{val_loss:.2f}')# 定义模型
        model= GLUETransformer(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", num_labels=data_module.num_labels, eval_splits=data_module.eval_splits, task_name=data_module.task_name)# 定义logger
        logger= TensorBoardLogger('log_dir', name='test_PL')# 定义trainer
        trainer= Trainer(max_epochs=20, gpus=AVAIL_GPUS, check_val_every_n_epoch=1, callbacks=[checkpoint_callback])# 默认为每1个epoch校验一次,即自动调用validation_step()函数;将 checkpoint_callback 放到Trainer的callback 的list中# 开始训练
        trainer.fit(model=model, datamodule=data_module)# trainer.save_checkpoint(filepath=os.path.join("saved_module"))# trainer.validate(model, data_module.val_dataloader())else:# 定义数据集为测试(test)阶段
        data_module.setup("test")# 恢复模型
        model= GLUETransformer.load_from_checkpoint(checkpoint_path='saved_module/sample-cola-epoch=00-val_loss=0.53.ckpt')# 定义trainer并测试
        trainer= Trainer(gpus=AVAIL_GPUS, precision=16, limit_test_batches=0.05)
        trainer.test(model=model, datamodule=data_module)

四、模型、Trainer的保存与恢复

1、利用Trainer保存模型(在Trainer中设置default_root_dir参数)

Lightning 会自动保存最近训练的epoch的模型到当前的工作空间(or.getcwd()),也可以在定义Trainer的时候指定:

trainer= Trainer(default_root_dir='/your/path/to/save/checkpoints')

当然,也可以关闭自动保存模型:

trainer= Trainer(checkpoint_callback=False)

2、利用ModelCheckpoint (callbacks)保存模型

2.1 参数说明(所有参数均为optional)

ModelCheckpoint(
    dirpath=None,
    filename=None,
    monitor=None,
    verbose=False,
    save_last=None,
    save_top_k=1,
    save_weights_only=False,
    mode="min",
    auto_insert_metric_name=True,
    every_n_train_steps=None,
    train_time_interval=None,
    every_n_epochs=None,
    save_on_train_epoch_end=None,
    every_n_val_epochs=None)
  • dirpath: string类型。例如:dirpath=‘my/path_to_save_model/’;
  • filename: string类型;前面就说过不建议使用filepath变量,推荐使用
  • monitor:需要监控的量,string类型。例如’val_loss’(在training_step() or validation_step()函数中通过self.log(‘val_loss’, loss)进行标记);默认为None,只保存最后一个epoch的模型参数,(我的理解是只保留最后一个epoch的模型参数,但是还是每训练完一个epoch之后会保存一次,然后覆盖上一次的模型)
  • verbose:冗余模式,默认为False.
  • save_last: bool类型; 默认None,当为True时,表示在每个epoch 结果的时候,总是会保存一个模型last.ckpt,也就意味着会覆盖保存,只会有一个文件保留。
  • save_top_k:int类型;
    • 当 save_top_k==k,根据monitor监控的量,保存k个最好的模型,而最好的模型是当monitor监控的量最大时表示最好,还是最小时表示最好,在后面的参数mode中进行设置。
    • 当save_top_k==0时,不保存;
    • 当save_top_k==-1时,保存所有的模型,即每个次保存模型不进行覆盖保存,全都保存下来;
    • 当save_top_k>=2,并且在单个epoch内多次调用保存模型的函数,则模型的名字最后会追加版本号,从v0开始。
  • save_weights_only: bool 类型;True只保存模型权重(model.save_weights(flepath)),否则保存整个模型。建议保存权重就可以了,保存整个模型会消耗更多时间和存储空间。
  • mode :string类型,只能取{‘auto’, ‘min’, ‘max’}中的一个;当save_top_k!=0时,保存模型时就会覆盖保存,如果monitor监控的是val_loss等越小就表示模型越好的,这个参数应该被设置成’min’,当monitor监控的是val_acc(校验准确度)等越大就表示模型训练的越好的量,则应该设置成’max’。auto会自动根据monitor的名字来判断(auto模式是个人理解,可能会出错,例如你编程的时候,你就喜欢用val_loss表示模型准确度这样就会导致保存的模型是最差的模型了)。

2.2 ModelCheckpoint的使用

自动保存下,也可以自定义要监控的量来保存模型,步骤如下:

  • 计算需要监控的量,例如校验误差:loss
  • 使用log()函数标记该要监控的量(直接在training_step、validation_step中添加)
  • 初始化ModelCheckpoint回调,并设置要监控的量,
  • 将其传回到Trainer中
from pytorch_lightningimport Trainer, LightningDataModule, LightningModule, Callback, seed_everythingfrom pytorch_lightning.callbacksimport ModelCheckpointfrom pytorch_lightning.loggersimport TensorBoardLogger# Transformer LightningModuleclassGLUETransformer(LightningModule):...deftraining_step(self, batch, batch_idx):# 1. 计算loss
        outputs= self(**batch)
        train_loss= outputs[0]# 2. 使用log()函数标记该要监控的量,名字叫'val_loss'
        self.log('ltrain_lossoss', train_loss)return train_loss...defvalidation_step(self, batch, batch_idx, dataloader_idx=0):
        outputs= self(**batch)# 1. 计算需要监控的量
        val_loss, logits= outputs[:2]# 2. 使用log()函数标记该要监控的量,名字叫'val_loss'
        self.log('val_loss', val_loss)if self.hparams.num_labels>=1:
            preds= torch.argmax(logits, axis=1)elif self.hparams.num_labels==1:
            preds= logits.squeeze()

        labels= batch["labels"]return{"loss": val_loss,"preds": preds,"labels": labels}...# Training & Testif __name__=="__main__":
    seed_everything(42)# 定义数据集
    data_module= GLUEDataModule(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", task_name="cola")# 定义模型
    model= GLUETransformer(model_name_or_path=r"D:\Pretrained_Model\albert-base-v2", num_labels=data_module.num_labels, eval_splits=data_module.eval_splits, task_name=data_module.task_name)# 初始化`ModelCheckpoint`回调,并设置要监控的量
    checkpoint_callback= ModelCheckpoint(
        dirpath='saved_module',
        filename='sample-cola-{epoch:02d}-{val_loss:.2f}',
        monitor='val_loss')# 定义trainer
    trainer= Trainer(max_epochs=20, gpus=AVAIL_GPUS, check_val_every_n_epoch=1, callbacks=[checkpoint_callback])# 默认为每1个epoch校验一次,即自动调用validation_step()函数;将 checkpoint_callback 放到Trainer的callback 的list中# 开始训练
    trainer.fit(model=model, datamodule=data_module)

在上面的filename 参数中,定义了模型文件的保存格式,然后通过自动调用format_checkpoint_name 函数给其中的变量赋值的,返回string 类型,文件名。

>>> tmpdir= os.path.dirname(__file__)>>> ckpt= ModelCheckpoint(dirpath=tmpdir, filename='{epoch}')>>> os.path.basename(ckpt.format_checkpoint_name(0,1, metrics={}))'epoch=0.ckpt'>>> ckpt= ModelCheckpoint(dirpath=tmpdir, filename='{epoch:03d}')>>> os.path.basename(ckpt.format_checkpoint_name(5,2, metrics={}))'epoch=005.ckpt'>>> ckpt= ModelCheckpoint(dirpath=tmpdir, filename='{epoch}-{val_loss:.2f}')>>> os.path.basename(ckpt.format_checkpoint_name(2,3, metrics=dict(val_loss=0.123456)))'epoch=2-val_loss=0.12.ckpt'>>> ckpt= ModelCheckpoint(dirpath=tmpdir, filename='{missing:d}')>>> os.path.basename(ckpt.format_checkpoint_name(0,4, metrics={}))'missing=0.ckpt'>>> ckpt= ModelCheckpoint(filename='{step}')>>> os.path.basename(ckpt.format_checkpoint_name(0,0,{}))'step=0.ckpt'

2.2 获取最好的模型

因为根据上面保存的参数,可能保存了多个模型,根据best_model_path 恢复最好的模型。

from pytorch_lightningimport Trainerfrom pytorch_lightning.callbacksimport ModelCheckpoint

checkpoint_callback= ModelCheckpoint(dirpath='my/path/')
trainer= Trainer(callbacks=[checkpoint_callback])
model=...
trainer.fit(model)# 训练完成之后,保存了多个模型,下面是获得最好的模型,也就是将原来保存的模型中最好的模型权重apply到当前的网络上
checkpoint_callback.best_model_path

3、手动保存模型

除了自动保存,也可以手动保存、加载模型

model= MyLightningModule(hparams)
trainer.fit(model)
trainer.save_checkpoint("example.ckpt")
new_model= MyModel.load_from_checkpoint(checkpoint_path="example.ckpt")

当我们采用该 Pytorch Lightning 框架做强化学习的时候,由于强化学习的训练数据集不是固定的,是与环境实时交互生成的训练数据,因此在整个训练过程中,Epoch恒为0,模型就不会自动保存,这时候需要我们手动保存模型。

另外,保存的模型一般都挺大的,因此保存最好的三个模型就OK了,可以通过一个队列来进行维护,保存新的,删除旧的:

from collectionsimport dequeimport os# 维护一个队列
self.save_models= deque(maxlen=3)# 这里的self 是指这个函数放到继承了pl.LightningModule的类里,跟training_step()是同级的defmanual_save_model(self):
    model_path='your_model_save_path_%s'%(your_loss)iflen(self.save_models)>=3:# 当队列满了,取出最老的模型的路径,然后删除掉
        old_model= self.save_models.popleft()if os.path.exists(old_model):
            os.remove(old_model)# 手动保存
    self.trainer.save_checkpoint(model_path)# 将保存的模型路径加入到队列中
    self.save_models.append(model_path)

上面的函数,可以通过简单的判断,如果损失更小的,或者reward更大了,我们再调用,保存模型。

为了保险起见,我们也可以每隔一段时间就保存一个最新的模型。

这个函数是从pl的原码中抠出来的,因此保存的路径是我们前面在设置checkpoint_callbacks 的时候设置的路径,也就是本文前面ModelCheckpoint (callbacks) 这一节中的dir_path 路径,会在该路径下自动保存latest.ckpt 文件

# 保存最新的路径defsave_latest_model(self):
        checkpoint_callbacks=[cfor cin self.trainer.callbacksifisinstance(c, ModelCheckpoint)]print("Saving latest checkpoint...")
        model= self.trainer.get_model()[c.on_validation_end(self.trainer, model)for cin checkpoint_callbacks]

4、加载Checkpoint

4.1 load_from_checkpoint 方法

pl.LightningModule.load_from_checkpoint(
                                            checkpoint_path=checkpoint_path,
                                            map_location=None,
                                            hparams_file=None,
                                            strict=True,**kwargs)

该方法是从checkpoint 加载模型的主要方法。

4.2 加载模型的权重、偏置和超参数

model= MyLightingModule.load_from_checkpoint
  • 作者:u013250861
  • 原文链接:https://blog.csdn.net/u013250861/article/details/123591130
    更新时间:2022-10-25 10:46:59