tpl-dataflow相关内容

使用TPL数据流对持续时间或阈值进行批处理

我已经使用TPL数据流实现了生产者..消费者模式。用例是代码从Kafka总线读取消息。为提高效率,我们需要在进入数据库时批量处理消息。 TPL数据流中是否有办法在达到大小或持续时间阈值时保留消息并触发? 例如,一旦消息从队列中拉出,当前实现将发布该消息。 postedSuccessfully = targetBuffer.Post(msg.Value); 推荐答案 ..
发布时间:2022-03-16 14:25:39 C#/.NET

VS 2012 RC 中引用 TPL 数据流和 TPL 的问题

我刚刚将 Visual Studio 11 Beta 升级到新的 Visual Studio 2012 RC,但在引用 TPL 数据流时遇到了问题. 首先,我尝试像以前一样通过从框架中添加引用来引用 Dataflow.但是当我尝试这样做时,我得到一个错误框: 无法添加对“System.Threading.Tasks.Dataflow"的引用. 然后整个 Visual Studio ..

TPL 完成 vs 完成

我需要从旧数据库中导入与客户相关的数据,并在此过程中执行多项转换.这意味着单个条目需要执行额外的“事件"(同步产品、创建发票等). 我最初的解决方案是一个简单的并行方法.它工作正常,但有时它有问题.如果当前处理的客户需要等待相同类型的事件,他们的处理队列可能会卡住并最终超时,导致每个底层事件也失败(它们取决于失败的事件).它不会一直发生,但很烦人. 所以我有了另一个想法,分批工作.我的 ..

TPL Dataflow,Post() 和 SendAsync() 之间的功能区别是什么?

我对通过 Post() 或 SendAsync() 发送项目之间的区别感到困惑.我的理解是,在所有情况下,一旦项目到达数据块的输入缓冲区,控制权就会返回到调用上下文,对吗?那我为什么需要 SendAsync 呢?如果我的假设不正确,那么我想知道相反,如果使用数据块的整个想法是建立并发和异步环境,为什么有人会使用 Post(). 我当然理解技术上的区别在于 Post() 返回一个 bool 而 ..

限制异步任务

我想运行一堆异步任务,并限制在任何给定时间待完成的任务数量. 假设您有 1000 个 URL,并且您只想一次打开 50 个请求;但是一旦一个请求完成,您就会打开到列表中下一个 URL 的连接.那样的话,一次总是正好有 50 个连接打开,直到 URL 列表用完为止. 如果可能,我还想使用给定数量的线程. 我想出了一个扩展方法,ThrottleTasksAsync 可以满足我的需求. ..
发布时间:2021-11-30 13:30:51 C#/.NET

AsyncLocal 值与 TPL 数据流不正确

考虑这个例子: class 程序{私有静态只读 ITargetBlock网格 = 创建网格();private static readonly AsyncLocal;异步本地上下文= new AsyncLocal();静态异步任务 Main(string[] args){var 任务 = Enumerable.Range(1, 4).Select(ProcessMessage);等待 Task. ..
发布时间:2021-11-24 16:49:31 C#/.NET

使用 HttpClient 和 Polly 发送并行请求,但每个主机只有一个,以优雅地处理 429 响应

简介: 我正在构建一个单节点网络爬虫来简单地验证 .NET Core 控制台应用程序中的 URL 是否200 OK.我在不同的主机上有一组 URL,我使用 HttpClient 向这些主机发送请求.我刚开始使用 Polly 和 TPL Dataflow. 要求: 我想支持与一个并行发送多个 HTTP 请求可配置的MaxDegreeOfParallelism. 我想将对任何给定主 ..
发布时间:2021-11-24 13:23:16 C#/.NET

合并数据流结果

我正在开发一个 Dataflow 管道,它读取一组文件,并针对每个文件中的每一行执行一系列 Dataflow 块. 在为文件中的每一行完成所有步骤后,我想对文件本身执行更多块,但我不知道这怎么可能. 通过 TransformManyBlock 拆分处理很简单,但是如何合并? 我习惯了 Apache Camel 的 Splitter 和 Aggregator 功能 - 或者 Dataf ..
发布时间:2021-11-11 22:50:52 C#/.NET

TPL DataFlow 一一处理

我有一个持续处理消息的系统.我想确保仅在处理上一条消息时才从外部队列请求消息.让我们想象一下 GetMessages 方法从外部队列请求消息. 有事件 1. 将推送它 推送 1 有活动 2.将推动它 - 我的音乐会到了.由于我们在处理之前获得项目 处理 1 已处理 1 已删除 1 个 代码: 使用系统;使用 System.Collections.Generic;使用 Sys ..
发布时间:2021-09-04 19:37:59 C#/.NET

如何监控 TPL 数据流网格中的进度?

我正在开发一个 C# 应用程序,该应用程序具有必须异步执行的耗时顺序工作流.它在用户按下按钮时启动,应用程序在几毫秒内接收到从相机捕获的一些图像.然后是工作流程. 将图像保存到磁盘 对齐它们. 从它们生成 3d 数据. 将它们组合成一个更大的集体对象(称为“扫描"). 向此扫描添加可选的分析数据并执行它. 最终保存扫描本身与图像一起保存到一个 xml 文件中. 其中一些步骤是 ..
发布时间:2021-09-04 19:37:42 C#/.NET

如何发出数据流完成的信号?

我有一个使用 TPL 数据流实现由 3 个步骤组成的数据流的类. 在构造函数中,我将步骤创建为 TransformBlocks,并使用 LinkTo 将它们链接起来,DataflowLinkOptions.PropagateCompletion 设置为 true.该类公开了一个方法,该方法通过在第一步调用 SendAsync 来启动工作流.该方法返回工作流最后一步的“完成"属性. 目前 ..
发布时间:2021-09-04 19:37:33 C#/.NET

从 ActionBlock 更新 UI 控件

我一直试图通过创建示例应用程序来理解 TPL 数据流.我一直在尝试做的一件事是从 ActionBlock 更新一个 TextBox 控件.使用 TPL Dataflow 的原因是在保持顺序的同时执行并行异步操作.下面的函数是我写的, private TaskScheduler 调度器 = null;公共 Form1(){this.scheduler = TaskScheduler.FromCur ..
发布时间:2021-09-04 19:37:27 C#/.NET

理解 TPL 数据流并行度排序

我正在阅读 Dataflow (任务并行库),并且有一部分说: 当您指定的最大并行度大于 1 时,会同时处理多条消息,因此,消息可能不会按照接收顺序进行处理.但是,从块中输出消息的顺序将是正确排序的. 什么意思? 例如,我将操作块设置为并行度 = 5: testActionBlock = new ActionBlock(i => Consumer(i),新的 Execution ..
发布时间:2021-09-04 19:37:03 C#/.NET

TPL Dataflow 如何去除块之间的链接

我想知道.如何删除块之间的链接?换句话说.我想与 LinkTo 相反. 我想写一个基于 tlp 数据流的记录器. 我写了这个接口,想在需要的时候删除对 ILogListener 的订阅. 公共接口ILogManager{void RemoveListener(ILogListener 监听器);} 解决方案 链接块时: var link = someSourceBlock.Li ..
发布时间:2021-09-04 19:37:00 C#/.NET

处理SQS item Queue的多线程方法

在这个场景中,我必须从队列中轮询 AWS SQS 消息,每个异步请求最多可以获取 10 个 sqs 项目/消息.轮询项目后,我必须在 kubernetes pod 上处理这些项目.项目处理包括从少数 API 调用中获取响应,这可能需要一些时间 &然后将项目保存到 DB &S3.我做了一些 R&D &得出以下结论 为了使用消费者生产者模型,1 个线程将轮询项目 &另一个线程将处理该项目或使用多 ..

TPL 数据流生产者消费者模式

刚刚使用 TPL DataFlow 编写了一个示例生产者消费者模式.我有一些基本的问题. 只有在生产者发布所有项目后,消费者才处于活动状态.异步是否意味着生产和消费任务可以并行运行. 给消费者一个睡眠时间来验证它是否阻塞了其他数据项.它似乎是按顺序执行的,没有获得任何并行性. 我在这里做错了吗? class AscDataBlocks{公共 Int64 开始;公共 Int64 ..
发布时间:2021-09-04 19:36:04 C#/.NET