Akka.net-如何等待子参与者在停止之前处理所有挂起的消息 [英] Akka.net - How to wait child actor to process all pending messages prior to stop

查看:29
本文介绍了Akka.net-如何等待子参与者在停止之前处理所有挂起的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有一个名为A的集群分片参与者,并且它有多个子参与者使用每实体模式创建的子项,如下所示。 当我们将100条消息从演员B告知D并且演员D花费比方说500毫秒来处理每条消息时,同时,当我们使用Context.Parent.Tell(new钝化(PoisonPill.Instance))将毒丸发送给演员A时; 它会立即停止所有子执行元(包括执行元D),而不处理挂起的消息。

    A
    |
    B    
   / 
  C   D

是否有办法等待执行元D处理所有消息?

推荐答案

https://stackoverflow.com/a/70286526/377476是一个良好的开端;您将需要一个自定义关闭消息。当父执行元终止时,其子级将通过/system消息自动终止,这些消息将取代其队列中任何未处理的/user消息。

因此,您需要做的是确保在父级终止自身之前处理它们的所有/user消息。有一种简单的方法可以将GracefulStop扩展方法与您的自定义停止消息结合使用:

public sealed class ActorA : ReceiveActor{
    private IActorRef _actorB;  
    
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public ActorA(){
        Receive<StartWork>(w => {
            foreach(var i in Enumerable.Range(0, w.WorkCount)){
                _actorB.Tell(i);
            }
        });
        
        ReceiveAsync<MyStopMessage>(async _ => {
            _log.Info("Begin shutdown");
            
            // stop child actor B with the same custom message
            await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);
            
            // shut ourselves down after child is done
            Context.Stop(Self);
        });
    }
    
    protected override void PreStart(){
        _actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
    }
}

public sealed class ActorB : ReceiveActor{
    private IActorRef _actorC;
    private IActorRef _actorD;
    
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public ActorB(){
        Receive<int>(i => {
            _actorC.Tell(i);
            _actorD.Tell(i);
        });
        
        ReceiveAsync<MyStopMessage>(async _ => {
            
            _log.Info("Begin shutdown");
            
            // stop both actors in parallel
            var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
            var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));
            
            // compose stop Tasks
            var bothStopped = Task.WhenAll(stopC, stopD);
            await bothStopped;
            
            // shut ourselves down immediately
            Context.Stop(Self);
        });
    }
    
    protected override void PreStart(){
        var workerProps = Props.Create(() => new WorkerActor());
        _actorC = Context.ActorOf(workerProps, "c");
        _actorD = Context.ActorOf(workerProps, "d");
    }
}

public sealed class WorkerActor : ReceiveActor {
    private readonly ILoggingAdapter _log = Context.GetLogger();
    
    public WorkerActor(){
        ReceiveAsync<int>(async i => {
            await Task.Delay(10);
            _log.Info("Received {0}", i);
        });
    }
}

我在这里创建了此示例的可运行版本:https://dotnetfiddle.net/xiGyWM-您将看到MyStopMessage在示例开始后不久收到,但C和D完成工作后收到。在此方案中,所有工作都在任何参与者终止之前完成。

这篇关于Akka.net-如何等待子参与者在停止之前处理所有挂起的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆