Consuming an IEnumerable multiple times in one pass
Problem description
Is it possible to write a higher-order function that causes an IEnumerable to be consumed multiple times but in only one pass and without reading all the data into memory? [See Edit below for a clarification of what I'm looking for.]
For example, in the code below the enumerable is mynums (onto which I've tagged a .Trace() in order to see how many times we enumerate it). The goal is to figure out whether it has any numbers greater than 5, as well as the sum of all of the numbers. Both_TwoPass computes both results, but it enumerates the sequence twice. In contrast, Both_NonStream enumerates it only once, but at the expense of reading it into memory. In principle it is possible to carry out both of these tasks in a single pass and in a streaming fashion, as shown by Any5Sum, but that is a specific solution. Is it possible to write a function with the same signature as Both_* that gets the best of both worlds?
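Any5Sum's approach can be generalized without threads, but only by changing the contract. Instead of a Func<IEnumerable<T>, S>, each consumer is written as a fold (a seed plus a step function), and a single loop feeds every element to both folds. This is a sketch of that different technique, not a function with the requested Both signature. FoldBoth and its parameter names are hypothetical, and the top-level-statement layout only serves to keep the example self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the question or the answer): runs two folds
// over the sequence in a single streaming pass. Each consumer is a (seed, step)
// pair rather than a Func<IEnumerable<T>, S>.
static (S1, S2) FoldBoth<T, S1, S2>(
    IEnumerable<T> source,
    S1 seed1, Func<S1, T, S1> step1,
    S2 seed2, Func<S2, T, S2> step2)
{
    S1 acc1 = seed1;
    S2 acc2 = seed2;
    foreach (T item in source) // the source is enumerated exactly once
    {
        acc1 = step1(acc1, item);
        acc2 = step2(acc2, item);
    }
    return (acc1, acc2);
}

var result = FoldBoth(
    Enumerable.Range(0, 10),
    false, (any, k) => any || k > 5, // "any > 5" as a fold
    0, (sum, k) => sum + k);         // "sum" as a fold
Console.WriteLine(result); // prints (True, 45)
```

The trade-off shows up in the Any part. A fold sees every element, so it cannot stop early the way Enumerable.Any does, and existing LINQ operators cannot be passed in directly.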
(It seems to me that this should be possible using threads. Is there a better solution using, say, async?)
Below is a clarification regarding what I'm looking for. I have included a very down-to-earth description of each property in square brackets.
I'm looking for a function Both with the following characteristics:
- It has signature
  (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
  (and produces the "right" output!).
- It only iterates the first argument, tt, once. [What I mean by this is that when passed mynums (as defined below) it only outputs mynums: 0 1 2 ... once. This precludes function Both_TwoPass.]
- It processes the data from the first argument, tt, in a streaming fashion. [What I mean by this is that, for example, there is not enough memory to hold all the items from tt at the same time, thus precluding function Both_NonStream.]
using System;
using System.Collections.Generic;
using System.Linq;

namespace ConsoleApp
{
    static class Extensions
    {
        public static IEnumerable<T> Trace<T>(this IEnumerable<T> tt, string msg = "")
        {
            Console.Write(msg);
            try
            {
                foreach (T t in tt)
                {
                    Console.Write(" {0}", t);
                    yield return t;
                }
            }
            finally
            {
                Console.WriteLine('.');
            }
        }

        public static (S1, S2) Both_TwoPass<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
        {
            return (f1(tt), f2(tt));
        }

        public static (S1, S2) Both_NonStream<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
        {
            var tt2 = tt.ToList();
            return (f1(tt2), f2(tt2));
        }

        public static (bool, int) Any5Sum(this IEnumerable<int> ii)
        {
            int sum = 0;
            bool any5 = false;
            foreach (int i in ii)
            {
                sum += i;
                any5 |= i > 5; // or: if (!any5) any5 = i > 5;
            }
            return (any5, sum);
        }
    }

    class Program
    {
        static void Main()
        {
            var mynums = Enumerable.Range(0, 10).Trace("mynums:");
            Console.WriteLine("TwoPass: (any > 5, sum) = {0}", mynums.Both_TwoPass(tt => tt.Any(k => k > 5), tt => tt.Sum()));
            Console.WriteLine("NonStream: (any > 5, sum) = {0}", mynums.Both_NonStream(tt => tt.Any(k => k > 5), tt => tt.Sum()));
            Console.WriteLine("Manual: (any > 5, sum) = {0}", mynums.Any5Sum());
        }
    }
}
Recommended answer
I think you and I are describing the same thing in the comments. There is no need to create such a "special-purpose IEnumerable", though, because the BlockingCollection<> class already exists for such producer-consumer scenarios. You'd use it as follows...
- Create a BlockingCollection<> for each consuming function (i.e. tt1 and tt2).
  - By default, a BlockingCollection<> wraps a ConcurrentQueue<>, so the elements will arrive in FIFO order.
  - To satisfy your requirement that only one element be held in memory at a time, 1 will be specified for the bounded capacity. Note that this capacity is per collection, so with two collections there will be up to two queued elements at any given moment.
  - Each collection will hold the input elements for that consumer.
- Start a task for each consuming function.
  - The task will simply call GetConsumingEnumerable() for its input collection, pass the resulting IEnumerable<> to its consuming function, and return that result.
  - GetConsumingEnumerable() does just what its name implies: it creates an IEnumerable<> that consumes (removes) elements from the collection. If the collection is empty, enumeration will block until an element is added.
  - CompleteAdding() is called once the producer is finished, which allows the consuming enumerator to exit when the collection empties.
- On the calling thread, enumerate tt once and Add() each element to every collection.
  - BlockingCollection<>.Add() will block if the collection has reached its capacity, preventing the entirety of tt from being buffered in memory.
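The mechanics described in the list above can be seen in isolation in a minimal sketch reduced to a single consumer. It uses a BlockingCollection<int> bounded to capacity 1, a task that sums GetConsumingEnumerable(), and the calling thread as the producer. The input range and the Sum consumer are arbitrary choices made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Bounded capacity 1: Add() blocks while an element is still waiting to be consumed.
using var queue = new BlockingCollection<int>(boundedCapacity: 1);

// Consumer: GetConsumingEnumerable() removes items in FIFO order, blocks while the
// collection is empty, and ends once CompleteAdding() has been called and the
// collection has been emptied.
Task<int> consumer = Task.Run(() => queue.GetConsumingEnumerable().Sum());

// Producer: the current thread.
foreach (int i in Enumerable.Range(0, 10))
{
    queue.Add(i);
}
queue.CompleteAdding(); // lets the consuming enumerable finish

int total = consumer.Result; // blocks until the consumer task has finished
Console.WriteLine(total);    // prints 45
```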
Here's what that looks like in code...
// Requires: using System.Collections.Concurrent; and using System.Threading.Tasks;
public static (S1, S2) Both<T, S1, S2>(this IEnumerable<T> tt, Func<IEnumerable<T>, S1> tt1, Func<IEnumerable<T>, S2> tt2)
{
    const int MaxQueuedElementsPerCollection = 1;

    using (BlockingCollection<T> collection1 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
    using (Task<S1> task1 = StartConsumerTask(collection1, tt1))
    using (BlockingCollection<T> collection2 = new BlockingCollection<T>(MaxQueuedElementsPerCollection))
    using (Task<S2> task2 = StartConsumerTask(collection2, tt2))
    {
        foreach (T element in tt)
        {
            collection1.Add(element);
            collection2.Add(element);
        }

        // Inform any enumerators created by .GetConsumingEnumerable()
        // that there will be no more elements added.
        collection1.CompleteAdding();
        collection2.CompleteAdding();

        // Accessing the Result property blocks until the Task<> is complete.
        return (task1.Result, task2.Result);
    }

    Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
    {
        return Task.Run(() => func(collection.GetConsumingEnumerable()));
    }
}
Note that, for efficiency's sake, you could increase MaxQueuedElementsPerCollection to, say, 10 or 100 so that the consumers don't have to run in lock-step with each other.

There is one problem with this code, though. When a collection is empty the consumer has to wait for the producer to produce an element, and when a collection is full the producer has to wait for the consumer to consume an element. Consider what happens midway through the execution of your tt => tt.Any(k => k > 5) lambda...
- The producer waits for the collection to be non-full and adds 5.
- The consumer waits for the collection to be non-empty and removes 5.
  - 5 > 5 returns false and enumeration continues.
- The producer waits for the collection to be non-full and adds 6.
- The consumer waits for the collection to be non-empty and removes 6.
  - 6 > 5 returns true and enumeration stops. Any(), the lambda, and the consumer task all return.
- The producer waits for the collection to be non-full and adds 7, filling the collection again.
- The producer waits for the collection to be non-full so it can add 8.
  - The consumer has already abandoned the enumeration, so it won't consume any elements to make room for the new one. Add() will never return.
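The deadlock can be reproduced safely with a sketch that swaps Add() for TryAdd() with a one-second timeout, so that a blocked call is reported instead of hanging. The consumer mimics tt => tt.Any(k => k > 5). The timeout value and the variable names are assumptions made for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

using var queue = new BlockingCollection<int>(boundedCapacity: 1);

// Mimics tt => tt.Any(k => k > 5): stops enumerating at the first match.
Task<bool> consumer = Task.Run(() => queue.GetConsumingEnumerable().Any(k => k > 5));

int lastAdded = -1;
bool stuck = false;
foreach (int i in Enumerable.Range(0, 10))
{
    // TryAdd with a timeout stands in for Add(): it returns false instead of
    // blocking forever when nobody makes room in the collection.
    if (!queue.TryAdd(i, TimeSpan.FromSeconds(1)))
    {
        stuck = true;
        break;
    }
    lastAdded = i;
}

// Let the consumer's enumerable finish (Any() has normally returned already).
queue.CompleteAdding();

Console.WriteLine($"consumer found k > 5: {consumer.Result}");
Console.WriteLine($"producer stuck: {stuck}, last element added: {lastAdded}");
```

With normal thread-pool scheduling, the consumer stops after seeing 6. The producer can still add 7, because the collection is empty again once 6 has been removed, and the attempt to add 8 is the call that times out. With a plain Add(), that call would never return.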
The cleanest way I could come up with to prevent this deadlock is to ensure the entire collection gets enumerated even if func doesn't do so. This just requires a simple change to the StartConsumerTask<>() local method...

Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func)
{
    return Task.Run(
        () =>
        {
            try
            {
                return func(collection.GetConsumingEnumerable());
            }
            finally
            {
                // Prevent BlockingCollection<>.Add() calls from
                // deadlocking by ensuring the entire collection gets
                // consumed even if func abandoned its enumeration early.
                foreach (T element in collection.GetConsumingEnumerable())
                {
                    // Do nothing...
                }
            }
        }
    );
}
The downside of this is that tt will always be enumerated to completion, even if both tt1 and tt2 abandon their enumerators early.
With that addressed, this...

static void Main()
{
    IEnumerable<int> mynums = Enumerable.Range(0, 10).Trace("mynums:");
    Console.WriteLine("Both: (any > 5, sum) = {0}", mynums.Both(tt => tt.Any(k => k > 5), tt => tt.Sum()));
}
...outputs this...
mynums: 0 1 2 3 4 5 6 7 8 9.
Both: (any > 5, sum) = (True, 45)
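The downside mentioned above (tt is always enumerated to completion) can be checked with a sketch. It copies the answer's Both, including the draining StartConsumerTask<>(), into non-extension local functions and counts how many elements the producer pulls. Source, pulled, and the First()/Any() consumers are illustrative assumptions. Using declarations on the two collections replace the original's using blocks around the tasks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical source that counts how many elements have been pulled from it.
int pulled = 0;
IEnumerable<int> Source()
{
    for (int i = 0; i < 10; i++)
    {
        pulled++;
        yield return i;
    }
}

// Both consumers stop after the first element, yet the source is still drained.
var (first, any) = Both(Source(), tt => tt.First(), tt => tt.Any());
Console.WriteLine($"first={first}, any={any}, pulled={pulled}"); // prints first=0, any=True, pulled=10

// Non-extension copy of the answer's Both, including the draining fix.
static (S1, S2) Both<T, S1, S2>(IEnumerable<T> tt, Func<IEnumerable<T>, S1> f1, Func<IEnumerable<T>, S2> f2)
{
    using var collection1 = new BlockingCollection<T>(1);
    using var collection2 = new BlockingCollection<T>(1);
    Task<S1> task1 = StartConsumerTask(collection1, f1);
    Task<S2> task2 = StartConsumerTask(collection2, f2);

    foreach (T element in tt)
    {
        collection1.Add(element);
        collection2.Add(element);
    }
    collection1.CompleteAdding();
    collection2.CompleteAdding();
    return (task1.Result, task2.Result);

    static Task<S> StartConsumerTask<S>(BlockingCollection<T> collection, Func<IEnumerable<T>, S> func) =>
        Task.Run(() =>
        {
            try
            {
                return func(collection.GetConsumingEnumerable());
            }
            finally
            {
                // Drain whatever func left behind so the producer's Add() never blocks forever.
                foreach (T unused in collection.GetConsumingEnumerable()) { }
            }
        });
}
```

Both consumers give up after one element, but pulled still reaches 10. The deadlock is avoided at the cost of always reading the whole source.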