Remote calls are blocking when used on methods in an Actor Object?


Problem description

Executing the following will not work concurrently; instead it will first execute Run1 and block until it completes, before it executes Run2.

import ray
from time import sleep


@ray.remote
class Test:
    def __init__(self):
        pass

    def Run1(self):
        print('Run1 Start')
        sleep(5)
        print('Run1 End')

    def Run2(self):
        print('Run2')

ray.init()
test = Test.remote()
test.Run1.remote()
test.Run2.remote()

sleep(10)

Output:

(pid=8109) Run1 Start
(pid=8109) Run1 End
(pid=8109) Run2

This is a bit unexpected. How can I enforce that the methods get executed concurrently?

Edit to address follow-up comments:

Doing a dual-threaded approach doesn't seem to work. The code below consistently results in broken pipes from PyArrow. I'd like to run both the self.PreloadSamples method and the self.Optimize method in parallel. The BufferActor class collects and provides batched samples through the @ray.remote-decorated GetSamples() method. Since data on the GPU is not serializable, this needs to be done on the Optimizer object side, and I want to make sure it happens in parallel with the optimization, not sequentially.

See below for a fully isolated version of the problem, which reproduces the issue after about one minute of running:

import torch
import ray
import threading
from time import sleep


def Threaded(fn):
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return wrapper

@ray.remote
class BufferActor():
    def __init__(self):
        pass

    def GetSamples(self):
        return torch.randn(32, 100)


@ray.remote(num_gpus=1)
class OptimizerActor():
    def __init__(self, bufferActor):
        self.bufferActor = bufferActor
        self.samplesOnGPU = list()

        self.PreloadSamples()
        self.Optimize()

    @Threaded
    def PreloadSamples(self):
        #this retrieves a batch of samples (in numpy/torch format on CPU)
        if (len(self.samplesOnGPU) < 5):
            samples = ray.get(self.bufferActor.GetSamples.remote())

            self.samplesOnGPU.append(samples.to('cuda'))

            print('Samples Buffer: %s' % len(self.samplesOnGPU))
        else:
            sleep(0.01)

        self.PreloadSamples()

    @Threaded
    def Optimize(self):
        if (len(self.samplesOnGPU) > 0):
            samples = self.samplesOnGPU.pop(0)
            print('Optimizing')

            #next we perform loss calc + backprop + optimizer step (not shown)

        sleep(0.01)
        self.Optimize()



ray.init()

bufferActor = BufferActor.remote()
optimizerActor = OptimizerActor.remote(bufferActor)

sleep(60*60)

Answer

Actors execute one method at a time to avoid concurrency issues. If you want parallelism with actors (which you normally do), the best way is to start two (or more) actors and submit tasks to both of them.
