tpl-dataflow相关内容
我已经使用TPL数据流实现了生产者..消费者模式。用例是代码从Kafka总线读取消息。为提高效率,我们需要在进入数据库时批量处理消息。 TPL数据流中是否有办法在达到大小或持续时间阈值时保留消息并触发? 例如,一旦消息从队列中拉出,当前实现将发布该消息。 postedSuccessfully = targetBuffer.Post(msg.Value); 推荐答案
..
我刚刚将 Visual Studio 11 Beta 升级到新的 Visual Studio 2012 RC,但在引用 TPL 数据流时遇到了问题. 首先,我尝试像以前一样通过从框架中添加引用来引用 Dataflow.但是当我尝试这样做时,我得到一个错误框: 无法添加对“System.Threading.Tasks.Dataflow"的引用. 然后整个 Visual Studio
..
我需要从旧数据库中导入与客户相关的数据,并在此过程中执行多项转换.这意味着单个条目需要执行额外的“事件"(同步产品、创建发票等). 我最初的解决方案是一个简单的并行方法.它工作正常,但有时它有问题.如果当前处理的客户需要等待相同类型的事件,他们的处理队列可能会卡住并最终超时,导致每个底层事件也失败(它们取决于失败的事件).它不会一直发生,但很烦人. 所以我有了另一个想法,分批工作.我的
..
当两个转换块完成时,我如何重写代码完成的代码?我认为完成意味着它被标记为完成并且“输出队列"为空? 公共测试(){BroadCastBlock = new BroadcastBlock(i =>{返回我;});transformBlock1 = new TransformBlock(i =>{Console.WriteLine("1 个输入计数:" + transformBlock1.Input
..
我对通过 Post() 或 SendAsync() 发送项目之间的区别感到困惑.我的理解是,在所有情况下,一旦项目到达数据块的输入缓冲区,控制权就会返回到调用上下文,对吗?那我为什么需要 SendAsync 呢?如果我的假设不正确,那么我想知道相反,如果使用数据块的整个想法是建立并发和异步环境,为什么有人会使用 Post(). 我当然理解技术上的区别在于 Post() 返回一个 bool 而
..
我想运行一堆异步任务,并限制在任何给定时间待完成的任务数量. 假设您有 1000 个 URL,并且您只想一次打开 50 个请求;但是一旦一个请求完成,您就会打开到列表中下一个 URL 的连接.那样的话,一次总是正好有 50 个连接打开,直到 URL 列表用完为止. 如果可能,我还想使用给定数量的线程. 我想出了一个扩展方法,ThrottleTasksAsync 可以满足我的需求.
..
我正在寻找 .NET 4.0 版本的 TPL 数据流库. Nuget 包有一个 4.0 版本的库,但它似乎面向 .NET 4.5. 我发现了对 4.0 版本的各种引用,例如在这个论坛中: http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/6206c714-6dee-4d17-a880-26d0c137
..
考虑这个例子: class 程序{私有静态只读 ITargetBlock网格 = 创建网格();private static readonly AsyncLocal;异步本地上下文= new AsyncLocal();静态异步任务 Main(string[] args){var 任务 = Enumerable.Range(1, 4).Select(ProcessMessage);等待 Task.
..
简介: 我正在构建一个单节点网络爬虫来简单地验证 .NET Core 控制台应用程序中的 URL 是否200 OK.我在不同的主机上有一组 URL,我使用 HttpClient 向这些主机发送请求.我刚开始使用 Polly 和 TPL Dataflow. 要求: 我想支持与一个并行发送多个 HTTP 请求可配置的MaxDegreeOfParallelism. 我想将对任何给定主
..
我正在开发一个 Dataflow 管道,它读取一组文件,并针对每个文件中的每一行执行一系列 Dataflow 块. 在为文件中的每一行完成所有步骤后,我想对文件本身执行更多块,但我不知道这怎么可能. 通过 TransformManyBlock 拆分处理很简单,但是如何合并? 我习惯了 Apache Camel 的 Splitter 和 Aggregator 功能 - 或者 Dataf
..
我有一个持续处理消息的系统.我想确保仅在处理上一条消息时才从外部队列请求消息.让我们想象一下 GetMessages 方法从外部队列请求消息. 有事件 1. 将推送它 推送 1 有活动 2.将推动它 - 我的音乐会到了.由于我们在处理之前获得项目 处理 1 已处理 1 已删除 1 个 代码: 使用系统;使用 System.Collections.Generic;使用 Sys
..
我正在开发一个 C# 应用程序,该应用程序具有必须异步执行的耗时顺序工作流.它在用户按下按钮时启动,应用程序在几毫秒内接收到从相机捕获的一些图像.然后是工作流程. 将图像保存到磁盘 对齐它们. 从它们生成 3d 数据. 将它们组合成一个更大的集体对象(称为“扫描"). 向此扫描添加可选的分析数据并执行它. 最终保存扫描本身与图像一起保存到一个 xml 文件中. 其中一些步骤是
..
我有一个使用 TPL 数据流实现由 3 个步骤组成的数据流的类. 在构造函数中,我将步骤创建为 TransformBlocks,并使用 LinkTo 将它们链接起来,DataflowLinkOptions.PropagateCompletion 设置为 true.该类公开了一个方法,该方法通过在第一步调用 SendAsync 来启动工作流.该方法返回工作流最后一步的“完成"属性. 目前
..
我一直试图通过创建示例应用程序来理解 TPL 数据流.我一直在尝试做的一件事是从 ActionBlock 更新一个 TextBox 控件.使用 TPL Dataflow 的原因是在保持顺序的同时执行并行异步操作.下面的函数是我写的, private TaskScheduler 调度器 = null;公共 Form1(){this.scheduler = TaskScheduler.FromCur
..
我正在研究 TPL 数据流管道,并注意到一些与 TransformManyBlock 中的排序/并行相关的奇怪行为(也可能适用于其他块). 这是我要重现的代码(.NET 4.7.2,TPL Dataflow 4.9.0): class 程序{static void Main(string[] args){var sourceBlock = new TransformManyBlock>(i
..
给定一个 BroadcastBlock 在缓冲区中有一条消息,是否有可能阻止该消息被发送到新链接的目标?例如: static void Main(string[] args){var myBroadcastBlock = new BroadcastBlock(msg => msg);var myActionBlock = new ActionBlock(msg => Console.WriteL
..
我正在阅读 Dataflow (任务并行库),并且有一部分说: 当您指定的最大并行度大于 1 时,会同时处理多条消息,因此,消息可能不会按照接收顺序进行处理.但是,从块中输出消息的顺序将是正确排序的. 什么意思? 例如,我将操作块设置为并行度 = 5: testActionBlock = new ActionBlock(i => Consumer(i),新的 Execution
..
我想知道.如何删除块之间的链接?换句话说.我想与 LinkTo 相反. 我想写一个基于 tlp 数据流的记录器. 我写了这个接口,想在需要的时候删除对 ILogListener 的订阅. 公共接口ILogManager{void RemoveListener(ILogListener 监听器);} 解决方案 链接块时: var link = someSourceBlock.Li
..
在这个场景中,我必须从队列中轮询 AWS SQS 消息,每个异步请求最多可以获取 10 个 sqs 项目/消息.轮询项目后,我必须在 kubernetes pod 上处理这些项目.项目处理包括从少数 API 调用中获取响应,这可能需要一些时间 &然后将项目保存到 DB &S3.我做了一些 R&D &得出以下结论 为了使用消费者生产者模型,1 个线程将轮询项目 &另一个线程将处理该项目或使用多
..
刚刚使用 TPL DataFlow 编写了一个示例生产者消费者模式.我有一些基本的问题. 只有在生产者发布所有项目后,消费者才处于活动状态.异步是否意味着生产和消费任务可以并行运行. 给消费者一个睡眠时间来验证它是否阻塞了其他数据项.它似乎是按顺序执行的,没有获得任何并行性. 我在这里做错了吗? class AscDataBlocks{公共 Int64 开始;公共 Int64
..