骆驼RaabitMQ确认 [英] Camel RaabitMQ Acknowledgement

查看:338
本文介绍了骆驼RaabitMQ确认的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用骆驼我的消息应用程序。在我的使用情况下,我有一个制片人(这是RabbitMQ的在这里),而消费者是一个bean。

<$p$p><$c$c>from(\"rabbitmq://127.0.0.1:5672/exDemo?queue=testQueue&username=guest&password=guest&autoAck=false&durable=true&exchangeType=direct&autoDelete=false\")
                .throttle(100).timePeriodMillis(10000)
                .process(新处理器(){
                        @覆盖
                        公共无效过程(外汇兑换)抛出异常{
                            MyCustomConsumer.consume(exchange.getIn()。getBody())
                        }
                    });

显然,当AUTOA​​CK是假的,确认是当过程中()执行完毕发送(请纠正我,如果我错了这里)

现在我不想承认,当这个过程()执行完毕后,我想在以后阶段做。我有一个的BlockingQueue 在我MyCustomConsumer哪里消费()是把消息和MyCustomConsumer有不同的机制来处理它们。我想,只有当MyCustomConsumer完成从的BlockingQueue 处理信息确认消息。我怎样才能做到这一点?


解决方案

我碰到了同样的问题。

骆驼RabbitMQConsumer.RabbitConsumer实现执行

  consumer.getProcessor()过程(交换);长deliveryTag = envelope.getDeliveryTag();
如果(!consumer.endpoint.isAutoAck()){
  log.trace(确认收货[delivery_tag = {}],deliveryTag);
  channel.basicAck(deliveryTag,FALSE);
}

所以它只是期待一个同步的处理器。
如果您绑定此为实例SEDA路线,工艺方法立即返回,而你是pretty多少回AUTOA​​CK情况。

我的理解是,我们需要使我们自己的RabbitMQ组件做这样的事情。

  consumer.getAsyncProcessor()过程(交流,新AsynCallback(){
  公共无效完成(doneSync){
    如果(!consumer.endpoint.isAutoAck()){
      长deliveryTag = envelope.getDeliveryTag();
      log.trace(确认收货[delivery_tag = {}],deliveryTag);
      channel.basicAck(deliveryTag,FALSE);
    }
  }
});

即便如此,doneSync参数的语义是我也不清楚。我认为它只是一个标记,以确定我们是否正在处理一个真正的异步处理或同步处理时自动裹成一个异步的。

也许有人可以有效或无效该解决方案?

有一个打火机/更快/更强大的选择吗?

还是会这样建议作为RabbitMQConsumer?

默认的实现

I am using Camel for my messaging application. In my use case I have a producer (which is RabbitMQ here), and the Consumer is a bean.

from("rabbitmq://127.0.0.1:5672/exDemo?queue=testQueue&username=guest&password=guest&autoAck=false&durable=true&exchangeType=direct&autoDelete=false")
                .throttle(100).timePeriodMillis(10000)
                .process(new Processor() {                              
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            MyCustomConsumer.consume(exchange.getIn().getBody())
                        }
                    });

Apparently, when autoAck is false, acknowledgement is sent when the process() execution is finished (please correct me if I am wrong here)

Now I don't want to acknowledge when the process() execution is finished, I want to do it at a later stage. I have a BlockingQueue in my MyCustomConsumer where consume() is putting messages, and MyCustomConsumer has different mechanism to process them. I want to acknowledge message only when MyCustomConsumer finishes processing messages from BlockingQueue. How can I achieve this?

解决方案

I bumped into the same issue.

The Camel RabbitMQConsumer.RabbitConsumer implementation does

consumer.getProcessor().process(exchange);

long deliveryTag = envelope.getDeliveryTag();
if (!consumer.endpoint.isAutoAck()) {
  log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
  channel.basicAck(deliveryTag, false);
}

So it's just expecting a synchronous processor. If you bind this to a seda route for instance, the process method returns immediately and you're pretty much back to the autoAck situation.

My understanding is that we need to make our own RabbitMQ component to do something like

consumer.getAsyncProcessor().process(exchange, new AsynCallback() {
  public void done(doneSync) {
    if (!consumer.endpoint.isAutoAck()) {
      long deliveryTag = envelope.getDeliveryTag();
      log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
      channel.basicAck(deliveryTag, false);
    }
  }
});

Even then, the semantics of the "doneSync" parameter is not clear to me. I think it's merely a marker to identify whether we're dealing with a real async processor or a synchronous processor that was automatically wrapped into an async one.

Maybe someone can validate or invalidate this solution?

Is there a lighter/faster/stronger alternative?

Or could this be suggested as the default implementation for the RabbitMQConsumer?

这篇关于骆驼RaabitMQ确认的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆