具有流大小而不是流数的缓冲区反应扩展 C# [英] buffer with stream size instead of number of streams reactive extension C#
本文介绍了具有流大小而不是流数的缓冲区反应扩展 C#的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
- 我有一个
Producer()
将数据推送到blocking
集合. - 在
Consumer()
中,我使用System.Reactive
(4.1.2). - 我正在使用
Buffer
,但只能缓冲多个流.
- I have a
Producer()
which push data to ablocking
collection. - In
Consumer()
, I subscribed to theblocking
collection asObservable
, usingSystem.Reactive
(4.1.2). - I'm using
Buffer
, but only able to buffer on numbers of streams.
问题 - 我可以将 buffer
运算符用于流的大小而不是流的数量吗?
Question - Can I use buffer
operator with size of streams rather than number of streams?
当缓冲区大小超过(例如 1024 KB 或 1 MB)时,创建新缓冲区?
class Program
{
private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();
private static void Producer()
{
int ctr = 1;
while (ctr <= 11)
{
MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
}
private static void Consumer()
{
var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();
var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
.Subscribe(ts =>
{
WriteToFile(ts.ToList());
});
}
private static void WriteToFile(List<Message> listToWrite)
{
using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
{
outFile.Write(JsonConvert.SerializeObject(listToWrite));
}
}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
}
可观察的扩展方法,
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
TimeSpan threshold, int noOfStream)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
推荐答案
很高兴看到使用中的扩展方法 :)
Glad to see the extension method in use :)
您可以稍微修改它以使其Scan
Message
大小的运行计数.这样做我们就失去了类型泛型.
You can modify it slightly to have it Scan
the running count of Message
sizes. By doing so we lose type generics.
public class Message
{
public string Payload { get; set; }
public int Size { get; set; }
}
public static IObservable<IList<Message>> BufferWithThrottle(this IObservable<Message> source,
TimeSpan threshold, int maxSize)
{
return Observable.Create<IList<Message>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Select( i => i.Size)
.Scan(0, (a, b) => a + b)
.Where(a => a >= maxSize)
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
这篇关于具有流大小而不是流数的缓冲区反应扩展 C#的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文