控制线程什么的接收订阅SubscribedOn已经使用后,设置在 [英] Controlling what thread an Rx subscription is Disposed on after SubscribedOn has been used

查看:313
本文介绍了控制线程什么的接收订阅SubscribedOn已经使用后,设置在的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个接收订阅,我 SubscribeOn 用不同的线程来防止其阻止。不过,我想该预订的处置阻止由于资源管理问题。我一直无法弄清楚如何到一个控制台应用程序或WinForms应用程序(我有两个用例)的范围内做到这一点。下面是工作减少的情况下代码,模仿我在做什么:

I have a Rx subscription that I SubscribeOn with a different thread to prevent it from blocking. However, I want the disposal of that subscription to block due to resource management issues. I have not been able to figure out how to accomplish this within the context of either a console app or a winforms app (I have both use cases). Below is working code of a reduced case that simulates what I am doing:

internal class Program
{

    private static void Log(string msg)
    {
        Console.WriteLine("[{0}] " + msg, Thread.CurrentThread.ManagedThreadId.ToString());
    }

    private static void Main(string[] args)
    {

        var foo = Observable.Create<long>(obs =>
            {
                Log("Subscribing starting.. this will take a few seconds..");
                Thread.Sleep(TimeSpan.FromSeconds(2));
                var sub =
                    Observable.Interval(TimeSpan.FromSeconds(1))
                              .Do(_ => Log("I am polling..."))
                              .Subscribe(obs);
                return Disposable.Create(() =>
                    {
                        Thread.Sleep(TimeSpan.FromSeconds(3));
                        sub.Dispose();
                        Log("Disposing is really done now!");
                    });
            });

        Log("I am subscribing..");
        var disp = foo.SubscribeOn(NewThreadScheduler.Default).Subscribe(i => Log("Processing " + i.ToString()));
        Log("I have returned from subscribing...");

        // SC.Current is null in a ConsoleApp :/  Can I get a SC that uses my current thread?
        //var dispSynced = new ContextDisposable(SynchronizationContext.Current, disp);
        Thread.Sleep(TimeSpan.FromSeconds(5));
        Log("I'm going to dispose...");
        //dispSynced.Dispose();
        disp.Dispose();
        Log("Disposed has returned...");
        Console.ReadKey();
    }
}

在上面是跑了,我得到:

When the above is ran I get:

[10] I am subscribing..
[10] I have returned from subscribing...
[11] Subscribing starting.. this will take a few seconds..
[6] I am polling...
[6] Processing 0
[6] I am polling...
[6] Processing 1
[10] I'm going to dispose...
[10] Disposed has returned...
[13] I am polling...
[6] I am polling...
[13] I am polling...
[14] Disposing is really done now!



因此,所有我想要做的是有 [10]处置返回... 将在最后一行印花指示处置呼叫阻塞。

So, all I'm trying to do is to have [10] Disposed has returned... be the last line printed indicating that the Dispose call is blocking.

ContextDisposable 的接收附带似乎非常适合我的使用情况,但我不知道怎么去的SynchronizationContext 代表我的当前线程。有没有一种方法,我可以使用 ContextDisposable 做什么,我想还是我需要一个完全不同的做法?

The ContextDisposable that Rx ships with seems ideal for my use case but I don't know how to get a SynchronizationContext representing my current thread. Is there a way I can use ContextDisposable to do what I want or do I need a totally different approach?

推荐答案

如果你看一下SubscribeOn的源代码它看起来像处置功能将在指定的调度安排。尝试是这样的:

If you look at the source code for SubscribeOn it looks like the dispose function will be scheduled on the specified scheduler. Try something like this:

private static IObservable<long> GetObservable(IScheduler scheduler)
{
  return Observable.Create<long>(obs =>
  {
    var disposables = new CompositeDisposable();

    disposables.Add(
      Disposable.Create(() =>
      {
        Thread.Sleep(TimeSpan.FromSeconds(3));
        Log("Disposing is really done now!");
      }));

    disposables.Add(
      scheduler.Schedule(() =>
      {
        Log("Subscribing starting.. this will take a few seconds..");
        Thread.Sleep(TimeSpan.FromSeconds(2));
        disposables.Add(
          Observable.Interval(TimeSpan.FromSeconds(1)).Do(_ => Log("I am polling...")).Subscribe(obs));
      }));

    return disposables;
  });


private static void Main(string[] args)
{
  var foo = GetObservable(NewThreadScheduler.Default);

  Log("I am subscribing..");
  var disp = foo.Subscribe(i => Log("Processing " + i.ToString()));
  Log("I have returned from subscribing...");

  // SC.Current is null in a ConsoleApp :/  Can I get a SC that uses my current thread?
  //var dispSynced = new ContextDisposable(SynchronizationContext.Current, disp);
  Thread.Sleep(TimeSpan.FromSeconds(5));
  Log("I'm going to dispose...");
  //dispSynced.Dispose();
  disp.Dispose();
  Log("Disposed has returned...");
  Console.ReadKey();
}

这篇关于控制线程什么的接收订阅SubscribedOn已经使用后,设置在的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆