subscribe_on with from_iterable/range in RxPY
Question
I'm trying to get my head around scheduling in Reactive Extensions for Python. I would like to use `subscribe_on` to process multiple observables in parallel. This works fine if the observable is created with `just`, but not if, for example, `range` or `from_` is used.
`just` defaults to `Scheduler.immediate`, while the other generators default to `Scheduler.current_thread`. That explains the difference, but it feels inconsistent to me, probably because I don't grasp the full problem.
Consider the following example:
    import rx
    from rx.concurrency.scheduler import Scheduler
    import time
    import threading

    def work(x):
        print("processing %s on thread %s" % (x, threading.current_thread().name))
        time.sleep(1)

    def finish(x):
        print("finished %s on thread %s" % (x, threading.current_thread().name))

    # Creates a new thread (I like)
    rx.Observable.just(3) \
        .do_action(work) \
        .subscribe_on(Scheduler.new_thread) \
        .subscribe(finish)

    # Runs on MainThread (I don't like)
    rx.Observable.range(1, 3) \
        .do_action(work) \
        .subscribe_on(Scheduler.new_thread) \
        .subscribe(finish)
It works with `observe_on`, or if the scheduler is passed directly to the generator, but I would like to decouple observable creation from processing and achieve something like this:
    import rx
    from rx.concurrency.scheduler import Scheduler
    import time
    import threading

    def work(x):
        print("processing %s on thread %s" % (x, threading.current_thread().name))
        time.sleep(1)

    def finish(x):
        print("finished %s on thread %s" % (x, threading.current_thread().name))

    def factory_single():
        return rx.Observable.just(1).do_action(work)

    def factory_multiple():
        return rx.Observable.range(2, 4).do_action(work)

    def process(factory):
        factory().subscribe_on(Scheduler.new_thread).subscribe(finish)

    # Creates a new thread (I like)
    process(factory_single)

    # Runs on MainThread (I don't like)
    process(factory_multiple)
Am I misunderstanding `subscribe_on`? Is my approach wrong?
Recommended answer
There are three actions in your example that can be scheduled independently:
- Data feed action. `just` and `range` use different schedulers by default, but there is not much difference between them: both feed in the initial values on the current thread. You can override the default scheduler by passing one as a parameter to these methods.
- Subscribe action. Uses `Scheduler.current_thread` by default, i.e. it is executed on the same thread as the data feed action. Can be overridden with the `subscribe_on` method.
- Observe (`on_next`, `on_error`, `on_completed`) action. Uses `Scheduler.current_thread` by default, i.e. it is executed on the same thread as the subscribe action. Can be overridden with the `observe_on` method.
If you override the scheduler for only one of these actions, the others follow the defaults described above.
About schedulers
`Scheduler.immediate` does not really schedule anything. It invokes the action immediately, on the same thread that scheduled it.
`Scheduler.current_thread` avoids recursion by queuing actions, but still invokes them on the same thread that scheduled them.
`Scheduler.new_thread` launches a single background thread that executes actions one after another.
`Scheduler.timeout` launches a new background thread for every action it needs to execute.
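The four behaviours above can be mimicked with nothing but the standard library. The sketch below is not RxPY code: `run_immediate`, `Trampoline`, `SingleWorker` and `run_on_fresh_thread` are hypothetical names made up for illustration, and the real schedulers may differ in detail. It only shows which thread ends up running an action in each case.

```python
import queue
import threading
from collections import deque


def run_immediate(action):
    # "immediate" analogue: call the action right away on the caller's thread.
    action()


class Trampoline:
    # "current_thread" analogue: nested schedule() calls are queued, not recursed into.
    def __init__(self):
        self._pending = deque()
        self._running = False

    def schedule(self, action):
        self._pending.append(action)
        if self._running:
            return  # the outer loop below will run it later
        self._running = True
        try:
            while self._pending:
                self._pending.popleft()()
        finally:
            self._running = False


class SingleWorker:
    # "new_thread" analogue: one background thread runs actions one after another.
    def __init__(self):
        self._actions = queue.Queue()
        self._thread = threading.Thread(target=self._loop)
        self._thread.start()

    def schedule(self, action):
        self._actions.put(action)

    def _loop(self):
        while True:
            action = self._actions.get()
            if action is None:  # sentinel pushed by close()
                return
            action()

    def close(self):
        self._actions.put(None)
        self._thread.join()


def run_on_fresh_thread(action):
    # "timeout" analogue: every action gets a brand-new thread.
    thread = threading.Thread(target=action)
    thread.start()
    return thread


def demo_schedulers():
    seen = {"immediate": [], "trampoline": [], "single": [], "fresh": []}

    def record(key):
        return lambda: seen[key].append(threading.current_thread().name)

    run_immediate(record("immediate"))

    trampoline = Trampoline()
    # The outer action schedules an inner one; the inner one runs after the outer returns.
    trampoline.schedule(lambda: (trampoline.schedule(record("trampoline")),
                                 seen["trampoline"].append("outer done")))

    worker = SingleWorker()
    for _ in range(3):
        worker.schedule(record("single"))
    worker.close()

    for thread in [run_on_fresh_thread(record("fresh")) for _ in range(3)]:
        thread.join()
    return seen
```

Calling `demo_schedulers()` from the main thread records the main thread's name for the first two analogues, one shared worker name for the third, and three different thread names for the last.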
Attempting parallel processing
The most appropriate method for scheduling work on different threads seems to be `observe_on`.
The problem, though, is that there is no `thread_pool` scheduler in RxPY right now. The `new_thread` scheduler launches just one thread, so it will not help you much.
The `timeout` scheduler can be used to go parallel, but it offers no control over the number of concurrent threads, so explosive growth in the number of concurrent tasks can exhaust memory and effectively crash your system.
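For comparison, this is what a bounded pool looks like outside of Rx. The sketch uses the standard library's `concurrent.futures.ThreadPoolExecutor`; it is not an RxPY scheduler, and `run_bounded` is a hypothetical helper written only to show that `max_workers` caps how many tasks run at the same time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_bounded(items, max_workers=2):
    """Square each item on at most max_workers threads and report peak concurrency."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def work(x):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # stand-in for real work
        with lock:
            active -= 1
        return x * x

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order, regardless of completion order.
        results = list(pool.map(work, items))
    return results, peak
```

With `run_bounded(range(1, 7), max_workers=2)` the peak never exceeds 2, no matter how many items are submitted. That cap is the control the `timeout` scheduler lacks.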
No bug in observe_on
I tried running your example with `observe_on(Scheduler.timeout)`, but the tasks still did not run in parallel. After looking into the RxPY source I discovered that it schedules the next event only after the current event completes, which effectively disables parallel processing. My first reaction was to report a bug in the `observe_on` implementation.
But after further investigation I found that serial execution is not a bug but rather the intended behavior.
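The effect is easy to reproduce without Rx. In the sketch below (plain standard library, with the hypothetical names `deliver_serially` and `demo_serial`), items are handed to a handler from a single worker thread, and the next item is not touched until the handler returns. This is only an analogue of the serialized delivery described above, not the RxPY implementation.

```python
import queue
import threading
import time

_DONE = object()  # sentinel marking the end of the stream


def deliver_serially(items, handler):
    """Hand every item to handler() on one worker thread, strictly in order."""
    inbox = queue.Queue()

    def drain():
        while True:
            item = inbox.get()
            if item is _DONE:
                return
            handler(item)  # the next item waits until this call returns

    worker = threading.Thread(target=drain)
    worker.start()
    for item in items:
        inbox.put(item)
    inbox.put(_DONE)
    worker.join()


def demo_serial():
    log = []
    state = {"active": 0, "peak": 0}
    lock = threading.Lock()

    def slow_handler(item):
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(0.02)  # stand-in for slow processing
        log.append(item)
        with lock:
            state["active"] -= 1

    deliver_serially([1, 2, 3], slow_handler)
    return log, state["peak"]
```

However slow the handler is, it is never entered twice at once and the items keep their order, so moving the observer to another thread does not by itself make the work parallel.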
The right way to execute tasks in parallel
Here is the code that works (based on this answer):
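    # work, finish and Scheduler are the ones defined in the question.
    # Note: work() as written returns None, so finish receives None;
    # return x from work() if the value should be passed through.
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: work(i), scheduler=Scheduler.timeout)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(finish)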
`Observable.start` creates an asynchronous observable that is scheduled on a separate thread via `Scheduler.timeout`.
`observe_on(Scheduler.event_loop)` is optional. It forces the `finish` method to be called on the same thread for all items.
Please note that there is no guarantee that the `finish` method is called in the initial `range` order.
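The ordering caveat comes from the fan-out itself rather than from Rx. A rough standard-library analogue of the pattern (one thread per item, results merged back on a single consumer thread) is sketched below; `fan_out_fan_in` and `slow_square` are hypothetical names, and the code is not how RxPY implements `select_many` or `start`.

```python
import queue
import threading
import time


def fan_out_fan_in(items, work):
    """Run work(item) on one thread per item and collect results on the caller's thread."""
    results = queue.Queue()

    def run(item):
        results.put((item, work(item), threading.current_thread().name))

    threads = [threading.Thread(target=run, args=(item,)) for item in items]
    for thread in threads:
        thread.start()

    received = []
    for _ in threads:
        received.append(results.get())  # arrival order follows completion, not input order
    for thread in threads:
        thread.join()
    return received


def slow_square(x):
    time.sleep(0.01 * (4 - x))  # smaller inputs sleep longer, so they tend to finish later
    return x * x
```

`fan_out_fan_in([1, 2, 3], slow_square)` returns all three results, each produced on a different thread, but usually not in input order. If the order matters, carry the original item or index along with the result, as the tuples here do, and sort on the consumer side.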