如何使用rxpy/rxjs延迟事件发出? [英] How to delay event emission with rxpy/rxjs?
问题描述
我有两个事件流.一个来自电感环路,另一个来自IP摄像机.汽车将驶过环路,然后撞到相机.如果事件在彼此之间的N毫秒之内(车将始终首先进入循环),我想将它们组合在一起,但是我也希望每个流中的不匹配事件(任何硬件都可能发生故障)都合并到一个流中.像这样:
I've got two event streams. One is from an inductance loop, the other is an IP camera. Cars will drive over the loop and then hit the camera. I want to combine them if the events are within N milliseconds of each other (car will always hit the loop first), but I also want the unmatched events from each stream (either hardware can fail) all merged into a single stream. Something like this:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
现在,我可以通过做一个好的ole Subject反模式来破解自己的路:
Now certainly I can hack my way around by doing the good ole Subject anti-pattern:
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
这不仅很hacky,而且尽管我没有观察到它,但是我很确定当我使用threading.Timer
检查挂起的队列时有一个竞争条件.鉴于有太多的rx运算符,我很确定它们的某种组合将使您无需使用Subject
即可执行此操作,但我无法弄清楚.如何做到这一点?
Not only is this rather hacky, but although I've not observed it I'm pretty sure there's a race condition when I check the pending queue using threading.Timer
. Given the plethora of rx operators, I'm pretty sure some combination of them will let you do this without using Subject
, but I can't figure it out. How does one accomplish this?
尽管出于组织和运营方面的原因,我宁愿坚持使用Python,但我将使用JavaScript rxjs答案并移植它,甚至可能在节点中重写整个脚本.
Although for organizational and operational reasons I'd prefer to stick to Python, I'll take a JavaScript rxjs answer and either port it or even possibly rewrite the entire script in node.
推荐答案
您应该能够使用auditTime
和buffer
解决问题.像这样:
You should be able to solve the problem using auditTime
and buffer
. Like this:
function matchWithinTime(a$, b$, N) {
const merged$ = Rx.Observable.merge(a$, b$);
// Use auditTime to compose a closing notifier for the buffer.
const audited$ = merged$.auditTime(N);
// Buffer emissions within an audit and filter out empty buffers.
return merged$
.buffer(audited$)
.filter(x => x.length > 0);
}
const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));
setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
如果b
值可能紧跟在a
值之后,并且您不希望它们匹配,则可以使用更具体的审核,如下所示:
If it's possible for b
values to be closely followed by a
values and you do not want them to be matched, you could use a more specific audit, like this:
const audited$ = merged$.audit(x => x === "a" ?
// If an `a` was received, audit upcoming values for `N` milliseconds.
Rx.Observable.timer(N) :
// If a `b` was received, don't audit the upcoming values.
Rx.Observable.of(0, Rx.Scheduler.asap)
);
这篇关于如何使用rxpy/rxjs延迟事件发出?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!