“退订"Observable“执行器"中的函数回调/挂钩;功能 [英] "Unsubscribe" function callback/hook in Observable "executor" function
问题描述
我对处置"或取消订阅"函数的目的感到困惑,这是(可选)从可观察的执行程序"函数返回的,如下所示:
I am confused about what the purpose of the "dispose" or "unsubscribe" function is for, which is (optionally) returned from an observable "executor" function, like so:
const Rx = require('rxjs');
const obs = Rx.Observable.create(obs => {
// we are in the Observable "executor" function
obs.next(4);
// we return this function, which gets called if we unsubscribe
return function () {
console.log('disposed');
}
});
const s1 = obs.subscribe(
function (v) {
console.log(v);
},
function (e) {
console.log(e);
},
function () {
console.log('complete');
}
);
const s2 = obs.subscribe(
function (v) {
console.log(v);
},
function (e) {
console.log(e);
},
function () {
console.log('complete');
}
);
s1.unsubscribe();
s2.unsubscribe();
令我困惑的是,这样的函数实际上更有可能保留代码中的引用,从而防止垃圾收集.
What confuses me is that such a function would actually be more likely to hold on to references in your code and therefore prevent garbage collection.
谁能告诉我在那种情况下返回一个函数的目的是什么,这个函数被调用了什么,它的签名是什么?我无法找出有关它的信息.
Can anyone tell me what the purpose is of returning a function in that scenario, what the function is called, and what it's signature is? I am having trouble figuring out information about it.
我还看到了从 executor 函数返回订阅的更复杂的示例,例如:
I also see much more complex examples of returning a subscription from the executor function, for example this:
let index = 0;
let obsEnqueue = this.obsEnqueue = new Rx.Subject();
this.queueStream = Rx.Observable.create(obs => {
const push = Rx.Subscriber.create(v => {
if ((index % obsEnqueue.observers.length) === obsEnqueue.observers.indexOf(push)) {
obs.next(v);
}
});
return obsEnqueue.subscribe(push);
});
这似乎返回了一个订阅,而不仅仅是一个简单的函数.谁能解释一下这是怎么回事?
This seems to return a subscription instead of just a plain function. Can anyone explain what's going on with this?
为了明确问题,这样做有什么区别:
To make it a clear question, what is the difference between doing this:
const sub = new Rx.Subject();
const obs = Rx.Observable.create($obs => {
$obs.next(4);
return sub.subscribe($obs);
});
并且不返回订阅调用的结果:
and not returning the result of the subscribe call:
const sub = new Rx.Subject();
const obs = Rx.Observable.create($obs => {
$obs.next(4);
sub.subscribe($obs);
});
推荐答案
Rx.Observable.create
需要返回的unsubscribe
函数在下游不监听时调用流,有效地让您有时间清理资源.
The unsubscribe
function that Rx.Observable.create
needs to return is invoked when downstream does not listen to the stream anymore, effectively giving you time to clean up resources.
关于您的问题;.subscribe()
返回您可以调用 .unsubscribe()
的订阅.因此,如果您想对其他订阅执行某些操作,您可以将该订阅通过管道传输到您的下游:
In regards to your question; .subscribe()
returns the subscription on which you can call .unsubscribe()
. So if you want to do something with an other subscription you can pipe through that subscription to your downstream:
const obs = Rx.Observable.create($obs => {
const timer = Rx.Observable.interval(300)
.do(i => console.log('emission: ' + i))
return timer.subscribe($obs);
});
obs.take(4).subscribe(i => console.log('outer-emission:'+i))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>
如果没有取消订阅功能,您将停止监听 observable,但内部创建的间隔会继续运行:
Without the unsubscribe function you would stop listening to the observable but the interval created internally would keep on running:
const obs = Rx.Observable.create($obs => {
const timer = Rx.Observable.interval(300)
.do(i => console.log('emission: ' + i))
.take(10)
.subscribe(
val => $obs.next(val),
err => $obs.error(err),
() => $obs.complete()
);
return function(){} // empty unsubscribe function, internal subscription will keep on running
});
obs.take(4).subscribe(i => console.log('outer-emission:'+i))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>
这篇关于“退订"Observable“执行器"中的函数回调/挂钩;功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!