可以观察到对不同线程上更改的阻塞收集没有反应 [英] Observable not reacting to blocking collection changed on different thread

查看:68
本文介绍了可以观察到对不同线程上更改的阻塞收集没有反应的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码:

class Program
{
    static void Main(string[] args)
    {
        var watcher = new SNotifier(DumpToConsole);
        watcher.StartQueue();

        Console.ReadLine();
    }

    private static void DumpToConsole(IList<Timestamped<int>> currentCol)
    {
        Console.WriteLine("buffer time elapsed, current collection contents is: {0} items.", currentCol.Count);
        Console.WriteLine("holder has: {0}", currentCol.Count);
    }
}

SNotifier:

the SNotifier:

public class SNotifier
{
    private BlockingCollection<int> _holderQueue;
    private readonly Action<IList<Timestamped<int>>> _dumpAction;

    public SNotifier(Action<IList<Timestamped<int>>> dumpAction)
    {
        PopulateListWithStartValues();
        _dumpAction = dumpAction;
    }

    public void StartQueue()
    {
        PopulateQueueOnDiffThread();

        var observableCollection = _holderQueue.ToObservable();

        var myCollectionTimestamped = observableCollection.Timestamp();
        var bufferedTimestampedCollection = myCollectionTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));

        using (bufferedTimestampedCollection.Subscribe(_dumpAction))
        {
            Console.WriteLine("started observing collection");
        }
    }

    private void PopulateQueueOnDiffThread()
    {
        Action addToCollectionAction = AddToCollection;
        var t = new TaskFactory();
        t.StartNew(addToCollectionAction);

    }

    private static IEnumerable<int> GetInitialElements()
    {
        var random = new Random();
        var items = new List<int>();
        for (int i = 0; i < 10; i++)
            items.Add(random.Next(1, 10));

        return items;
    }

    private void AddToCollection()
    {
        while (true)
        {
            var newElement = new Random().Next(1, 10);
            _holderQueue.Add(newElement);
            Console.WriteLine("added {0}", newElement);
            Console.WriteLine("holder has: {0}", _holderQueue.Count);
            Thread.Sleep(1000);
        }
    }

    private void PopulateListWithStartValues()
    {
        _holderQueue = new BlockingCollection<int>();
        var elements = GetInitialElements();
        foreach (var i in elements)
            _holderQueue.Add(i);
    }
}

我需要运行DumpToConsole()方法以每 3秒显示集合计数,而此集合的内容在另一个线程上已更改.我的问题是DumpToConsole()仅被调用一次.这是为什么?!我已经花了整整一天的时间.由于我已经使用我的dump方法订阅了observable,因此它应该观察"集合的更改并每3秒调用一次DumpToConsole()方法.那就是我所需要的.

I need to run the DumpToConsole() method to show the collection count every 3 seconds, while this collection is having its contents changed on another thread. My problem is that DumpToConsole() is only called once. Why is that?! Ive spent already the entire day on this. Since I've subscribed with my dump method to the observable, it should "observe" the collection changes and re-call the DumpToConsole() method every 3 seconds; thats what I need.

想法?谢谢

(PS传递给SNotifier类的动作是我删除SNotifier中与控制台相关的内容的方式,我需要对其进行更好的重构,由于与问题本身无关,因此可以忽略不计)

(P.S. the action passed to the SNotifier class is my way of removing console related stuff in the SNotifier, i'll need to refactor that better, it can be ignored as it has nothing to do with the problem itself)

推荐答案

您正在BlockingCollection<int>上调用ToObservable().此扩展方法仅采用集合上的IEnumerable<int>接口并将其转换为IObservable<int>.这样的效果是获取订阅时集合的内容列表,并通过Observable流将其转储出去.

You are calling ToObservable() on your BlockingCollection<int>. This extension method simply takes the IEnumerable<int> interface on the collection and converts it to an IObservable<int>. This has the effect of getting a list of the contents of the collection at the point of subscription and dumping them out via an Observable stream.

添加项目后,它将不会继续枚举项目.

It will not continue to enumerate items as they are added.

在其中使用 GetConsumingEnumerable() ToObservable()的开头将解决此问题.

Use GetConsumingEnumerable() in front of ToObservable() would address this.

但是,需要谨慎,因为这也会从集合中删除项目,这可能是不希望的.

However, caution is required as this will also remove items from the collection though, which might not be desirable.

如果可以接受,则在有多个订阅者的情况下,您可能希望发布可观察到的结果,以免造成严重破坏.

If this is acceptable, you may want to publish the resulting observable in the case of multiple subscribers to avoid wreaking havoc.

如果您只是添加内容,则可以考虑扭转整个局面-使用Subject返回"Add"方法,并让一个订阅者填写一个List(如果需要,则填写BlockingCollection)以跟踪集合,然后第二位订阅者可以报告进度.

If you are just adding, you could consider turning the whole thing around - use a Subject to back an "Add" method and have one subscriber fill a List (or BlockingCollection if you need that) to track the collection, and a second subscriber can then report on progress.

另一种方法是使用 ObservableCollection 并订阅其事件.

Another approach would be to use an ObservableCollection and subscribe to its events.

在最后两个建议中,由于Subject<T>ObservableCollection<T>本身都不是线程安全的,因此您需要使添加"成为线程安全的.

In the last two suggestions, you would need to make your "Add" thread-safe since neither Subject<T> nor ObservableCollection<T> are thread-safe themselves.

布兰登关于您要在StartQueue中处置订阅的评论使我意识到另一个问题-StartQueue将永远不会返回!这是因为在IEnumerableToObservable()转换中对Subscribe的调用在枚举完成之前不会返回-因此也保留了处置(因为IDisposableSubscribe的返回值) ),这就是为什么我没有注意到using @Brandon指出的原因!

Brandon's comment that you are disposing the subscription in StartQueue made me realise another problem - StartQueue will never return! This is because a call to Subscribe made on a ToObservable() conversion of an IEnumerable will not return until the enumeration has completed - it also therefore holds up disposal (since the IDisposable is the return value of Subscribe) which is why I didn't notice the using @Brandon pointed out either!

根据以上两点,您需要进行以下附加更改.首先,删除订阅周围的using语句,隐式处置将取消订阅.当我们解决阻塞的Subscribe呼叫时,这将导致立即取消订阅.如果确实需要在某个时候显式退订,则应保留IDisposable句柄.

With the above two points, you need to make the following additional changes. First, remove the using statement around the subscription, the implicit disposal would cancel the subscription. When we solve the blocking Subscribe call this would then cause the subscription to be immediately cancelled. You should preserve the IDisposable handle if you do need to explicitly unsubscribe at some point.

第二,在ToObservable()之后立即添加对SubscribeOn(Scheduler.Default)的呼叫,以防止Subscribe呼叫阻塞.

Second, add a call to SubscribeOn(Scheduler.Default) immediately after the ToObservable() to prevent the Subscribe call blocking.

这篇关于可以观察到对不同线程上更改的阻塞收集没有反应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆