在Actor对象中的方法上使用远程调用时是否阻塞? [英] Remote calls are blocking when used on methods in an Actor Object?
问题描述
执行以下命令不能同时工作,而是先执行Run1并阻塞直到完成,然后再执行Run2.
Executing the following will not work concurrently, instead it will first execute Run1 and block until it's completed, before it will execute Run2.
@ray.remote
class Test:
def __init__(self):
pass
def Run1(self):
print('Run1 Start')
sleep(5)
print('Run1 End')
def Run2(self):
print('Run2')
ray.init()
test = Test.remote()
test.Run1.remote()
test.Run2.remote()
sleep(10)
输出:
(pid=8109) Run1 Start
(pid=8109) Run1 End
(pid=8109) Run2
这有点出乎意料.我该如何强制这些方法并发执行?
This is a bit unexpected. How can I enforce that the methods get executed concurrently?
编辑以处理后续评论:
执行双线程方法似乎不起作用.以下代码始终导致PyArrow的管道破损.我想同时运行self.PreloadSamples方法和self.Optimize方法. BufferActor类通过@ ray.remote装饰的GetSamples()方法收集并提供批处理的样本.由于GPU上的数据不可序列化,因此需要在Optimizer对象端完成此操作,我想确保并行执行该操作,而不是针对优化进行顺序操作.
Doing a dual threaded approach doesn't seem to work. The below code consistently results in broken pipes from PyArrow. I'd like to run both, the self.PreloadSamples method as well as the self.Optimize methods consistently in parallel. The BufferActor class collects and provides batched samples through the @ray.remote decorated GetSamples() method. Since data on GPU is not serializable, this needs to be done on the Optimizer object side, and I want to make sure that this gets done in parallel and not sequentially with respect to the optimization.
请参阅下面的问题的完全隔离版本,该版本在运行约1分钟后即可复制问题:
See below for a fully isolated version of the problem that replicates the issues after about 1 minute of running:
import torch
import ray
import threading
from time import sleep
def Threaded(fn):
def wrapper(*args, **kwargs):
thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
thread.start()
return thread
return wrapper
@ray.remote
class BufferActor():
def __init__(self):
pass
def GetSamples(self):
return torch.randn(32, 100)
@ray.remote(num_gpus=1)
class OptimizerActor():
def __init__(self, bufferActor):
self.bufferActor = bufferActor
self.samplesOnGPU = list()
self.PreloadSamples()
self.Optimize()
@Threaded
def PreloadSamples(self):
#this retrieves a batch of samples (in numpy/torch format on CPU)
if (len(self.samplesOnGPU) < 5):
samples = ray.get(self.bufferActor.GetSamples.remote())
self.samplesOnGPU.append(samples.to('cuda'))
print('Samples Buffer: %s' % len(self.samplesOnGPU))
else:
sleep(0.01)
self.PreloadSamples()
@Threaded
def Optimize(self):
if (len(self.samplesOnGPU) > 0):
samples = self.samplesOnGPU.pop(0)
print('Optimizing')
#next we perform loss calc + backprop + optimizer step (not shown)
sleep(0.01)
self.Optimize()
ray.init()
bufferActor = BufferActor.remote()
optimizerActor = OptimizerActor.remote(bufferActor)
sleep(60*60)
推荐答案
Actor一次执行一个方法,以避免并发问题.如果您希望与参与者并行(通常这样做),最好的方法是启动两个(或多个)参与者并向他们两个提交任务.
Actors will execute one method at a time to avoid concurrency issues. If you want parallelism with actors (which you normally do), the best way is to start two (or more) actors and submit tasks to them both.
这篇关于在Actor对象中的方法上使用远程调用时是否阻塞?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!