Right design in Akka: message delivery

Question

I have gone through some posts on how and why Akka does not guarantee message delivery. The documentation, this discussion, and the other discussions on the group do explain it well.

I am pretty new to Akka and would like to know the appropriate design for the following case. Say I have 3 different actors, each on a different machine. One is responsible for cookbooks, another for history books, and the last for technology books.

I have a main actor on yet another machine. Suppose the main actor receives a query asking whether a particular book is available. The main actor sends requests to the 3 remote actors and expects their results. So I do this:

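  import akka.actor._
  import akka.pattern.ask
  import akka.routing.ScatterGatherFirstCompletedRouter
  import akka.util.Timeout
  import scala.concurrent.Await
  import scala.concurrent.duration._

  val scatter = system.actorOf(
        Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
              routees = someRoutees, within = 10.seconds)), "router")
  implicit val timeout = Timeout(10.seconds)
  val futureResult = scatter ? Text("Concurrency in Practice")
  //      What should I do here?
  //val result = Await.result(futureResult, timeout.duration) line(a)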



In short, I have sent requests to all 3 remote actors and expect the result in 10 seconds.

What should I do in the following cases?


  1. Say I do not get the result within 10 seconds. Should I send a new request to all of them again?
  2. What if the within duration above is too short? I do not know beforehand how long a search might take.
  3. What if the within duration was sufficient but the message got dropped?

If I don't get a response within the within duration, I could resend the request. Something like this keeps it asynchronous:

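import scala.util.{Failure, Success}
import system.dispatcher // ExecutionContext that runs the callback

futureResult onComplete {
  case Success(i) => println("Result " + i)
  case Failure(e) => // send again
}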

But under a large number of queries, won't this tie up too many threads and become heavyweight? If I uncomment line(a), it becomes synchronous and might perform badly under load.

Say I don't get a response within 10 seconds. If the within duration was too short, then a heavy computation is repeated for nothing. If the message got dropped, then 10 seconds of valuable time were wasted. If I knew that the message had been delivered, I would probably wait longer without being skeptical.

How do people solve such issues? With ACKs? But then I have to store the state of all queries in the actor. This must be a common problem, and I am looking for the right design.

Recommended answer

I'm going to try and answer some of these questions for you. I'm not going to have concrete answers for everything, but hopefully I can guide you in the right direction.

For starters, you will need to change how you communicate the request to the 3 actors that do book searches. Using a ScatterGatherFirstCompletedRouter is probably not the correct approach here. This router only waits for an answer from one of the routees (the first one to respond), so your set of results will be incomplete: it will not contain the results from the other 2 routees. There is also a BroadcastRouter, but that will not fit your needs either, as it only handles tell (!) and not ask (?). To do what you want, one option is to send the request to each recipient, get Futures for the responses, and then combine them into an aggregate Future using Future.sequence. A simplified example could look like this:

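import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

case class SearchBooks(title: String)
case class Book(id: Long, title: String)

class BookSearcher extends Actor {

  def receive = {
    case req: SearchBooks =>
      val routees: List[ActorRef] = ... // Look up routees here
      implicit val timeout: Timeout = Timeout(10.seconds)
      implicit val ec: ExecutionContext = context.system.dispatcher

      // One ask per routee, then combine them into a single aggregate Future
      val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
      val fut = Future.sequence(futures)

      val caller = sender // Important: capture sender now, do not close over it in the callback
      fut onComplete {
        case Success(books) => caller ! books.flatten
        case Failure(ex)    => caller ! Status.Failure(ex)
      }
  }
}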

Now that's not going to be our final code, but it's an approximation of what your sample was attempting to do. In this example, if any one of the downstream routees fails/times out, we will hit our Failure block, and the caller will also get a failure. If they all succeed, the caller will get the aggregate List of Book objects instead.
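
The fail-fast behaviour of Future.sequence can be checked without any actors. The following is a minimal sketch using only the Scala standard library: already-completed Futures stand in for the replies that (routee ? req).mapTo[List[Book]] would produce, and searchAll is a hypothetical helper name, not part of Akka.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FailFastAggregation {
  case class Book(id: Long, title: String)

  // Future.sequence turns List[Future[List[Book]]] into Future[List[List[Book]]].
  // The aggregate succeeds only if every reply succeeds; one failed or timed-out
  // reply fails the whole search, like the Failure branch in BookSearcher.
  def searchAll(replies: List[Future[List[Book]]]): Future[List[Book]] =
    Future.sequence(replies).map(_.flatten)
}
```

Passing one failed Future alongside two successful ones yields a failed aggregate, which BookSearcher would forward to the caller as Status.Failure.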

Now on to your questions. First, you ask whether you should send the request to all of the actors again if you do not get an answer from one of the routees within the timeout. The answer to this question is really up to you. Would you allow the user on the other end to see a partial result (i.e. the results from 2 of the 3 actors), or does it always have to be the full set of results? If partial results are acceptable, you could tweak the code that sends to the routees to look like this:

val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover {
  case ex =>
    // probably log something here
    List.empty[Book] // substitute an empty result for a failed or timed-out routee
})

With this code, if any of the routees times out or fails for any reason, an empty list of Book is substituted for its response instead of a failure. If you can't live with partial results, you could resend the entire request, but remember that there is probably someone on the other end waiting for their book results, and they don't want to wait forever.
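
Here is the same stdlib-only sketch with the recover step applied to each reply, assuming (as the answer suggests) that an empty list is an acceptable stand-in for a routee that failed. orEmpty and searchAll are hypothetical names, plain Futures replace the Akka ask calls, and logging is left as a comment.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal

object PartialResults {
  case class Book(id: Long, title: String)

  // Turn a failed or timed-out reply into an empty result instead of a failure.
  // A real system would log the exception here before discarding it.
  def orEmpty(reply: Future[List[Book]]): Future[List[Book]] =
    reply.recover { case NonFatal(_) => List.empty[Book] }

  // Non-fatal failures are absorbed per reply, so the aggregate succeeds
  // with whatever the healthy routees returned.
  def searchAll(replies: List[Future[List[Book]]]): Future[List[Book]] =
    Future.sequence(replies.map(orEmpty)).map(_.flatten)
}
```

With one failing reply out of three, the caller gets the books from the other two instead of an error.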

For your second question, you ask what happens if your timeout is premature. The timeout value you select is completely up to you, but it should most likely be based on two factors. The first comes from testing the call times of the searches: find out how long they take on average and select a value based on that, with a little cushion to be safe. The second is how long someone on the other end is willing to wait for their results. You could be very conservative and set the timeout to something like 60 seconds just to be safe, but if someone really is waiting on the other end, how long are they willing to wait? I'd rather get a failure response telling me to try again than wait forever. Taking both factors into account, select a value that gets you a response a very high percentage of the time without making the caller on the other end wait too long.
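
The two-factor rule of thumb can be written down as a small function. This is a sketch under stated assumptions: you have a non-empty set of call times measured during testing, the cushion is expressed as a fraction added on top of the average, and maxWait is your own estimate of how long the caller will tolerate. chooseTimeout and its parameters are hypothetical names.

```scala
import scala.concurrent.duration._

object TimeoutChoice {
  // samples: measured search call times from testing (must be non-empty)
  // cushion: extra fraction added on top of the average, e.g. 0.5 for +50%
  // maxWait: the longest the caller on the other end is willing to wait
  def chooseTimeout(samples: Seq[FiniteDuration], cushion: Double, maxWait: FiniteDuration): FiniteDuration = {
    require(samples.nonEmpty, "need at least one measured call time")
    val avgMillis = samples.map(_.toMillis).sum.toDouble / samples.size
    val padded = (avgMillis * (1.0 + cushion)).round.millis
    // Never make the caller wait longer than they are willing to
    if (padded < maxWait) padded else maxWait
  }
}
```

For example, measured times of 2, 3 and 4 seconds with a 50% cushion and a 10 second cap give 4.5 seconds; if the average were 30 seconds, the same call would return the 10 second cap instead.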

For question 3, you ask what happens if the message gets dropped. In that case I'd expect the future waiting on that routee's response to simply time out: the recipient actor never receives the message, so it never sends a response. Akka is not JMS; it has no acknowledgement modes where a message is resent a number of times until the recipient receives and acks it.
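
From the sender's side, a dropped message looks exactly like a reply that is taking too long: nothing arrives, and the timeout is the only signal. The stdlib-only sketch below models this. A Promise that nobody completes plays the role of the lost reply, and a daemon java.util.Timer fails a competing Promise at the deadline. This only approximates the timeout that Akka's ask pattern applies for you; withTimeout is a hypothetical helper, not an Akka API.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeoutException
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DroppedReply {
  // Daemon timer so pending deadlines never keep the JVM alive.
  private val timer = new Timer("reply-deadline", true)

  // Race the reply against a deadline. If the message (or its reply) was dropped,
  // `reply` never completes and the caller only ever sees the TimeoutException.
  def withTimeout[T](reply: Future[T], after: FiniteDuration): Future[T] = {
    val deadline = Promise[T]()
    timer.schedule(new TimerTask {
      def run(): Unit = deadline.tryFailure(new TimeoutException(s"no reply within $after"))
    }, after.toMillis)
    Future.firstCompletedOf(List(reply, deadline.future))
  }
}
```

Note that the caller cannot tell "dropped" from "slow" here, which is exactly the ambiguity raised in the question. A production version would also cancel the timer task once the reply arrives.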

Also, as you can see from my example, I agree with not blocking on the aggregate Future with Await. I prefer non-blocking callbacks. Blocking in a receive function is not ideal, because that Actor instance stops processing its mailbox until the blocking operation completes. With a non-blocking callback, you free the instance to go back to processing its mailbox, and the handling of the result becomes just another job executed on the ExecutionContext, decoupled from the actor's mailbox processing.
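
The non-blocking shape of BookSearcher can be reduced to a few lines with plain Futures. In this sketch, handleRequest (a hypothetical stand-in for the body of receive) only registers a callback and returns; the reply is produced later on the ExecutionContext. The reply function stands in for caller ! ..., so the example runs without Akka.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object NonBlockingReply {
  // `search` stands in for the aggregate Future built with Future.sequence;
  // `reply` stands in for `caller ! ...`. Both are hypothetical placeholders.
  def handleRequest(search: Future[List[List[String]]], reply: Try[List[String]] => Unit)
                   (implicit ec: ExecutionContext): Unit = {
    // Only registers a callback; nothing here waits for `search` to finish.
    search.onComplete {
      case Success(perRoutee) => reply(Success(perRoutee.flatten)) // like caller ! books.flatten
      case Failure(ex)        => reply(Failure(ex))                // like caller ! Status.Failure(ex)
    }
  }
}
```

Calling handleRequest with a Future that has not completed yet returns immediately without a reply; the reply only appears once the Future completes, which is what keeps the actor free to process its next message.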

Now, if you really want to avoid wasting communication when the network is unreliable, you could look into the Reliable Proxy available in Akka 2.2. If you don't want to go that route, you could roll your own by periodically sending ping-type messages to the routees. If one does not respond in time, mark it as down and stop sending messages to it until it answers a ping reliably (and quickly) again, sort of like an FSM per routee. Either approach works if you absolutely need this behavior, but remember that both add complexity and should only be used when you truly need it. If you're developing banking software and absolutely need guaranteed delivery semantics because anything less would have serious financial implications, then by all means take this kind of approach. Just be judicious in deciding whether you need something like this, because I bet 90% of the time you don't.

In your model, the only party likely to be affected by waiting on something you might already know won't succeed is the caller on the other end. Because the actor uses non-blocking callbacks, it is not held up by something that might take a long time; it has already moved on to its next message. Be careful if you decide to resubmit on failure, though: you don't want to flood the receiving actors' mailboxes. If you decide to resend, cap it at a fixed number of attempts.
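
If you do resend on failure, the cap can be built into the resend logic itself. A minimal sketch, assuming the request can be re-issued simply by evaluating it again (as a fresh routee ? req would be): retry uses recoverWith to try again on any non-fatal failure until the attempt budget is used up, then surfaces the last error. It never blocks, and it deliberately has no backoff delay, which a real system would probably add to avoid flooding the receiving actors' mailboxes. retry and attempts are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object CappedRetry {
  // attempts: total tries allowed, including the first one (must be >= 1).
  // `request` is by-name, so every retry issues a brand-new request.
  def retry[T](attempts: Int)(request: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    require(attempts >= 1, "need at least one attempt")
    request.recoverWith {
      // Only retry while budget remains; otherwise the last failure propagates.
      case NonFatal(_) if attempts > 1 => retry(attempts - 1)(request)
    }
  }
}
```

With a budget of 5 and a request that fails twice before succeeding, the third attempt's result is returned; with a budget of 2, the caller gets the second failure and no further messages are sent.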

One other possible approach, if you need these kinds of guaranteed semantics, is to look into Akka's clustering model. If you clustered the downstream routees and one of the servers failed, all traffic would be routed to the node that was still up until the failed node recovered.
