Akka.Net流和远程处理(Sink.ActorRefWithAck) [英] Akka.Net Streams and remoting (Sink.ActorRefWithAck)

查看:188
本文介绍了Akka.Net流和远程处理(Sink.ActorRefWithAck)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 Sink.ActorRefWithAck 使用Akka.net Streams进行了非常简单的实现:订阅者向发布者索要一个大字符串,然后按片段将其发送。
它在本地(UT)正常工作,但不是远程。我不明白怎么了?确切地说:订阅者可以将请求发送给发布者,发布者以 OnInit 消息作为响应,然后以 OnInit.Ack 将永远不会返回发布者。此 Ack 消息以死信结尾:

  INFO Akka.Actor.EmptyLocalActorRef-从akka.tcp:// OutOfProcessTaskProcessing @ localhost:12100 / user / Streamer_636568240846733287到akka:// OutOfProcessTaskProcessing / user / StreamSupervisor-0 / StageActorRef-0的消息确认未送达。遇到1个死信。 

请注意,日志来自目标参与者,因此消息将在正确的过程中处理。没有明显的路径错误。



看看无法处理此消息的发布者代码,我真的不知道自己在做什么错了:

  public static void ReplyWithStreamedString(IUntypedActorContext context,string toStream,int chunkSize = 2000)
{
Source< string,未使用>源= Source.From(toStream.SplitBy(chunkSize));
source.To(Sink.ActorRefWithAck< string>(context.Sender,新StreamMessage.OnInit(),
新StreamMessage.OnInit.Ack(),
新StreamMessage.Completed(),
exception => new StreamMessage.Failure(exception.Message)))
.Run(context.System.Materializer());
}

这是订户代码:

  public static Task< string> AskStreamedString(此ICanTell自身,对象消息,ActorSystem上下文,TimeSpan?超时=空)
{
var tcs = new TaskCompletionSource< string>();
if(timeout.HasValue)
{
CancellationTokenSource ct = new CancellationTokenSource(timeout.Value);
ct.Token.Register(()=> tcs.TrySetCanceled());
}

var props = Props.Create(()=> new StreamerActorRef(tcs));
var tempActor = context.ActorOf(props,$ Streamer_ {DateTime.Now.Ticks});

self.Tell(消息,tempActor);

返回tcs.Task.ContinueWith(task =>
{
context.Stop(tempActor);
if(task.IsCanceled)
throw new OperationCanceledException();
if(task.IsFaulted)
throw task.Exception.GetBaseException();
return task.Result;
});
}

内部类StreamerActorRef:ReceiveActor
{
readonly TaskCompletionSource< string> _tcs;

私有只读StringBuilder _stringBuilder = new StringBuilder();

public StreamerActorRef(TaskCompletionSource< string> tcs)
{
_tcs = tcs;
Ready();
}

private void Ready()
{
ReceiveAny(message =>
{
switch(message)
{
case StreamMessage.OnInit _:
Sender.Tell(new StreamMessage.OnInit.Ack());
break;
case StreamMessage.Completed_:
字符串结果= _stringBuilder.ToString();
_tcs.TrySetResult(结果);
中断;
大小写字符串分片:
_stringBuilder.Append(slice);
发件人。 Tell(new StreamMessage.OnInit.Ack());
中断;
情况StreamMessage.Failure错误:
_tcs.TrySetException(new InvalidOperationException(error.Reason));
中断;
}
});
}
}

有消息:

 公共类StreamMessage 
{
公共类OnInit
{
公共类Ack {}
}

公共课程已完成{}

公共课程失败
{
公共字符串原因{ }

公共失败(字符串原因)
{
原因=原因;
}
}
}


解决方案

通常,与actor ref配合使用的源和接收器均未设计为可通过远程连接使用-它们不覆盖消息重试,如果某些流控制消息无法传递,则可能导致系统死锁。



您要查找的功能称为 StreamRefs (其作用类似于actor ref,但用于流),并将作为一部分提供v1.4版本(有关更多详细信息,请参见 github pull request )。 / p>

I've made quite a simple implementation with Akka.net Streams using Sink.ActorRefWithAck: a subscriber asks for a large string to a publisher which sends it by slices. It works perfectly fine locally (UT) but not remotely. And I cannot understand what's wrong? Concretly: the subscriber is able to send the request to the publisher which responds with an OnInit message but then the OnInit.Ack will never goes back to the publisher. This Ack message ends up as a dead letter:

INFO  Akka.Actor.EmptyLocalActorRef - Message Ack from akka.tcp://OutOfProcessTaskProcessing@localhost:12100/user/Streamer_636568240846733287 to akka://OutOfProcessTaskProcessing/user/StreamSupervisor-0/StageActorRef-0 was not delivered. 1 dead letters encountered.

Note that the log is from the destination actor so the message is handled in the right process. There is no obvious path error.

Looking at the publisher code which does not handle this message, I really don't know what I'm doing wrong:

    public static void ReplyWithStreamedString(IUntypedActorContext context, string toStream, int chunkSize = 2000)
    {
        Source<string, NotUsed> source = Source.From(toStream.SplitBy(chunkSize));
        source.To(Sink.ActorRefWithAck<string>(context.Sender, new StreamMessage.OnInit(),
                new StreamMessage.OnInit.Ack(),
                new StreamMessage.Completed(),
                exception => new StreamMessage.Failure(exception.Message)))
            .Run(context.System.Materializer());
    }

Here is the subscriber code:

public static Task<string> AskStreamedString(this ICanTell self, object message, ActorSystem context, TimeSpan? timeout = null)
    {
        var tcs = new TaskCompletionSource<string>();
        if (timeout.HasValue)
        {
            CancellationTokenSource ct = new CancellationTokenSource(timeout.Value);
            ct.Token.Register(() => tcs.TrySetCanceled());
        }

        var props = Props.Create(() => new StreamerActorRef(tcs));
        var tempActor = context.ActorOf(props, $"Streamer_{DateTime.Now.Ticks}");

        self.Tell(message, tempActor);

        return tcs.Task.ContinueWith(task =>
        {
            context.Stop(tempActor);
            if(task.IsCanceled)
                throw new OperationCanceledException();
            if (task.IsFaulted)                    
                throw task.Exception.GetBaseException();
            return task.Result;
        });
    }

    internal class StreamerActorRef : ReceiveActor
    {
        readonly TaskCompletionSource<string> _tcs;

        private readonly StringBuilder _stringBuilder = new StringBuilder();

        public StreamerActorRef(TaskCompletionSource<string> tcs)
        {
            _tcs = tcs;
            Ready();
        }

        private void Ready()
        {
            ReceiveAny(message =>
            {
                switch (message)
                {
                    case StreamMessage.OnInit _:
                        Sender.Tell(new StreamMessage.OnInit.Ack());
                        break;
                    case StreamMessage.Completed _:
                        string result = _stringBuilder.ToString();
                        _tcs.TrySetResult(result);
                        break;
                    case string slice:
                        _stringBuilder.Append(slice);
                        Sender.Tell(new StreamMessage.OnInit.Ack());
                        break;
                    case StreamMessage.Failure error:
                        _tcs.TrySetException(new InvalidOperationException(error.Reason));
                        break;
                }
            });
        }
    }

With messages:

public class StreamMessage
{
        public class OnInit
        {
            public class Ack{}
        }

        public class Completed { }

        public class Failure
        {
            public string Reason { get; }

            public Failure(string reason)
            {
                Reason = reason;
            }
        }
    }

解决方案

In general sources and sinks working with actor refs have not been designed to work over remote connections - they don't cover message retries, which can cause deadlocks in your system if some stream control message won't be passed in.

The feature you're looking for is called StreamRefs (which works like actor refs, but for streams), and will be shipped as part of v1.4 release (see github pull request for more details).

这篇关于Akka.Net流和远程处理(Sink.ActorRefWithAck)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆