tpl-dataflow相关内容

如何使用 TPL 数据流库指定无序执行块?

我想设置一个 TransformBlock 来并行处理它的项目.因此,我将 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 设置为 > 1.我不关心消息的顺序,但是 文档说: 当您指定的最大并行度大于 1 时,会同时处理多条消息,因此,消息可能不会按照接收顺序进行处理.但是,从块中输出消息的顺序将是正确排序的. “正确排序"是否 ..
发布时间:2021-09-04 19:35:52 C#/.NET

TPL DataFlow - 按持续时间或阈值进行批处理

我已经使用 TPL 数据流实现了生产者..消费者模式.用例是代码从 Kafka 总线读取消息.为了效率,我们需要在去数据库时批量处理消息. 在 TPL 数据流中是否有一种方法可以保留消息并在达到大小或持续时间阈值时触发? 例如,当前实现一旦从队列中拉出消息就发布消息. postedSuccessfully = targetBuffer.Post(msg.Value); 解决方案 ..
发布时间:2021-09-04 19:35:49 其他开发

异步任务,视频缓冲

我正在尝试理解 C# 中的任务,但仍有一些问题.我正在尝试创建一个包含视频的应用程序.主要目的是从文件中读取视频(我使用的是 Emgu.CV)并通过 TCP/IP 将其发送到板中进行处理,然后以流(实时)方式返回.首先,我是连续做的.因此,读取Bitmap,从板子发送-接收,并绘图.但是读取位图并绘制它们需要太多时间.我想要一个传输、接收 FIFO 缓冲区来保存视频帧,以及一个不同的任务来完成发送 ..
发布时间:2021-09-04 19:33:12 C#/.NET

对缓冲的 Observable 进行排序

我有一个生成非常快的令牌流和一个相对较慢的处理器.令牌分为三个子类型,我希望它们按优先级进行处理.因此,我希望令牌在生成并等待处理后进行缓冲,并按优先级排序该缓冲区. 这是我的课程: 公共枚举优先级{高 = 3,中 = 2,低 = 1}公共类 Base : IComparable{公共 int Id { 获取;放;}public int CompareTo(Base other){返回 I ..
发布时间:2021-09-04 18:36:56 C#/.NET

如何获取异常的上下文

我正在使用 TaskParallelLibrary DataFlow 结合由 Stephen Cleary 设计的 Try 库 (https://github.com/StephenCleary/Try) 来实现所谓的“铁路编程",这样我就可以通过管道传递 Exception 数据.我想知道是否有可能在 ActionBlock 中获取一些上下文,或者(在我的情况下)究竟是哪个项目导致了 Excep ..
发布时间:2021-06-18 18:34:13 C#/.NET

具有延迟的 TPL 数据流队列

我正在使用 ActionBlock 同时处理一个队列. 这里的一个问题是,在处理队列中的一项时,我可能想等到处理队列中的另一项满足依赖关系. 我认为我应该能够使用 TPL DataFlow 库来实现这一点,其中包含链接、延迟和延迟释放,但我不确定要使用什么结构. 在伪代码中: 公共类项目{公共字符串名称{获取;放;}公共列表DependsOn = new List();}Act ..

作为单个输入的结果,是否可以让任何数据流块类型发送多个中间结果?

如果等待整个 IEnumerable 被填充,是否有可能让 TransformManyBlock 在创建中间结果时将它们发送到下一步? 我所做的所有测试表明,TransformManyBlock 仅在完成时将结果发送到下一个块;下一个块然后一次读取这些项目. 这似乎是基本功能,但我在任何地方都找不到任何示例. 用例是在读取文件时处理文件块.在我的情况下,在我可以处理任何事情之前需 ..
发布时间:2021-06-14 18:51:15 C#/.NET

动作块可以包含状态吗?

我正在编写一个使用TPL数据流的应用程序.我正在尝试配置一个动作块以写入数据库. 但是,我需要此操作块对收到的第一条消息执行初始化步骤(请注意,我必须等待第一条消息并且不能在创建操作块期间执行初始化操作.) 因此,我的操作块需要保持某种状态,以指示其是否已经收到第一条消息. ActionBlock是否可以维持状态? 参考下面的Microsoft示例代码,如何将状态变量添加到 ..
发布时间:2021-05-28 20:08:50 C#/.NET

与仅循环相比,为什么我的TPL数据流管道在读取巨大的CSV文件时速度较慢?

所以我的要求是读取多个CSV文件(每个文件至少有一百万行),然后解析每一行.目前,按照我分解管道的方式,我首先创建一个单独的管道,仅将CSV文件读入字符串[],然后计划稍后再创建解析管道. 但是看到文件读取管道的结果,我很傻,因为它比循环遍历CSV文件然后遍历行要慢得多. 静态公共IPropagatorBlockCreatePipeline(int b ..
发布时间:2021-04-23 20:39:59 C#/.NET

如何以(线程)安全的方式跟踪TPL管道中的故障项

我正在将TPL管道设计与Stephen Cleary的 Try库一起使用简而言之,它包装了价值/例外并将其浮动到管道中.因此,即使在处理方法中引发异常的项目,最后当我等待resultBlock.Completion; 时,其状态代码也仍然为 Status = RunToCompletion .因此,我需要其他方式来注册有缺陷的项目.这是小样本: var downloadBlock = new ..
发布时间:2021-04-18 20:15:52 C#/.NET

任务数据流,可以从完成状态更改数据块吗?

我想知道是否可以更改数据块的完成状态? 例如,我标记了一个 var block = new BufferBlock(); 数据块,其中包含 block.Complete().该块链接到其他数据块.我想知道是否可以通过将 block 的完成状态更改回其原始!complete状态来使其再次运行. 如果不可能,那么如何执行多个运行(包括完成),而不必a)取消链接所有块,b)重新实 ..
发布时间:2021-04-18 20:10:10 C#/.NET

BatchBlock生成带有在TriggerBatch()之后发送的元素的批处理

我有一个由几个块组成的数据流管道.当元素流过我的处理管道时,我想按字段 A 将它们分组.为此,我有一个 BatchBlock ,它的 BoundedCapacity 很高.在其中存储我的元素,直到我决定应该发布它们为止.因此,我调用 TriggerBatch()方法. private void Forward(TStronglyTyped数据){如果(ShouldCreateNewGroup( ..
发布时间:2021-04-18 19:58:00 C#/.NET

具有限制容量的转换块中的TPL数据流异常

我需要构建将处理大量消息的TPL数据流管道.因为有很多消息,所以我不能简单地将它们 Post 放入 BufferBlock 的无限队列中,否则我将面临内存问题.所以我想使用 BoundedCapacity = 1 选项禁用队列,并使用 MaxDegreeOfParallelism 进行并行任务处理,因为我的 TransformBlock 可能需要一些时间每条消息.我还使用 PropagateCom ..
发布时间:2021-04-18 19:32:31 C#/.NET

如果排队的项目数小于BatchSize,如何在超时后自动调用TriggerBatch?

使用数据流CTP(在TPL中) 如果超时后当前排队或推迟的项目数小于BatchSize,是否可以自动调用BatchBlock.TriggerBatch? 更好的是:每次块收到新项目时,此超时应重置为0. 解决方案 是的,您可以通过将块链接在一起来完成此操作.在这种情况下,您要设置一个TransformBlock,该链接在BatchBlock之前“链接".看起来像这样: Ti ..
发布时间:2021-04-18 19:25:42 C#/.NET

带有前提条件的数据流TPL实现管道

我有一个关于使用Dataflow TPL库实现管道的问题. 我的情况是我有一个需要同时处理某些任务的软件.处理如下所示:首先我们在全局级别处理相册,然后进入相册并分别处理每张图片.假设应用程序具有处理插槽,并且它们是可配置的(为示例起见,假设插槽= 2).这意味着应用程序可以处理以下任一情况: a)同时发行两张专辑 b)一张专辑+一张来自不同专辑的照片 c)同一张相册中的两张照片 ..
发布时间:2021-04-18 19:17:29 C#/.NET

对于TPL数据流:如何在阻塞直到处理完所有输入之前如何使用TransformBlock产生的所有输出?

我正在向单个数据库同步提交一系列 select 语句(查询-成千上万个),并且每个查询取回一个 DataTable (注意:该程序这样就可以知道它仅在运行时进行扫描的数据库模式,因此可以使用 DataTables ).该程序在客户端计算机上运行,​​并连接到远程计算机上的数据库.运行如此多的查询需要很长时间.因此,假设异步或并行执行它们可以加快处理速度,我正在研究 TPL Dataflow(TDF ..
发布时间:2021-04-18 19:17:27 C#/.NET

在TPL数据流中保证交付的BroadcastBlock

我有一种数据流,可以用几种不同的方式处理...所以我想将收到的每条消息的副本发送到多个目标,以便这些目标可以并行执行...但是,我需要在我的数据块上设置 BoundedCapacity ,因为数据流的传输速度比我的目标可以处理它们的速度快,并且有大量数据.没有 BoundedCapacity ,我很快就会用光内存. 但是问题是 BroadcastBlock 如果目标无法处理它,则会丢弃消息( ..
发布时间:2021-04-18 18:35:33 C#/.NET

如何使用2个数据源返回异步流

我具有以下函数,该函数以运行 System.Diagnostics.Process 的结果作为异步流返回标准输出数据.该方法中当前的所有操作均按预期进行.我可以在 await foreach()循环中调用它,并获得由外部exe生成的每一行输出. 私有静态异步IAsyncEnumerableProcessAsyncStream(ProcessStartInfo processSt ..
发布时间:2021-04-12 19:07:03 C#/.NET