消息聚合的骆驼条件 [英] Camel condition on aggregate of messages

查看:24
本文介绍了消息聚合的骆驼条件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一种基于消息聚合有条件地处理消息的方法.我研究了很多方法来做到这一点,但似乎 Apache Camel 不支持它.我会解释这个场景,然后是我尝试过的解决方案.

I'm looking for a way to conditionally handle messages based on the aggregation of messages. I've looked into a lot of ways to do this, but it seems that Apache Camel doesn't support it. I'll explain the scenario and then the solutions I tried.

场景:我正在尝试有条件地清理目录.我每 x 天从目录中轮询一次并获取所有文件(file://...).我将其路由到一个聚合中,该聚合将文件聚合为一个大小(directorySize).然后我检查这个大小是否超过某个阈值.

Scenario: I'm trying to conditionally clean a directory. I poll from the directory every x days and fetch all the files (file://...). I route this into an aggregation, that aggregates the files into a single size (directorySize). I then check if this size passes a certain threshold.

问题就在这里.如果此条件通过,我现在想删除某些文件,但我无法再访问原始消息,因为它们已在新交换中聚合.

Here is where the problem lies. I now want to remove certain files if this condition passes, but I don't have access to the original messages anymore because they were aggregated in a new exchange.

解决方案:

  • 我尝试再次获取文件以处理它们.问题是,据我所知,你不能让消费者按需获取.我尝试使用 pollEnrich,但这只会获取一个文件,而不是目录中的所有文件.
  • 我尝试过滤/停止父路由.这里的问题是 filter()/choice...stop()/end() 只会停止带有目录大小的聚合路由,而不是带有文件消息的父路由.我无法有条件地处理这些.
  • 我尝试将聚合条件移动到我将调用的另一条路由,但这会导致与第一个解决方案相同的问题.

我考虑做的事情:

  • 重写聚合策略,不仅将大小聚合,还将文件本身聚合到 groupedExchange 中.这样我就可以在检查后再次拆分聚合.我真的不喜欢这个解决方案,因为它会导致很多样板,无论是在代码中还是在运行时.
  • 将文件大小计算器移至处理器而不是聚合器.这首先会破坏使用骆驼的目的..我会手动获取文件并添加大小..对于每个文件..
  • 使用 ControlBus 动态启动该目录上的删除路由.再一次,很多变通方法可以实现我认为应该可以通过简单的方式完成的事情.
  • 我想为每条父消息设置计算大小,但我不知道如何实现?
  • 另一种停止父路由的方法是我没有想到的?

我有点震惊,您无法根据这些消息的聚合来优雅地过滤消息.我在 Camel 中遗漏了什么可以提供优雅的解决方案吗?或者这是一个最不坏的解决方案?

I'm a bit stunned that you can't elegantly filter messages based on the aggregation of these messages. Is there something that I missed in Camel that would provide an elegant solution? Or is this a case of the least bad solution?

消息(文件)

Message(File) --> AggregatedMessage(directorySize) --> 删除某些文件?

Message(File) --> AggregatedMessage(directorySize) --> delete certain Files?

消息(文件)

推荐答案

Camel 确实很棒,但有时确实很难确切地知道要使用哪种设计模式 ;)

Camel is really awesome, but sometimes it's sure difficult to see exactly which design pattern to use ;)

首先,您需要保留文件对象的副本,因为在达到阈值之前您不知道是否删除它们 - 基本上(至少)有两种方法可以做到这一点.

Firstly, you need to keep a copy of the file objects, because you don't know whether to delete them or not until you reach your threshold - there are basically (at least) two ways to do this.

备选方案 1

第一种方法是在交换属性中使用 List.无论您如何处理交换主体,此属性都会存在.如果您查看 GroupedExchangeAggregationStrategy 的源代码,它就是这样做的:

The first way is to use a List in an exchange property. This property will hang around no matter what you do with the exchange body. If you have a look at the source code for GroupedExchangeAggregationStrategy, it does precisely this:

        list = new ArrayList<Exchange>();
        answer.setProperty(Exchange.GROUPED_EXCHANGE, list);
        // ...
        list.add(newExchange);

或者您可以在您自己的交换属性上手动执行相同的操作.在任何情况下,都可以像您一样使用分组聚合策略.

Or you could do the same thing manually on your own exchange property. In any case, it's completely fine to use the Grouped aggregation strategy as you have done.

备选方案 2

保留"旧消息的第二种方法是将副本发送到已停止的 SEDA 队列.所以你会做to("seda:xyz").您将此队列定义为 .noAutoStartup().然后您可以向它发送消息,它们将在由骆驼管理的内部队列中排队.当您想要处理消息时,您只需通过控制总线启动它,然后再将其停止.

The second way to "keep" old messages is to send a copy to a stopped SEDA queue. So you would do to("seda:xyz"). You define this queue as .noAutoStartup(). Then you can send messages to it and they will queue up on an internal queue, managed by camel. When you want to process the messages, you simply start it up via controlbus and stop it again afterwards.

通常,除非绝对必要,否则应该避免搞乱启动和停止队列,但这当然是另一种方法

Generally, messing around with starting and stopping queues should be avoided unless absolutely necessary, but that's certainly another way to do it

建议的解决方案

我建议你照你做的做(即备选方案 1):

I suggest you do as you have done (i.e. alternative 1):

  • aggregate 通过 GroupedExchangeAggregationStrategy 将单个文件保存在列表中
  • 计算总文件大小(使用处理器,或使用自定义聚合策略进行计算)
  • 使用 filter(simple("${body} <123"))
  • 通过splitter(simple("${property.CamelGroupedExchange}"))展开"聚合
  • 一一删除你的文件
  • aggregate via GroupedExchangeAggregationStrategy to keep the individual files in a list
  • Compute the total file size (use a processor, or do it along the way with a custom aggregation strategy)
  • Use a filter(simple("${body} < 123"))
  • "Unwind" your aggregation via a splitter(simple("${property.CamelGroupedExchange}"))
  • Delete your files one by one

请告诉我这是否有意义,或者我是否以任何方式误解了您的问题.

Please let me know if this doesn'y makes sense, or if I have misunderstood your problem in any way.

这篇关于消息聚合的骆驼条件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆