实现可重试块的正确完成 [英] Implementing correct completion of a retryable block

查看:147
本文介绍了实现可重试块的正确完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

预告:家伙,这个问题不是如何实现重试策略。这是一个TPL数据流块约正确完成。



这问题主要是我刚才的问题的重试ITargetBlock 内的政策。回答这个问题是@ svick的,它利用 TransformBlock (源)和 TransformManyBlock (目标)的智能解决方案。剩下唯一的问题是在正确的方式来完成此块的:等待所有的重试次数,以先完成,然后完成目标块。以下是我结束了(它只是一个片段,不付出太多的关注到非线程重试集):

  VAR重试=新的HashSet< RetryingMessage< TInput>>(); 

TransformManyBlock< RetryableMessage< TInput>中TOutput>目标= NULL;
TARGET = NEW TransformManyBlock< RetryableMessage< TInput>中TOutput>(
异步消息=>
{

{
VAR的结果=新[ ] {的await变换(message.Data)};
retries.Remove(消息);
返回结果;
}
赶上(异常前)
{
message.Exceptions.Add(除息);
如果(message.RetriesRemaining == 0)
{
如果(failureHandler!= NULL)
failureHandler(message.Exceptions) ;

retries.Remove(消息);
}
,否则
{
retries.Add(消息);
message.RetriesRemaining- - ;

Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(消息));
}
返回NULL;
}
},dataflowBlockOptions);

source.LinkTo(目标);

source.Completion.ContinueWith(异步_ =>
{
,而(target.InputCount大于0 || retries.Any())
等待任务.Delay(100);

target.Complete();
});



我们的想法是执行某种轮询,并验证是否还有消息是等待处理并且没有需要重试的消息。但是,在这种解决方案我不喜欢投票的想法。



是的,我可以封装添加/删除重试到一个单独的类的逻辑,甚至如执行某些操作时设定的重试变成空的,但如何应对 target.InputCount> 0 的条件?没有这样的,当有该块没有挂起的消息被调用的回调,这样看来,在一个小延迟循环验证 target.ItemCount 是一个唯一的选项。



有谁知道一个更聪明的方式实现这一目标?


解决方案
< 。p>结合hwcverwe答案,JamieSee评论可能是理想的解决方案。



首先,你需要创建多个事件:

  VAR信号=新的ManualResetEvent(假); 
变种completedEvent =新的ManualResetEvent(假);



然后,你必须创建一个观察者,并订阅 TransformManyBlock ,这样您就可以在相关事件发生时通知:

  VAR观察者=新RetryingBlockObserver< TOutput>( completedEvent); 
无功可观察= target.AsObservable();
observable.Subscribe(观察员);



可观察可以很容易的:

 私有类RetryingBlockObserver< T> :IObserver< T> {
私人的ManualResetEvent completedEvent;

公共RetryingBlockObserver(ManualResetEvent的completedEvent){
this.completedEvent = completedEvent;
}

公共无效OnCompleted(){
completedEvent.Set();
}

公共无效的OnError(异常错误){
// TODO
}

公共无效OnNext(T值){
// TODO
}
}

和您可以等待无论是信号,或完成(全部的源项的耗尽),或两者

  source.Completion.ContinueWith(异步_ = > {

WaitHandle.WaitAll的(completedEvent,信号);!
//或WaitHandle.WaitAny,根据您的需要

target.Complete();
});

您可以检查为WaitAll的结果值来了解哪个事件成立,并做出相应的反应。
你也可以将其他事件添加到代码,它们传递给观察者,以便在需要的时候才可以进行设置。你可以区分你的行为,并有不同的反应时,引发错误,例如:


Teaser: guys, this question is not about how to implement retry policy. It's about correct completion of a TPL Dataflow block.

This question is mostly a continuation of my previous question Retry policy within ITargetBlock. The answer to this question was @svick's smart solution that utilizes TransformBlock (source) and TransformManyBlock (target). The only problem left is to complete this block in a right way: wait for all the retries to be completed first, and then complete the target block. Here is what I ended up with (it's just a snippet, don't pay too many attention to a non-threadsafe retries set):

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});

The idea is to perform some kind of polling and verify whether there are still messages that waiting to be processed and there are no messages that require retrying. But in this solution I don't like the idea of polling.

Yes, I can encapsulate the logic of adding/removing retries into a separate class, and even e.g. perform some action when the set of retries becomes empty, but how to deal with target.InputCount > 0 condition? There is not such a callback that get called when there are no pending messages for the block, so it seems that verifying target.ItemCount in a loop with a small delay is an only option.

Does anybody knows a smarter way to achieve this?

解决方案

Combining hwcverwe answer and JamieSee comment could be the ideal solution.

First, you need to create more than one event:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

Then, you have to create an observer, and subscribe to the TransformManyBlock, so you are notified when a relevant event happens:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

The observable can be quite easy:

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;

        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }

        public void OnCompleted() {
            completedEvent.Set();
        }

        public void OnError(Exception error) {
            //TODO
        }

        public void OnNext(T value) {
            //TODO
        }
    }

And you can wait for either the signal, or completion (exhaustion of all the source items), or both

 source.Completion.ContinueWith(async _ => {

            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!

            target.Complete();
        });

You can inspect the result value of WaitAll to understand which event was set, and react accordingly. You can also add other events to the code, passing them to the observer, so that it can set them when needed. You can differentiate your behaviour and respond differently when an error is raised, for example

这篇关于实现可重试块的正确完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆