当PyTorch中只有CPU可用时,如何对一批样本并行执行训练循环? [英] How to parallelize a training loop over the samples of a batch when only a CPU is available in PyTorch?
问题描述
我希望对单个样本或一批样本的计算进行并行化(在我的情况下只有CPU,最多可用112个核)。我尝试过,但得到一个错误:带梯度的损失张量无法从子进程传回(这直接导致我的尝试失败)。我仍然想这样做,关键是在多进程计算之后我还能执行优化器的 step。我该如何绕过这个问题?下面是一个完整、可独立运行的示例:
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader
from torch.multiprocessing import Pool
class SimpleDataSet(Dataset):
    """Toy regression dataset whose targets are the element-wise square of the inputs.

    Args:
        Din: length of every input (and target) vector.
        num_examples: number of (x, y) pairs generated up front.
    """

    def __init__(self, Din, num_examples=23):
        self.x_dataset = [torch.randn(Din) for _ in range(num_examples)]
        # target function is x*x (element-wise)
        self.y_dataset = [sample.pow(2) for sample in self.x_dataset]

    def __len__(self):
        return len(self.x_dataset)

    def __getitem__(self, idx):
        pair = (self.x_dataset[idx], self.y_dataset[idx])
        return pair
def get_loss(args):
    """Return the mean-squared-error loss of ``model`` on a single sample.

    Args:
        args: an ``(x, y, model)`` tuple; packed into one argument so the function
            can be passed directly to ``Pool.map``.
    """
    x, y, model = args
    prediction = model(x)
    return nn.MSELoss()(prediction, y)
def get_dataloader(D, num_workers, batch_size):
    """Wrap a freshly generated ``SimpleDataSet`` of ``D``-dimensional samples in a DataLoader."""
    return DataLoader(
        SimpleDataSet(D),
        batch_size=batch_size,
        num_workers=num_workers,
    )
def _sample_grads(args):
    """Worker: run forward/backward for one ``(x, y, model)`` sample and return its gradients.

    Autograd graphs cannot cross process boundaries: pickling a non-leaf tensor that
    requires grad (such as the loss) raises ``RuntimeError``. So instead of the loss,
    the worker back-propagates locally and returns plain gradient tensors, one per
    parameter in ``model.parameters()`` order, which can be sent to the parent.
    """
    x, y, model = args
    model.zero_grad()
    loss = get_loss((x, y, model))
    loss.backward()
    # clone(): the returned tensors must not alias a .grad buffer the worker reuses
    # for the next task before the result is pickled back.
    return [p.grad.detach().clone() for p in model.parameters()]


def train_fake_data():
    """Train a tiny linear model on fake data, parallelising per-sample work over a process pool.

    Each pool worker computes the gradients of one sample's loss. The parent averages
    them (the same as back-propagating the mean loss of the batch), installs the result
    as ``p.grad`` and takes the optimizer step in its own process.
    """
    num_workers = 2
    Din, Dout = 3, 1
    model = nn.Linear(Din, Dout).share_memory()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)
    batch_size = 2
    num_epochs = 10
    num_procs = 5
    dataloader = get_dataloader(Din, num_workers, batch_size)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    # Create the pool once; spawning processes for every batch is very expensive.
    with Pool(num_procs) as pool:
        for _epoch in range(num_epochs):
            for xs, _ in dataloader:
                # Fake (random) samples as in the original experiment, but one task per
                # sample actually in the batch (the last batch may be smaller).
                tasks = [(torch.randn(Din), torch.randn(Dout), model) for _ in range(len(xs))]
                per_sample_grads = pool.map(_sample_grads, tasks)
                # The mean of per-sample gradients equals the gradient of the mean loss.
                for i, p in enumerate(model.parameters()):
                    p.grad = torch.stack([grads[i] for grads in per_sample_grads]).mean(dim=0)
                optimizer.step()
            scheduler.step()
if __name__ == '__main__':
    # The guard stops worker processes that re-import this module (e.g. under the
    # "spawn" start method) from starting training again.
    train_fake_data()
错误:
Traceback (most recent call last):
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
pydev_imports.execfile(filename, global_vars, local_vars) # execute the script
File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
train_fake_data()
File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
losses = pool.map(get_loss, batch)
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries. If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'
我确定我想这么做。我应该如何执行此操作?
使用DDP进行新尝试
"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import os
# Toy problem sizes and data shared by every training process.
# NOTE(review): mp.spawn children presumably re-import this module, so each rank
# probably draws its own random data_x / data_y (no seed is set) — confirm.
num_epochs = 5
batch_size = 8
Din = 10
Dout = 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
# One (x, y) batch per training step: the base batch scaled by the step index.
data = [(scale * data_x, scale * data_y) for scale in range(num_epochs)]
class OneDeviceModel(nn.Module):
    """Toy two-layer MLP (Linear -> ReLU -> Linear) replicated in every DDP process.

    Each process holds a full copy of the model on a single device (CPU or one GPU);
    the model itself is not split across devices.

    Args:
        din: input and hidden width; defaults to the module-level ``Din``.
        dout: output width; defaults to the module-level ``Dout``.
    """

    def __init__(self, din=None, dout=None):
        super().__init__()
        din = Din if din is None else din
        dout = Dout if dout is None else dout
        self.net1 = nn.Linear(din, din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(din, dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))
def setup_process(rank, world_size, backend=None):
    """Initialize the default distributed process group for this process.

    Args:
        rank: index of this process within the group (0 .. world_size - 1).
        world_size: total number of processes in the group.
        backend: communication backend. ``None`` (the default) picks ``'nccl'`` when
            CUDA is available and ``'gloo'`` otherwise; an explicitly passed backend
            is used as given.

    gloo (https://github.com/facebookincubator/gloo) is a collective-communication
    library that lets the processes exchange and reduce tensors on CPU; NCCL is the
    recommended backend for GPUs:
    https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    """
    # Every process rendezvouses with rank 0 at this address/port (single machine).
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    if backend is None:
        backend = 'nccl' if torch.cuda.is_available() else 'gloo'
    # Initializes the default process group (and the distributed package).
    dist.init_process_group(backend, rank=rank, world_size=world_size)
def cleanup():
    """Destroy the default process group and deinitialize the distributed package.

    Does nothing when no group is initialized, so it is safe to call from a
    ``finally`` block or more than once.
    """
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()
def run_parallel_training_loop(rank, world_size):
    """Train one DDP replica; this is the function every spawned process runs.

    Args:
        rank: index of this process (supplied by ``mp.spawn``).
        world_size: total number of processes.

    DDP broadcasts the model state of rank 0 to all other processes in its
    constructor, so all replicas start from the same parameters. During
    ``backward()`` the gradients are synchronized (averaged) across processes, so
    when ``backward()`` returns every replica holds identical ``param.grad`` values.
    Each process then steps its own optimizer on its own replica, and all replicas
    stay in lock-step. No process-shared model (``share_memory()``) is needed.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)
    try:
        # Split the cores between the processes. Otherwise every process uses all cores
        # for intra-op parallelism and the machine is oversubscribed. At least one
        # thread is kept because set_num_threads rejects 0.
        torch.set_num_threads(max(1, mp.cpu_count() // world_size))
        # With CUDA, rank r uses GPU r; otherwise everything stays on the CPU.
        device = torch.device('cuda', rank) if torch.cuda.is_available() else torch.device('cpu')
        ddp_model = DDP(OneDeviceModel().to(device))
        loss_fn = nn.MSELoss()
        # Created once: optimizers may keep per-parameter state across steps.
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
        for x, y in data:
            optimizer.zero_grad()
            # Both inputs and labels must live on the model's device.
            outputs = ddp_model(x.to(device))
            loss_fn(outputs, y.to(device)).backward()
            optimizer.step()
        print()
        print(f"Finished running DDP with model parallel example on rank: {rank}.")
        print(f'current process: {mp.current_process()}')
        print(f'pid: {os.getpid()}')
    finally:
        # Destroy the process group even if training raised.
        cleanup()
def main():
    """Spawn one DDP training process per device: per GPU with CUDA, else per CPU core."""
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # With CUDA each rank uses GPU `rank`, so there cannot be more ranks than GPUs.
    world_size = torch.cuda.device_count() if torch.cuda.is_available() else mp.cpu_count()
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)
if __name__ == "__main__":
    # Required with mp.spawn: child processes import this module and must not re-run main().
    print('starting __main__')
    main()
    # '\a' rings the terminal bell when the run finishes.
    print('Done!\a\n')
这似乎可以运行,但我的疑问在于原脚本第74行(即创建模型的那一行):我需要写成
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
或
model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()
才能让它在多个CPU上正常工作?
为什么即使我有112个CPU核心,串行运行也比并行运行更快?
我当前的问题是,当只有CPU可用时,CPU并行作业比串行运行的作业慢。
我想知道应如何设置Python和CPU并行。例如,如果我有X个CPU,我应该运行多少个进程?X个吗?还是别的数量?即使只是一个粗略的经验法则,该如何选择这个数字?
研究相关链接:
- https://discuss.pytorch.org/t/multiprocessing-for-loop-on-cpu/59836
- How to use multiprocessing in PyTorch?
- https://discuss.pytorch.org/t/how-to-parallelize-a-loop-over-the-samples-of-a-batch/32698/7
- https://www.reddit.com/r/pytorch/comments/sm073v/how_to_parallelize_a_training_loop_ever_samples/
推荐答案
PyTorch会使用多个CPU核心并行执行单个算子(算子内并行),因此您的“串行”版本很可能已经在利用多核和矢量化了。
举个简单的例子
import torch
def run_matmul_demo(num_iters=10000, size=1000):
    """Accumulate ``sum(a @ b)`` over ``num_iters`` pairs of random ``size x size`` CPU matrices.

    The code contains no explicit parallelism, yet PyTorch's intra-op thread pool
    runs each matrix product on several cores; watch CPU usage while it runs.

    Returns:
        The running total: ``0`` when ``num_iters`` is 0, otherwise a 0-d tensor.
    """
    c = 0
    for _ in range(num_iters):
        a = torch.randn(size, size, device='cpu')
        b = torch.randn(size, size, device='cpu')
        c += torch.sum(a @ b)
    return c


if __name__ == '__main__':
    run_matmul_demo()
这段代码没有显式地做任何并行化,但在默认配置下它占用了12个CPU中约80%的算力。
您可以使用torch.set_num_threads设置CPU上的算子内(intra-op)并行度。特别是,如果您正在运行多个进程,并且希望每个进程只使用一个CPU,可能需要在每个进程中将算子内并行度设置为1。
但是,将操作并行化是有代价的,我无法详细介绍实现细节,但我们可以运行一个快速实验,显示使用多个线程的开销。
import matplotlib.pyplot as plt
import numpy as np
import torch;
import time;
def measure_core_time(funcs, a, b, max_threads=20, repeats=100):
    """Time each op at 1..max_threads intra-op threads and return its "core x time".

    Args:
        funcs: mapping of name -> callable ``f(a, b)`` to benchmark.
        a, b: operands passed to every callable.
        max_threads: largest ``torch.set_num_threads`` value tried.
        repeats: number of calls timed for each thread count.

    Returns:
        dict mapping each name to an ``np.ndarray`` of length ``max_threads``. Entry
        ``i`` holds the elapsed wall-clock seconds multiplied by the ``i + 1`` threads
        used, an estimate of the total core-time spent.

    The caller's ``torch`` thread setting is restored afterwards.
    """
    previous_threads = torch.get_num_threads()
    core_time = {}
    try:
        for name, f in funcs.items():
            # A fresh array per op, so separately plotted curves never share one buffer.
            t = np.zeros(max_threads)
            for n_threads in range(1, max_threads + 1):
                torch.set_num_threads(n_threads)
                t0 = time.perf_counter()
                for _ in range(repeats):
                    f(a, b)
                t[n_threads - 1] = (time.perf_counter() - t0) * n_threads
            core_time[name] = t
    finally:
        torch.set_num_threads(previous_threads)
    return core_time


A = torch.randn(1000, 1000, device='cpu')
B = torch.randn(1000, 1000, device='cpu')
# Each benchmark operates on its (a, b) arguments rather than on globals.
funcs = {
    'sin': lambda a, b: torch.sin(a),
    'tanh': lambda a, b: torch.tanh(a),
    'log': lambda a, b: torch.log(a),
    'matmul': lambda a, b: a @ b.T,
}

if __name__ == '__main__':
    for k, t in measure_core_time(funcs, A, B).items():
        plt.plot(np.arange(1, len(t) + 1), t, '-o', label=k)
    plt.xlabel('Number of threads')
    plt.legend()
    plt.ylabel('Core x time')
    plt.show()
使用并行化时,操作往往运行得更快
但如果我们将总CPU时间乘以线程数,我们会发现单线程版本的效率更高。
如果您能够在更高层级上通过运行相互独立的进程来并行化实验,则应尽量让每个进程只使用一个核心;否则每个进程都会尝试占用所有CPU,导致系统过载,所有进程都会变得非常慢。
调整DDP示例
我特意修改了您示例脚本中的超参数,使其更有利于多进程:
- 相对较少的初始化开销
- 进程之间的通信相对较少
"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import argparse
import os
# Several epochs, so start-up cost is small relative to the training time.
num_epochs = 10
# 720 = 16 * 9 * 5 has many divisors, so the batches split evenly across many
# different world sizes (every rank gets the same number of batches).
num_batches = 16 * 9 * 5
# Large batches mean more work per process between two gradient synchronizations;
# intra-op parallelism apparently helps little along the batch dimension.
batch_size = 10000
# Small feature sizes, so intra-op parallelization is less helpful.
Din, Dout = 3, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
# Batch k is the base batch scaled by k.
data = [(k * data_x, k * data_y) for k in range(num_batches)]
class OneDeviceModel(nn.Module):
    """Toy deep MLP replicated in every DDP process (one full copy per device).

    ``num_layers`` square Linear layers, each followed by sigmoid, tanh and ReLU,
    then a final Linear projection to the output width.

    Args:
        din: input and hidden width; defaults to the module-level ``Din``.
        dout: output width; defaults to the module-level ``Dout``.
        num_layers: number of hidden Linear layers (default 10).
    """

    def __init__(self, din=None, dout=None, num_layers=10):
        super().__init__()
        din = Din if din is None else din
        dout = Dout if dout is None else dout
        # nn.ModuleList, not a plain Python list: the hidden layers must be registered
        # as sub-modules so their parameters appear in .parameters() (optimized and
        # synchronized by DDP) and are moved by .to() / shared by .share_memory().
        self.net = nn.ModuleList(nn.Linear(din, din) for _ in range(num_layers))
        # -- Bob: use more complex activation
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(din, dout)

    def forward(self, x):
        # Apply the hidden layers sequentially, each followed by the activations.
        for layer in self.net:
            x = layer(x)
            x = self.sigmoid(x)
            x = self.tanh(x)
            x = self.relu(x)
        return self.net2(x)
def setup_process(rank, world_size, backend=None):
    """Initialize the default distributed process group for this process.

    Args:
        rank: index of this process within the group (0 .. world_size - 1).
        world_size: total number of processes in the group.
        backend: communication backend. ``None`` (the default) picks ``'nccl'`` when
            CUDA is available and ``'gloo'`` otherwise; an explicitly passed backend
            is used as given.

    gloo (https://github.com/facebookincubator/gloo) is a collective-communication
    library that lets the processes exchange and reduce tensors on CPU; NCCL is the
    recommended backend for GPUs:
    https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    """
    # Every process rendezvouses with rank 0 at this address/port (single machine).
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    if backend is None:
        backend = 'nccl' if torch.cuda.is_available() else 'gloo'
    # Initializes the default process group (and the distributed package).
    dist.init_process_group(backend, rank=rank, world_size=world_size)
def cleanup():
    """Destroy the default process group and deinitialize the distributed package.

    Does nothing when no group is initialized, so it is safe to call from a
    ``finally`` block or more than once.
    """
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()
def run_parallel_training_loop(rank, world_size):
    """Train one DDP replica for ``num_epochs``; this is the function every spawned process runs.

    Args:
        rank: index of this process (supplied by ``mp.spawn``).
        world_size: total number of processes.

    Rank ``r`` trains on batches ``data[r::world_size]``. DDP broadcasts the model
    state of rank 0 to all other processes in its constructor, so all replicas start
    from the same parameters. During ``backward()`` the gradients are synchronized
    (averaged) across processes, so when ``backward()`` returns every replica holds
    identical ``param.grad`` values. Each process then steps its own optimizer on its
    own replica, and all replicas stay in lock-step. No process-shared model
    (``share_memory()``) is needed.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)
    try:
        # Give each process an equal share of the cores for intra-op parallelism;
        # keep at least one thread (set_num_threads rejects 0 when world_size > cores).
        torch.set_num_threads(max(1, mp.cpu_count() // world_size))
        # With CUDA, rank r uses GPU r; otherwise everything stays on the CPU.
        device = torch.device('cuda', rank) if torch.cuda.is_available() else torch.device('cpu')
        ddp_model = DDP(OneDeviceModel().to(device))
        loss_fn = nn.MSELoss()
        # Created once: optimizers may keep per-parameter state across steps.
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
        my_batches = data[rank::world_size]
        for _ in range(num_epochs):
            for x, y in my_batches:
                optimizer.zero_grad()
                # Both inputs and labels must live on the model's device.
                outputs = ddp_model(x.to(device))
                loss_fn(outputs, y.to(device)).backward()
                optimizer.step()
        print()
        print(f"Finished running DDP with model parallel example on rank: {rank}.")
        print(f'current process: {mp.current_process()}')
        print(f'pid: {os.getpid()}')
    finally:
        # Destroy the process group even if training raised.
        cleanup()
def main():
    """Parse ``--world-size`` and spawn that many DDP training processes.

    Exits with a usage error unless the world size is a positive divisor of
    ``num_batches``. Every rank must run the same number of steps, because each
    DDP step synchronizes gradients across all ranks.
    """
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    parser = argparse.ArgumentParser()
    parser.add_argument('--world-size', default=1, type=int,
                        help='number of training processes to spawn')
    args = parser.parse_args()
    # Validate with parser.error rather than assert, which `python -O` strips.
    if args.world_size < 1:
        parser.error('--world-size must be a positive integer')
    if num_batches % args.world_size != 0:
        parser.error(f'--world-size must divide num_batches ({num_batches})')
    mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)
if __name__ == "__main__":
    # Required with mp.spawn: child processes import this module and must not re-run main().
    print('starting __main__')
    main()
    # '\a' rings the terminal bell when the run finishes.
    print('Done!\a\n')
$ time python3 ddp.py --world-size 1 > /dev/null
real 0m59.092s
user 8m46.589s
sys 0m7.320s
$ time python3 ddp.py --world-size 1 > /dev/null
real 1m11.124s
user 10m54.209s
sys 0m9.595s
$ time python3 ddp.py --world-size 6 > /dev/null
real 0m18.348s
user 2m28.799s
sys 0m18.068s
$ time python3 ddp.py --world-size 12 > /dev/null
real 0m26.352s
user 4m3.074s
sys 0m39.179s
$ time python3 ddp.py --world-size 3 > /dev/null
real 0m23.047s
user 3m51.172s
sys 0m11.483s
$ time python3 ddp.py --world-size 4 > /dev/null
real 0m18.195s
user 2m55.241s
sys 0m12.841s
$ time python3 ddp.py --world-size 2 > /dev/null
real 0m26.955s
user 4m15.837s
sys 0m7.127s
如果我删除该行
torch.set_num_threads(mp.cpu_count() // world_size)
$ time python3 ddp.py --world-size 4 > /dev/null
real 0m40.574s
user 6m39.176s
sys 0m19.025s
$ time python3 ddp.py --world-size 2 > /dev/null
real 0m28.066s
user 3m17.775s
sys 0m8.410s
$ time python3 ddp.py --world-size 1 > /dev/null
real 0m37.114s
user 2m19.743s
sys 0m4.866s
使用
torch.set_num_threads(mp.cpu_count() // world_size // 2)
$ time python3 ddp.py --world-size 6 > /dev/null
real 0m16.399s
user 1m38.915s
sys 0m20.780s
$ time python3 ddp.py --world-size 4 > /dev/null
real 0m15.649s
user 1m1.821s
sys 0m13.589s
$ time python3 ddp.py --world-size 3 > /dev/null
real 0m16.947s
user 1m29.696s
sys 0m10.069s
$ time python3 ddp.py --world-size 2 > /dev/null
real 0m21.851s
user 2m4.564s
sys 0m7.486s
我的意见
在单个节点上使用DDP似乎并不是特别有利。除非您的模型计算量很大、且PyTorch的算子内(intra-op)并行无法很好地处理这些计算,否则收益有限。要从中获益,您需要较大的批量,并且最好使用参数少、计算多的模型,这样需要同步的梯度较少,例如在非常大的输入上运行的卷积模型。DDP可能有帮助的另一种情况是:您的模型中使用了大量Python代码,而不是矢量化操作。
这篇关于当CPU仅在pytorch中可用时,如何并行化一批样本的训练循环?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!