Akka ConsistentHashingRoutingLogic无法始终路由到同一调度程序线程 [英] Akka ConsistentHashingRoutingLogic not routing to the same dispatcher thread consistently

查看:109
本文介绍了Akka ConsistentHashingRoutingLogic无法始终路由到同一调度程序线程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Akka的 ConsistentHashingRoutingLogic 来确保具有相同密钥的消息被路由到同一Actor。具有相同密钥的消息以FIFO顺序处理很重要。具有不同密钥的消息可以路由到不同的Actor,并可以自由地并行处理。我未在分布式模式下使用Akka。

I am trying to use Akka's ConsistentHashingRoutingLogic to guarantee that messages with the same key are routed to the same Actor. It is important that messages with the same key are processed in FIFO ordering. Messages with different keys can be routed to different Actors and processed in parallel freely. I am not using Akka in distributed mode.

消息实际上是从RabbitMQ代理读取的JSON消息,因此我的主演员收到AMQP消息并将路由键用作消息密钥。消息本身中也包含相同的密钥。 actor是Spring应用程序的一部分。

The messages are actually JSON messages being read from a RabbitMQ broker so my Master actor receives an AMQP message and uses the routing key as the message key. The same key is also in the message itself. The actor is part of a Spring application.

我的主演员看起来像这样:

My Master Actor looks like this:

@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {

  private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);

  private Router router;

  @Autowired
  public MessageHandlerMaster(final SpringProps springProps) {

  List<Routee> routees = Stream.generate(() -> {
      ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
      getContext().watch(worker);
      return new ActorRefRoutee(worker);
    }).limit(5) //todo: configurable number of workers
      .collect(Collectors.toList());

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
  }

  public void onReceive(Object message) {
    if (message instanceof Message) {
      Message amqpMessage = (Message) message;
      String encoding = getMessageEncoding(amqpMessage);
      try {
        String json = new String(amqpMessage.getBody(), encoding);
        String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
        log.debug("Routing message based on routing key " + routingKey);
        router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
      } catch (UnsupportedEncodingException e) {
        log.warn("Unknown content encoding sent in message! {}", encoding);
      }
    } else if (message instanceof Terminated) {
      //if one of the routee's died, remove it and replace it
      log.debug("Actor routee terminated!");
      router.removeRoutee(((Terminated) message).actor());
      ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
      getContext().watch(r);
      router = router.addRoutee(new ActorRefRoutee(r));
    }
  }

  private static String getMessageEncoding(Message message) {
    String encoding = message.getMessageProperties().getContentEncoding();
    if ((encoding == null) || (encoding.equals(""))) {
      encoding = "UTF-8";
    }
    return encoding;
  }
}

我最初是通过以下方式获得硕士学位的:

I am initially getting the master once by:

this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");

然后通过以下方式向其提交消息:

and then just submitting the messages to it by:

master.tell(message, ActorRef.noSender());

但是当我从工作人员的 onReceive()打印日志时, c $ c>我看到有时将不同的调度程序线程用于同一密钥。

But when I print the logs from my worker's onReceive() I see that different dispatcher threads are being used sometimes for the same key.

此外,也不清楚为什么有时有时将同一调度程序线程用于同一密钥。主演员和工人演员。这不应该是线程之间传递的异步消息吗?

Also it is not clear why sometimes the same dispatcher thread is being used for the Master actor and for a Worker actor. Shouldn't this be asynchronous message passing between threads?

16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186

正如您在此处看到的那样,Worker处理带有密钥的消息的调度程序线程10420186有时是9,有时是10。Master演员有时也使用这2个线程。

As you can see here, the dispatcher thread for the Worker processing message with key 10420186 was sometimes 9 and sometimes 10. The Master actor sometimes also used these 2 threads.

如何确定 ConsistentHashingRoutingLogic 确实在工作,并且同一线程使用相同的密钥处理消息?我在路由器初始化中做错什么了吗?

How can I be sure that the ConsistentHashingRoutingLogic is actually working and the same thread processes messages with the same key? Am I doing something wrong in my router initialisation?

推荐答案

所以@vrudkovsk的评论是正确的。我认为您在线程和参与者之间感到困惑。 Actor只是内存中具有地址和邮箱的对象。调度程序本质上是与actor执行操作的线程池。动作示例为:

So @vrudkovsk is right with his comment. I think you are getting confused between threads and actors. Actors are just objects in memory that have an address and a mailbox. Dispatchers are essentially thread pools that perform actions with the actor. Example actions are:


  • 从邮箱中取出邮件以在actor中对其进行处理

  • 排队邮件到邮箱。

不同的线程可以为同一个参与者执行动作。那是由调度员决定的。 Akka确保一次仅一个线程将在actor中处理消息。但这并不意味着它将始终是同一线程。

Different threads can perform actions for the same actor. That's decided by the dispatcher. Akka ensures that only one thread at a time will be processing a message within an actor. That does not mean it will always be the same thread.

如果要确保它们到达同一角色,则建议通过以下方式记录角色路径或地址使用 context.self.path context.self.path.address ,因为它们是同一<$中的唯一标识符c $ c> ActorSystem

If you want to ensure they are coming to the same actor, I would recommend logging the actor path or address by using context.self.path or context.self.path.address since those are unique identifiers within the same ActorSystem.

这篇关于Akka ConsistentHashingRoutingLogic无法始终路由到同一调度程序线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆