用Camel RoutePolicy设置sqs的高优先级/低优先级路由使用者的简便方法? [英] Elegant way to setup high-priority/low-priority route consumers of sqs with Camel RoutePolicy?

查看:96
本文介绍了用Camel RoutePolicy设置sqs的高优先级/低优先级路由使用者的简便方法?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个SQS队列:一个用于低优先级,另一个用于高优先级消息.逻辑是不要触摸低优先级队列上的消息,除非高优先级队列为空.

I have two SQS queues: one meant for low priority and another meant for high-priority messages. The logic is meant to be don't touch the messages on the low-priority queue unless the high-priority queue is empty.

SuspendLowPriorityRoutePolicy suspendLowPriorityRoutePolicy = new SuspendLowPriorityRoutePolicy(LOW_PRIORITY_ROUTE_ID, camelContext);

        from(UriBuilder.buildSqsUri(sqsProperties)).routeId(LOW_PRIORITY_ROUTE_ID)
                .log(LoggingLevel.INFO, log, "NON-PRIORITY: ${body}");

        from(UriBuilder.buildPrioritySqsUri(sqsProperties)).routeId(HIGH_PRIORITY_ROUTE_ID)
                .routePolicy(suspendLowPriorityRoutePolicy)
                .log(LoggingLevel.INFO, log, "PRIORITY: ${body}");

这两个使用者的 concurrentConsumers 属性均设置为1,这意味着他们将一次处理一条消息.

Both of these consumers have their concurrentConsumers property set to 1, meaning they will process a single message at a time.

现在,我设置了这两个路由,以同时消耗队列中的消息.我想要的是进入高优先级路由的消息会触发低优先级路由的停止.为了尝试获得此功能,我尝试使用一种路由策略,当在高优先级路由上启动新的交换时,该策略将停止低优先级队列:

Right now, I have these two routes setup to consume messages off the queues simultaneously. What I would like is for a message that comes in to the high-priority route triggers the cessation of the low-priority route. To try to get this functionality, I tried using a route policy that stops the low-priority queue when a new exchange is started on the high-priority route:

(来自 SuspendLowPriorityRoutePolicy 的摘录)

 @Override
    public void onExchangeBegin(Route route, Exchange exchange) {
        Route lowPriorityRoute = context.getRoute(lowPriorityRouteId);
        ServiceStatus routeStatus = context.getRouteStatus(lowPriorityRouteId);
        if (!routeStatus.isStopped()) {
            try {
                lock.lock();
                log.info("High priority request came in, stopping consumer");
                stopConsumer(lowPriorityRoute.getConsumer());
            } catch (Exception e) {
                log.error("Exception stopping consumer " + e);
                handleException(e);
            } finally {
                lock.unlock();
            }
        }
    }

但是,我不确定如何重新启动低优先级的消费者.Camel RoutePolicy 提供的其他钩子允许覆盖 onExchangeDone ,但是在那时,逻辑应该是仅在高优先级队列为空时重新启动低优先级使用者.我认为没有办法检查队列是否为空,我们可以检查交换完成挂钩上的 roximatenumberofmess 属性,但这可能不准确.

However, I am unsure about how to go about re-starting the low-priority consumer. The other hooks provided by the Camel RoutePolicy allow overriding onExchangeDone but at that point the logic should be to restart the low-priority consumer only if the high-priority queues is empty. I don't think there is a way to check if the queue is empty, we could check the ApproximateNumberOfMessages attribute on the exchange done hook but that may be inaccurate.

另一种想法是要有一个调度的后台轮询器,该轮询器检查 CamelContext 的inFlightRequestsRepository并仅在高优先级路由没有运行中请求时重新启动低优先级队列.

Another thought would to have a scheduled background poller that checks the inFlightRequestsRepository of the CamelContext and restarts the the low priority queues only if the high priority route has no inflight requests.

推荐答案

好吧,我能想到的最简单的方法是,使用比低优先级消息更多的消费者来消费高优先级消息强>.这样,高优先级的消息通常会很快消耗掉,而低优先级的消息可能会堆积起来,因此必须等待.

Well, the most simple way I can think of, is, to consume the high priority messages with much more consumers than the low priority messages. That way, a high priority message is typically consumed very fast, while low priority messages could pile up and therefore have to wait.

但是,无论同时处理了多少高优先级消息,低优先级使用者仍将处理消息.

However, the low priority consumers will still process messages, no matter how much more high priority messages are processed at the same time.

更接近您的用例的是使用 JMS优先级 .

A bit closer to your use case is to use JMS priorities.

您可以以相同的方式使用两个队列,只需在每个队列的消息上设置不同的JMS优先级.例如,低优先级消息的标准优先级为4,高优先级消息的标准优先级为8.

You could consume both queues the same way and simply set a different JMS priority on the message of each queue. For example standard priority of 4 for low-priority messages and a priority of 8 for high-priority messages.

然后,您将两个使用者的消息转发到第三个队列.因此,在这第三个队列中,高优先级消息和低优先级消息混合到达,但是每个消息都设置了适当的优先级.

Then you forward the messages of both consumers to a third queue. So in this third queue the high and low priority messages arrive mixed, but each message with the appropriate priority set.

然后,您将使用者附加到第三个队列,并且由于消息优先级高,应先使用高优先级消息.仅当队列不包含高优先级消息时才使用低优先级消息.

Then you attach a consumer to the third queue and due to the message priorities, the high priority messages should be consumed first. The low priority messages are only consumed if the queue contains no high priority messages.

但是,请记住以下几点:

However, there are some things to keep in mind:

  • 使用邮件优先级根据需要对队列中的邮件重新排序.这会减慢包含大量消息的队列的性能.
  • 消息优先级可能必须在您的代理或队列上激活.否则,优先级将被忽略.与您的经纪人一起搜索优先级示例以进行检查.
  • 我不确定,但是我认为JMS连接上还存在一些标志,以尊重用户的优先级
  • 您必须保持消费者的预取偏低.如果您的消费者以低优先级预取了1000条消息,则在执行其他任何操作之前先对其进行处理.无论在此期间是否有高优先级的消息到达队列.预取越大,在处理新到达的高优先级消息之前,低优先级消息可以处理的越多.
  • Using message priority reorders messages in the queue as needed. This can slow down the performance of a queue with a lot of messages.
  • Message priority must probably be activated on your broker or queue. Otherwise the priorities are ignored. Search for a priority example with your broker to check this.
  • I am not sure, but I think there are also some flags on the JMS connection to respect priorities on the consumer
  • You have to keep the consumer prefetch low. If your consumer prefetches 1000 messages with low-priority it processes them all before doing anything else. No matter if in the meantime a high priority message arrives in the queue. The bigger the prefetch the more low prio messages could be processed before a newly arrived high prio message is processed.

这篇关于用Camel RoutePolicy设置sqs的高优先级/低优先级路由使用者的简便方法?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆