How to parallelize a training loop over the samples of a batch when only the CPU is available in PyTorch?

Problem description

I want to parallelize over single examples, or over a batch of examples (in my situation I only have CPUs, up to 112 of them). I tried it, but I get an error that the loss cannot have its gradient taken out of a separate process (which entirely ruins my attempt). I still want to do this, and it is essential that after the multiprocessing happens I can do an optimizer step. How do I get around it? I made a fully self-contained example:


import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader

from torch.multiprocessing import Pool

class SimpleDataSet(Dataset):

    def __init__(self, Din, num_examples=23):
        self.x_dataset = [torch.randn(Din) for _ in range(num_examples)]
        # target function is x*x
        self.y_dataset = [x**2 for x in self.x_dataset]

    def __len__(self):
        return len(self.x_dataset)

    def __getitem__(self, idx):
        return self.x_dataset[idx], self.y_dataset[idx]

def get_loss(args):
    x, y, model = args
    y_pred = model(x)
    criterion = nn.MSELoss()
    loss = criterion(y_pred, y)
    return loss

def get_dataloader(D, num_workers, batch_size):
    ds = SimpleDataSet(D)
    dl = DataLoader(ds, batch_size=batch_size, num_workers=num_workers)
    return dl

def train_fake_data():
    num_workers = 2
    Din, Dout = 3, 1
    model = nn.Linear(Din, Dout).share_memory()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

    batch_size = 2
    num_epochs = 10
    # num_batches = 5
    num_procs = 5
    dataloader = get_dataloader(Din, num_workers, batch_size)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    for epoch in range(num_epochs):
        for _, batch in enumerate(dataloader):
            batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in batch]
            with Pool(num_procs) as pool:
                optimizer.zero_grad()

                losses = pool.map(get_loss, batch)
                loss = torch.mean(losses)
                loss.backward()

                optimizer.step()
            # scheduler
            scheduler.step()


if __name__ == '__main__':
    # start = time.time()
    # train()
    train_fake_data()
    # print(f'execution time: {time.time() - start}')

Error:

Traceback (most recent call last):
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
    runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"
", file, 'exec'), glob, loc)
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
    train_fake_data()
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
    losses = pool.map(get_loss, batch)
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries.  If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'

I am sure I want to do this. How should I do it?


New attempt using DDP

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across GPUs
    (only processes with their own GPU or hardware)
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for process to communicate/coordinate with each other/master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    Distributed function to be implemented later.

    This is the function that is actually ran in each distributed process.

    Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
    you don’t need to worry about different DDP processes start from different model parameter initial values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)

    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)

    for batch_idx, batch in enumerate(data):
        x, y = batch
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        optimizer.zero_grad()
        outputs = ddp_model(x)
        labels = y.to(rank) if torch.cuda.is_available() else y
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
        optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # args
    world_size = mp.cpu_count()
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)

if __name__ == "__main__":
    print('starting __main__')
    main()
    print('Done!\a\n')

It seems to run, but my question is about line 74: do I need to do this

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()

or

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()

so that it works correctly across multiple CPUs?


Serial is faster than parallel even though I have 112 CPU cores?

My current issue is that, when only CPUs are available, running the jobs in parallel on the CPUs is slower than running them serially.


I also want to know how to set up Python for parallel CPUs. For example, if I have X CPUs, how many processes should I be running? X? Or something else? And how do I choose that number, even if only as a rough heuristic?


Related links from my research:

Answer

Torch will use multiple CPUs to parallelize operations, so your serial version is probably already using multi-core vectorization.

Take this simple example:

import torch

c = 0
for i in range(10000):
    A = torch.randn(1000, 1000, device='cpu')
    B = torch.randn(1000, 1000, device='cpu')
    c += torch.sum(A @ B)

This code does nothing explicit to parallelize, yet with the default configuration it uses about 80% of 12 CPUs.

You can use torch.set_num_threads to set the intra-op parallelism on the CPU. In particular, if you are running several processes and want each process to use a single CPU, you may want to set the intra-op parallelism to 1 in each of them.
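
A minimal sketch of what that looks like:

import torch

print(torch.get_num_threads())  # default intra-op thread count, typically the number of cores
torch.set_num_threads(1)        # limit this process to a single thread for tensor operations
print(torch.get_num_threads())  # now prints 1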

However, parallelizing the operations has a cost. I cannot go into the implementation details, but we can run a quick experiment that shows the overhead of using multiple threads.

import time

import matplotlib.pyplot as plt
import numpy as np
import torch

A = torch.randn(1000, 1000, device='cpu')
B = torch.randn(1000, 1000, device='cpu')
funcs = {
    'sin': lambda a, b: torch.sin(A),
    'tanh': lambda a, b: torch.tanh(A),
    'log': lambda a, b: torch.log(A),
    'matmul': lambda a, b: A @ B.T
}
t = np.zeros(20)
for k, f in funcs.items():
    for i in range(1, len(t) + 1):
        torch.set_num_threads(i)
        t0 = time.time()
        for _ in range(100):
            f(A, B)
        tf = time.time()
        t[i - 1] = (tf - t0) * i  # total core time = wall time x number of threads
    plt.plot(np.arange(1, len(t) + 1), t, '-o', label=k)
plt.xlabel('Number of threads')
plt.legend()
plt.ylabel('Core x time')

Operations tend to run faster when parallelized.

But if we multiply the total CPU time by the number of threads, we see that the single-thread version is more efficient.

If you can parallelize your experiment at a higher level, by running independent processes, you should try giving each process a single core; otherwise each process will try to use all of the CPUs and everything will run very slowly because the system is overloaded.
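
For illustration, a rough sketch of that pattern: each pool worker pins itself to a single intra-op thread via the initializer, and the run_experiment function here is just a placeholder for whatever independent work each process does.

import torch
from torch.multiprocessing import Pool

def limit_threads():
    # runs once in every worker process
    torch.set_num_threads(1)

def run_experiment(seed):
    # placeholder for an independent experiment; returns a plain float
    torch.manual_seed(seed)
    x = torch.randn(1000, 1000)
    return float(torch.linalg.norm(x @ x))

if __name__ == '__main__':
    with Pool(processes=4, initializer=limit_threads) as pool:
        results = pool.map(run_experiment, range(8))
    print(results)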

Tweaking the DDP example

I deliberately modified the hyperparameters of your example script so that the balance is weighted in favor of multiprocessing:

  • relatively little initialization overhead
  • relatively little communication between the processes

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import argparse
import os

# More than one epoch so that the initialization is less significant
# than compared to the model processing time
num_epochs = 10
# for the experiment select a number that has a lot of divisors
# as I want to test with equal number of batches
num_batches = 16*9*5
# Uses a larger batch so that more work is done in each process
# between two gradient synchronizations
# apparently the intraop optimization is not helping 
# (at least not too much) in the batch dimension
batch_size = 10000
# Use smaller dimensions, so that the intraop parallelization becomes less 
# helpful
Din, Dout = 3, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_batches)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across GPUs
    (only processes with their own GPU or hardware)
    """
    def __init__(self):
        super().__init__()
        # -- Use more layers
        self.net = [nn.Linear(Din, Din) for _ in range(10)]
        # -- Bob: use more complex activation  
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
      # apply the 10 layers sequentially
      for i in range(10):
        x = self.net[i](x)
        x = self.sigmoid(x)
        x = self.tanh(x)
        x = self.relu(x)
      return self.net2(x)

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for process to communicate/coordinate with each other/master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    Distributed function to be implemented later.

    This is the function that is actually ran in each distributed process.

    Note: as DDP broadcasts model states from rank 0 process to all other processes in the DDP constructor,
    you don’t need to worry about different DDP processes start from different model parameter initial values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)
    torch.set_num_threads(mp.cpu_count() // world_size)
    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)
    for _ in range(num_epochs):
      for batch_idx, batch in enumerate(data[rank::world_size]):
          x, y = batch
          loss_fn = nn.MSELoss()
          optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

          optimizer.zero_grad()
          outputs = ddp_model(x)
          labels = y.to(rank) if torch.cuda.is_available() else y
          # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
          loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
          optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    parser = argparse.ArgumentParser()
    parser.add_argument('--world-size', default=1, type=int)
    args = parser.parse_args()
    assert num_batches % args.world_size == 0
    mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)

if __name__ == "__main__":
    print('starting __main__')
    main()
    print('Done!\a\n')

$ time python3 ddp.py --world-size 1 > /dev/null

real    0m59.092s
user    8m46.589s
sys     0m7.320s

$ time python3 ddp.py --world-size 1 > /dev/null

real    1m11.124s
user    10m54.209s
sys     0m9.595s

$ time python3 ddp.py --world-size 6 > /dev/null

real    0m18.348s
user    2m28.799s
sys     0m18.068s
$ time python3 ddp.py --world-size 12 > /dev/null

real    0m26.352s
user    4m3.074s
sys     0m39.179s
$ time python3 ddp.py --world-size 3 > /dev/null

real    0m23.047s
user    3m51.172s
sys     0m11.483s
$ time python3 ddp.py --world-size 4 > /dev/null

real    0m18.195s
user    2m55.241s
sys     0m12.841s
$ time python3 ddp.py --world-size 2 > /dev/null

real    0m26.955s
user    4m15.837s
sys     0m7.127s

If I remove the line

torch.set_num_threads(mp.cpu_count() // world_size)
$ time python3 ddp.py --world-size 4 > /dev/null

real    0m40.574s
user    6m39.176s
sys     0m19.025s

$ time python3 ddp.py --world-size 2 > /dev/null

real    0m28.066s
user    3m17.775s
sys     0m8.410s

$ time python3 ddp.py --world-size 1 > /dev/null

real    0m37.114s
user    2m19.743s
sys     0m4.866s

Using

torch.set_num_threads(mp.cpu_count() // world_size // 2)
$ time python3 ddp.py --world-size 6 > /dev/null

real    0m16.399s
user    1m38.915s
sys     0m20.780s

$ time python3 ddp.py --world-size 4 > /dev/null

real    0m15.649s
user    1m1.821s
sys     0m13.589s

$ time python3 ddp.py --world-size 3 > /dev/null

real    0m16.947s
user    1m29.696s
sys     0m10.069s

$ time python3 ddp.py --world-size 2 > /dev/null

real    0m21.851s
user    2m4.564s
sys     0m7.486s

My opinion

DDP within a single node does not seem particularly advantageous, unless your model does a lot of work that PyTorch's intra-op parallelism handles particularly poorly. You also want large batches, and preferably a model with fewer parameters and more operations, meaning smaller gradients to synchronize: for example, a convolutional model on a very large input.
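
To make the "fewer parameters, more operations" point concrete, a small illustrative comparison (layer sizes chosen arbitrarily): a fully connected layer carries far more parameters, and therefore far more gradient to all-reduce, than a convolution that does a comparable or larger amount of arithmetic on a large input.

import torch
from torch import nn

lin = nn.Linear(4096, 4096)
conv = nn.Conv2d(64, 64, kernel_size=3, padding=1)

# Parameters are what DDP has to synchronize after every backward pass.
print(sum(p.numel() for p in lin.parameters()))   # ~16.8 million
print(sum(p.numel() for p in conv.parameters()))  # ~37 thousand

# On a large input the conv still does a huge amount of arithmetic,
# so its gradient traffic per unit of compute is much smaller.
x = torch.randn(1, 64, 512, 512)
y = conv(x)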

Another situation where DDP might help is when you use too much Python in your model, rather than vectorized operations.
