取消流onData [英] Cancel stream onData

查看:75
本文介绍了取消流onData的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个事件总线,可以处理我应用程序中的所有中央事件。我有一个特殊情况,即我有一系列异步动作只想执行一次(发生特殊事件时),所以我必须启动一个异步函数,使其他函数失控,这些事件将触发一个事件。我的第二个动作,依此类推。

I have an event bus that handles all central events to my app. I have a special case where I have a sequence of asynchronous actions that I want to execute once only (when a special event happens), so I have to launch async function one, go through out of control other functions that will trigger an event for my second action, and so on.

所以我需要启动动作一,然后听事件总线等待动作一触发(不直接地)触发将启动动作二的事件,依此类推...

So I need to launch action one, then listen to the event bus waiting for action one to trigger (undirectly) an event that will launch action two etc...

自然地,一旦执行了序列中的每个元素,我就想停止监听触发它的事件。

Naturally, once each element of the sequence is executed, I want to stop listening to the event that triggered it.

我为此设想了一个ConsumerOnce(event,action)函数,该函数将订阅总线,等待预期的事件,在接收到事件时执行该动作,并在动作启动后立即取消订阅(异步)

I imagined for that, a consumeOnce(event, action) function that will subscribe to the bus, wait for the expected event, execute the action when receiving the event and immediately cancel the subscription once the action launched (asynchronously)

  final StreamController<Map<PlaceParam, dynamic>> _controller =
  new StreamController<Map<PlaceParam, dynamic>>.broadcast();

  void consumeOnce(PlaceParam param, Function executeOnce) {
    StreamSubscription subscription = _controller.stream.listen((Map<PlaceParam, dynamic> params) {
      if(params.containsKey(param)) {
        executeOnce();
        subscription.cancel(); //can't access, too early: not created yet
      }
    });
  }

问题是我无法访问变量 subscription 在我的回调主体中,因为当时尚未创建

The issue is that I can't access the variable subscription in the body of my callback, since it is still not created at the time

由于监听者无法按照其订阅顺序执行任何担保,因此我无法注册其他订阅者将删除我的订阅(即使执行顺序得到保证,无论如何我都会找到我自己无法删除的订阅:负责删除原始订阅的订阅者)...

Since nothing garanties that listeners will execute in their subscription order, I cannot register another subscriber who will remove my subscription (and even if the execution order was guaranteed, I will anyway find my self with a subscription that I can't remove : the one responsible for removing my original subscription)...

请问有什么想法吗?

这种模式可以解决我的问题,但是我觉得它并不优雅:

This pattern could solve my problem, but I don't find it elegant:

@Injectable()
class EventBus<K, V> {
  final StreamController<Map<PlaceParam, dynamic>> _controller =
  new StreamController<Map<PlaceParam, dynamic>>.broadcast();

  Future<Null> fire(Map<PlaceParam, dynamic> params) async {
    await _controller.add(params);
  }

  Stream<Map<PlaceParam, dynamic>> getBus() {
    return _controller.stream;
  }


  void consumeOnce(PlaceParam param, Function executeOnce) {
    SubscriptionRemover remover = new SubscriptionRemover(param, executeOnce);
    StreamSubscription subscription = _controller.stream.listen(remover.executeOnce);
    remover.subscription = subscription;
  }
}

class SubscriptionRemover {
  PlaceParam param;
  Function executeOnce;
  StreamSubscription subscription;

  SubscriptionRemover(this.param, this.executeOnce);

  void execute(Map<PlaceParam, dynamic> params) {
    if (params.containsKey(param)) {
      executeOnce();
      subscription.cancel();
    }
  }
}

但我不喜欢从理论上讲,该事件很可能在两个调用之间发生:

But I don't like it much since theoretically, the event could happen between the two calls :

    StreamSubscription subscription = _controller.stream.listen(remover.executeOnce); //event may occur now!!!
    remover.subscription = subscription;

我认为存在一种方法:_controller.stream.remove(Function fn)应该是

I think the existence of a method: _controller.stream.remove(Function fn) would have been a lot more direct and clear.

对吗?还是有我没想到的方法?

Am I right? or is there a way I didn't think of?

推荐答案




问题在于我无法在回调的主体中访问变量订阅,因为当时尚未创建变量订阅

The issue is that I can't access the variable subscription in the body of my callback, since it is still not created at the time

是的-您无法访问订阅 变量,即使您知道订阅​​本身将存在。 Dart不允许变量声明引用自己。当变量仅在尚未执行的闭包中引用时,这有时会很烦人。

That's correct - you cannot access is the subscription variable, even if you know that the subscription itself would exist. Dart doesn't allow variables declarations to refer to themselves. That's occasionally annoying when the variable it's only referenced inside a closure that won't be executed yet.

解决方案是预先声明变量或更新 onData -listener收听之后:

The solution is to either pre-declare the variable or to update the onData-listener after doing the listening:

// Pre-declare variable (can't be final, though).
StreamSubscription<Map<PlaceParam, dynamic>> subscription;
subscription = stream.listen((event) {
  .... subscription.cancel();
});

final subscription = stream.listen(null);
subscription.onData((event) {  // Update onData after listening.
  .... subscription.cancel(); ....
});

在某些情况下,您仍无法访问订阅 object ,但这只有在流违反 Stream 合同并在收听时立即开始发送事件的情况下才有可能。流不能这样做,它们必须等到以后的微任务才能传递第一个事件,这样调用 listen 的代码就有时间了,例如,接收订阅并将其分配给变量。使用同步流控制器可能会违反合同(这是为什么应谨慎使用同步流控制器的原因之一)。

There can be cases where you can't access the subscription object yet, but that's only possible if the stream breaks the Stream contract and starts sending events immediately when it's listened to. Streams must not do that, they must wait until a later microtask before delivering the first event, so that the code that called listen has time to, say, receive the subscription and assign it to a variable. It's possible to violate the contract using a synchronous stream controller (which is one reason why synchronous stream controllers should be used judiciously).

这篇关于取消流onData的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆