Using AsObservable to observe TPL Dataflow blocks without consuming messages

Problem Description

I have a chain of TPL Dataflow blocks and would like to observe progress somewhere inside the system.

I am aware that I could just jam a TransformBlock into the mesh where I want to observe, get it to post to a progress updater of some variety and then return the message unchanged to the next block. I don't love this solution as the block would be purely there for its side-effect and I would also have to change the block linking logic wherever I want to observe.

So I wondered if I could use ISourceBlock<T>.AsObservable to observe the passing of messages within the mesh without altering it and without consuming the messages. This seems both a purer and more practical solution, if it worked.

From my (limited) understanding of Rx that means that I need the observable to be hot rather than cold, so that my progress updater sees the message but doesn't consume it. And .Publish().RefCount() seems to be the way to make an observable hot. However, it simply does not work as intended - instead either block2 or progress receives and consumes each message.

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

Result is non-deterministic but I get something mixed like this:

block2:21
progress:22
progress:24
block2:23
progress:25

So, am I doing something wrong, or is this impossible due to the way TPL Dataflow's AsObservable is implemented?

I realise I could also replace the LinkTo between block1 and block2 with an Observable/Observer pair and that might work, but LinkTo with downstream BoundedCapacity = 1 is the whole reason I'm using TPL Dataflow in the first place.

Edit: Some clarifications:

  • I did intend to set BoundedCapacity=1 in block2. While it's unnecessary in this trivial example, the downstream-constrained case is where I find TPL Dataflow really useful.
  • To clarify the solution I rejected in my second paragraph, it would be to add the following block linked in between block1 and block2:

var progressBlock = new TransformBlock<int, int>(i => { SomeUpdateProgressMethod(i); return i; });

I would also like to maintain back-pressure so that if a further-upstream block was distributing work to block1 and also other equivalent workers, it wouldn't send work to block1 if that chain was already busy.

Recommended Answer

The issue with your code is that you're wiring up two consumers of block1. Dataflow then simply gives each value to whichever consumer accepts it first.
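
The competing-consumer behaviour can be reproduced with Dataflow alone. The following is a minimal sketch, assuming a .NET project where System.Threading.Tasks.Dataflow is available; the names source, targetA and targetB are illustrative. One BufferBlock is linked to two bounded ActionBlocks, and each message is consumed by exactly one of them, so the two counts always add up to the number sent while the split between them varies from run to run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int countA = 0, countB = 0;
var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };

// One source with two linked targets: each message goes to exactly one of them.
var source = new BufferBlock<int>();
var targetA = new ActionBlock<int>(_ => Interlocked.Increment(ref countA), bounded);
var targetB = new ActionBlock<int>(_ => Interlocked.Increment(ref countB), bounded);

var link = new DataflowLinkOptions { PropagateCompletion = true };
source.LinkTo(targetA, link);
source.LinkTo(targetB, link);

for (int i = 1; i <= 100; i++)
{
    source.Post(i); // BufferBlock is unbounded, so Post always succeeds here
}
source.Complete();
await Task.WhenAll(targetA.Completion, targetB.Completion);

// The split varies between runs, but the total is always 100.
Console.WriteLine($"targetA: {countA}, targetB: {countB}, total: {countA + countB}");
```

As far as block1 is concerned, an AsObservable() subscription behaves like one more linked target, much like targetB here.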

So you need to broadcast the values from block1 into two other blocks, which can then consume them independently.

Just a side note: don't use .Publish().RefCount() here, as it doesn't do what you think. It effectively makes a run-once observable that, during that one run, lets multiple observers connect and see the same values. It has nothing to do with the source of the data or with how the Dataflow blocks interact.
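
To see what .Publish().RefCount() actually does, here is a small sketch that assumes the System.Reactive NuGet package; the hand-driven Subject and the subscribeCount counter are illustrative only. Two observers subscribe to the shared observable, yet the upstream source is subscribed to only once and both observers see the same values. When the upstream is block1.AsObservable(), that single subscription is still just one consumer of block1, competing with block2, which is why the operator cannot help here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// A hot source we push values into by hand.
var subject = new Subject<int>();

// Count how many times anything subscribes to the upstream source.
int subscribeCount = 0;
var source = Observable.Defer(() =>
{
    subscribeCount++;
    return subject.AsObservable();
});

// Publish().RefCount() shares ONE upstream subscription among all observers.
var shared = source.Publish().RefCount();

var seenA = new List<int>();
var seenB = new List<int>();
using var a = shared.Subscribe(x => seenA.Add(x));
using var b = shared.Subscribe(x => seenB.Add(x));

subject.OnNext(1);
subject.OnNext(2);

Console.WriteLine($"upstream subscriptions: {subscribeCount}"); // prints "upstream subscriptions: 1"
Console.WriteLine($"A saw: {string.Join(",", seenA)}; B saw: {string.Join(",", seenB)}"); // prints "A saw: 1,2; B saw: 1,2"
```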

Try the following code:

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
// BroadcastBlock offers every value to all of its linked targets (i => i is the cloning function)
var block_broadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
// Buffer in front of the observable, so the progress observer gets its own copy of each value
var block_buffer = new BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_broadcast);
var l2 = block_broadcast.LinkTo(block2);       // first copy goes to block2
var l3 = block_broadcast.LinkTo(block_buffer); // second copy goes to the progress observer

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

That gives me:

block2:21
block2:22
block2:23
block2:24
block2:25
progress:21
progress:22
progress:23
progress:24
progress:25

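I think that is what you want. Note that the block2 and progress lines come from independent blocks, so their interleaving can vary between runs.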
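
One caveat given the back-pressure requirement in the question: BroadcastBlock never waits for its targets. It holds only the most recent value, so a bounded target that is busy can miss values that are overwritten before it is ready. If every value must reach both block2 and the progress reporter, one possible alternative (a sketch, not part of the original answer) is a hypothetical "tee" ActionBlock that awaits SendAsync to each target in turn. Because the tee is itself bounded, it stops accepting messages while either target is full, so back-pressure reaches block1 and anything upstream of it. It assumes System.Threading.Tasks.Dataflow is available; received, progress and tee are illustrative names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Every block is bounded, so a busy block makes its upstream wait.
var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
var received = new ConcurrentQueue<string>();

var block1 = new TransformBlock<int, int>(i => i + 20, bounded);
var block2 = new ActionBlock<int>(async i =>
{
    await Task.Delay(10); // simulate slow downstream work
    received.Enqueue("block2:" + i);
}, bounded);
var progress = new ActionBlock<int>(i => received.Enqueue("progress:" + i), bounded);

// Hypothetical "tee": it only finishes with a message once BOTH targets have
// accepted it, and its BoundedCapacity of 1 keeps it from taking the next one early.
var tee = new ActionBlock<int>(async i =>
{
    await block2.SendAsync(i);
    await progress.SendAsync(i);
}, bounded);

block1.LinkTo(tee, new DataflowLinkOptions { PropagateCompletion = true });

// The tee has no outgoing links, so forward its completion by hand.
_ = tee.Completion.ContinueWith(t =>
{
    block2.Complete();
    progress.Complete();
});

foreach (var v in Enumerable.Range(1, 5))
{
    await block1.SendAsync(v); // waits while block1 is full
}
block1.Complete();
await Task.WhenAll(block2.Completion, progress.Completion);

foreach (var line in received)
{
    Console.WriteLine(line);
}
```

The trade-off is that the slower of the two targets now sets the pace for the whole chain.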

Now, just as a further aside, using Rx for this might be a better option all around. It's much more powerful and declarative than any TPL or Dataflow option.

Your code boils down to:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()))     // side effect only; the value passes through unchanged
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));

That gives you pretty much the same result.
