具有流大小而不是流数的缓冲区反应扩展 C# [英] buffer with stream size instead of number of streams reactive extension C#

查看:24
本文介绍了具有流大小而不是流数的缓冲区反应扩展 C#的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

  1. 我有一个 Producer() 将数据推送到 blocking 集合.
  2. Consumer() 中,我使用 System.Reactive (4.1.2).
  3. 我正在使用 Buffer,但只能缓冲多个流.
  1. I have a Producer() which push data to a blocking collection.
  2. In Consumer(), I subscribed to the blocking collection as Observable, using System.Reactive (4.1.2).
  3. I'm using Buffer, but only able to buffer on numbers of streams.

问题 - 我可以将 buffer 运算符用于流的大小而不是流的数量吗?

Question - Can I use buffer operator with size of streams rather than number of streams?

当缓冲区大小超过(例如 1024 KB 或 1 MB)时,创建新缓冲区?

    class Program
    {
        private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();

    private static void Producer()
    {
        int ctr = 1;
        while (ctr <= 11)
        {
            MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
            Thread.Sleep(1000);
            ctr++;
        }
    }

    private static void Consumer()
    {
        var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();

        var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
                                    .Subscribe(ts =>
                                    {
                                        WriteToFile(ts.ToList());
                                    });
    }

    private static void WriteToFile(List<Message> listToWrite)
    {
        using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
        {
            outFile.Write(JsonConvert.SerializeObject(listToWrite));
        }
    }

    static void Main(string[] args)
    {
        var producer = Task.Factory.StartNew(() => Producer());
        var consumer = Task.Factory.StartNew(() => Consumer());
        Console.Read();
     }
    }

可观察的扩展方法,

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
                                                                            TimeSpan threshold, int noOfStream)
    {
        return Observable.Create<IList<TSource>>((obs) =>
        {
            return source.GroupByUntil(_ => true,
                                       g => g.Throttle(threshold).Select(_ => Unit.Default)
                                             .Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
                         .SelectMany(i => i.ToList())
                         .Subscribe(obs);
        });
    }

推荐答案

很高兴看到使用中的扩展方法 :)

Glad to see the extension method in use :)

您可以稍微修改它以使其ScanMessage 大小的运行计数.这样做我们就失去了类型泛型.

You can modify it slightly to have it Scan the running count of Message sizes. By doing so we lose type generics.

public class Message
{
    public string Payload { get; set; }
    public int Size { get; set; }
}

public static IObservable<IList<Message>> BufferWithThrottle(this IObservable<Message> source,
                                                     TimeSpan threshold, int maxSize)
{
    return Observable.Create<IList<Message>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Select( i => i.Size)
                                                 .Scan(0, (a, b) => a + b)
                                                 .Where(a => a >= maxSize)
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

这篇关于具有流大小而不是流数的缓冲区反应扩展 C#的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆