subscribe_on with from_iterable/range in RxPY

Question

I'm trying to get my head around scheduling in reactive extensions for Python. I would like to use subscribe_on to process multiple observables in parallel. This works fine if the observable is created with just, but not if, for example, range or from_ is used.

just defaults to Scheduler.immediate, while the other generators default to Scheduler.current_thread. That explains the difference, but it feels inconsistent to me, probably because I don't grasp the full problem.
Consider the following example:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print("processing %s on thread %s" % (x, threading.current_thread().name))
    time.sleep(1)


def finish(x):
    print("finished %s on thread %s" % (x, threading.current_thread().name))


# Creates new thread (I like)
rx.Observable.just(3)\
    .do_action(work)\
    .subscribe_on(Scheduler.new_thread)\
    .subscribe(finish)

# Runs on MainThread (I don't like)
rx.Observable.range(1, 3) \
    .do_action(work) \
    .subscribe_on(Scheduler.new_thread) \
    .subscribe(finish)

It works with observe_on or if the Scheduler is passed directly to the generator, but I would like to decouple observable creation from processing and achieve something like this:

import rx
from rx.concurrency.scheduler import Scheduler
import time
import threading


def work(x):
    print("processing %s on thread %s" % (x, threading.current_thread().name))
    time.sleep(1)


def finish(x):
    print("finished %s on thread %s" % (x, threading.current_thread().name))


def factory_single():
    return rx.Observable.just(1).do_action(work)


def factory_multiple():
    return rx.Observable.range(2, 4).do_action(work)


def process(factory):
    factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

# Creates a new thread (I like)
process(factory_single)

# Runs on MainThread (I don't like)
process(factory_multiple)

Am I misunderstanding subscribe_on? Is my approach wrong?

Recommended answer

There are three actions in your example that can be scheduled independently:

  1. Data feed action. just and range use different schedulers by default, but there is not much difference between them: both feed in the initial values on the current thread. You can override the default by passing the scheduler you want as a parameter to these methods.

  2. Subscribe action. Uses Scheduler.current_thread by default, i.e. it is executed on the same thread as the data feed action. Can be overridden with the subscribe_on method.

  3. Observe (on_next, on_error, on_completed) action. Uses Scheduler.current_thread by default, i.e. it is executed on the same thread as the subscribe action. Can be overridden with the observe_on method.

If you override the scheduler for only one of these actions, the others follow the defaults described above.

About schedulers

Scheduler.immediate does not really schedule anything. It invokes the action immediately, on the same thread where it was scheduled.

Scheduler.current_thread avoids recursion by queuing actions, but it still invokes each action on the same thread where it was scheduled.

Scheduler.new_thread launches a single background thread that executes actions one after another.

Scheduler.timeout launches a new background thread for every action it needs to execute.
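
To make the difference between the first two schedulers concrete, here is a minimal sketch in plain Python. It does not use RxPY: ImmediateScheduler, TrampolineScheduler, and trace are hypothetical toy classes and helpers written only for this illustration, on the assumption that "immediate" means "run inside the caller's stack" and "current thread" means "queue and run after the current action returns".

```python
from collections import deque


class ImmediateScheduler(object):
    """Toy model (not RxPY): runs each action right away, in the caller's stack."""

    def schedule(self, action):
        action()


class TrampolineScheduler(object):
    """Toy model (not RxPY): queues actions scheduled while another one runs."""

    def __init__(self):
        self._queue = deque()
        self._running = False

    def schedule(self, action):
        self._queue.append(action)
        if self._running:
            return  # nested call: the outer loop below will pick it up
        self._running = True
        try:
            while self._queue:
                self._queue.popleft()()
        finally:
            self._running = False


def trace(scheduler):
    """Schedule an outer action that schedules an inner one; record the order."""
    log = []

    def inner():
        log.append("inner")

    def outer():
        log.append("outer start")
        scheduler.schedule(inner)
        log.append("outer end")

    scheduler.schedule(outer)
    return log


immediate_log = trace(ImmediateScheduler())
trampoline_log = trace(TrampolineScheduler())
print(immediate_log)   # inner runs in the middle of outer
print(trampoline_log)  # inner runs only after outer has returned
```

In both toy models everything stays on the calling thread; only the ordering of nested actions changes.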

Attempts to go parallel

The most appropriate method for scheduling work in different threads seems to be observe_on.

The problem, though, is that there is no thread_pool scheduler in RxPy right now. The new_thread scheduler launches just one thread, so it will not help you much.

The timeout scheduler can be used to go parallel, but it offers no control over the number of concurrent threads. Explosive growth in the number of concurrent tasks can therefore exhaust memory and effectively crash your system.
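
For comparison, this is what a bounded pool looks like with the standard library alone. It is a sketch, not an RxPY scheduler: MAX_WORKERS and the counters are assumptions made for the example, and work here returns a value so the result can be checked.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 2  # hypothetical cap, chosen only for this example

lock = threading.Lock()
active = 0  # tasks running right now
peak = 0    # highest number of tasks seen running at once


def work(x):
    """Simulate a slow task and track how many run at the same time."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)
    with lock:
        active -= 1
    return x * 10


# The pool never runs more than MAX_WORKERS tasks concurrently;
# the remaining tasks wait in the pool's internal queue.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    results = list(pool.map(work, range(1, 7)))

print(results)
print("peak concurrency:", peak)
```

Six tasks are submitted, but the peak never exceeds the cap, which is the control the timeout scheduler lacks.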

No bug in observe_on

I tried running your example with observe_on(Scheduler.timeout), but the tasks still did not run in parallel. After looking into the RxPy source, I discovered that it schedules the next event only after the current event completes, which effectively disables parallel processing. My first reaction was to report a bug in the observe_on implementation.

But after further investigation I found that serial execution is not a bug but rather the intended behavior.

The right way to execute tasks in parallel

Here is the code that works (based on this answer):

Observable.range(1, 3) \
  .select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
  .observe_on(Scheduler.event_loop) \
  .subscribe(finish)

Observable.start creates an asynchronous observable that is scheduled on a separate thread via Scheduler.timeout.

observe_on(Scheduler.event_loop) is optional. It forces the finish method to be called on the same thread for all items.

Please note that there is no guarantee that the finish method is called in the original range order.
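
The same shape (fan the work out to several threads, then funnel the results back to one thread in completion order) can be mimicked with the standard library. This is only an analogy to the RxPy snippet above, not its implementation; the sleep durations, the pool size, and the finished list are assumptions made for the sketch.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

finished = []  # (item, worker thread name, thread that ran finish)


def work(x):
    """Slow task; runs on a pool thread and reports which one."""
    time.sleep(0.01 * (4 - x))  # larger x sleeps less
    return x, threading.current_thread().name


def finish(x, worker_name):
    """Runs on the thread that consumes results, whatever thread did the work."""
    finished.append((x, worker_name, threading.current_thread().name))


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(work, i) for i in range(1, 4)]
    # as_completed yields futures as they finish, not in submission order.
    for future in as_completed(futures):
        finish(*future.result())

for entry in finished:
    print(entry)
```

Every finish call happens on the consuming thread, while the order of the items depends on which task completes first.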
