使用 Spring 集成实现 MongoDB 入站流 [英] Implementing a MongoDB Inbound Flow with Spring Integration

查看:24
本文介绍了使用 Spring 集成实现 MongoDB 入站流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们将有一个包含多个工作单元的 Mongo 集合.我的想法是文档将有一个包含四个选项的状态字段:未处理、正在处理、已完成、已失败.Spring Integration 将被配置为从此数据库读取并处理存储在那里的消息.

We will have a Mongo collection contain multiple units of work. My idea is that the document will have a status field with four options: UNPROCESSED, PROCESSING, DONE, FAILED. Spring Integration will be configured to read from this db and process the messages stored there.

入站 Mongo DSL 流将根据 UNPROCESSED 的值从集合中读取:

An inbound Mongo DSL flow will read from the collection based on a value of UNPROCESSED:

MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'status' : 'UNPROCESSED'}"));
return IntegrationFlows.from(messageSource)...

问题是:如果我有几台工作机器从同一个数据库中读取数据,我想阻止它们在相同行的 UNPROCESSED 数据上进行操作,因为我的轮询器使用了 maxMessagesPerPoll 的保守值或消息处理需要一段时间.

Here's the problem: if I have a few worker machines reading from the same database I want to prevent them from operating on the same rows of UNPROCESSED data given my poller utilizes a conservative value for maxMessagesPerPoll or message processing takes a while.

似乎正确的方法是使用 TransactionSynchronizationFactory 定义一个 ProcessBeforeCommit 阶段以将状态更新为 PROCESSING,并定义一个 ProcessAfterCommit 阶段以将状态更新为 DONE 或 FAILED.但是,在查看用于轮询器和事务管理器的 API 时,我并不清楚添加此功能的机制.有 XML 中的示例,但我看不到使用 DSL 的示例.

It seems the right venue is to use TransactionSynchronizationFactory to define a ProcessBeforeCommit phase to update the status to PROCESSING, and a ProcessAfterCommit phase to update the status to DONE or FAILED. However, the mechanism to add this is not clear to me looking at the API for Pollers and TransactionManagers. There are examples in XML, but none that I can see utilizing DSL.

我还想确保 ProcessBeforeCommit 发生在读取数据库时而不是在处理之后......是吗?此外,如果这不是设计从 Mongo 集合读取的解决方案的最佳方式,请随时提出更好的架构建议.

I would also want to ensure that ProcessBeforeCommit happens at the time of data base read and not after processing... does it? Also, if this is not the optimal way to engineer a solution that reads from a Mongo collection please feel free to suggest a better architecture.

推荐答案

不,ProcessBeforeCommitProcessAfterCommit 是非常接近的回调.它们肯定会在您的流程结束时发生.让我们假设您有一个类似的方法:

No, ProcessBeforeCommit and ProcessAfterCommit are very close callbacks. They definitely happen in the end of your process. Let's consider you have a method like:

@Transactional
void foo() {}

当您调用这样的方法时,事务在进入方法主体之前开始.当我们在执行后退出方法体时,会执行 beforeCommit 回调.它可能会失败,因为在我们的过程中,外部连接(DB ?)可能会丢失.并且只有在没有问题的情况下,我们才继续执行 afterCommit.

When you call such a method, the transaction starts before entering a method body. When we exit a method body after its execution, the beforeCommit callback is performed. It may fail because during our process an external connection (DB ?) may be lost. And only if it is OK, we proceed to the afterCommit.

您的要求可以通过 AbstractMessageSourceAdvice 实现来完成:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#conditional-pollers.因此,在 afterReceive() 实现中,您可以将文档更新为 PROCESSING,甚至决定返回 null 而不是消息:只是因为它在数据库中的状态已经是 PROCESSING.这样的Advice可以注入到PollerSpec中:

What you are asking can be done via AbstractMessageSourceAdvice implementation: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-channels-section.html#conditional-pollers. So, in the afterReceive() implementation you can update your document to the PROCESSING and even decided to return null instead of the message: just because its status in the DB is already PROCESSING. Such an Advice can be injected into the PollerSpec:

/**
 * Specify AOP {@link Advice}s for the {@code pollingTask}.
 * @param advice the {@link Advice}s to use.
 * @return the spec.
 */
public PollerSpec advice(Advice... advice) {

DONEFAILED 确实可以通过应用于 PollerSpecTransactionSynchronizationFactoryBean 来实现:

The DONE and FAILED really can be achieved via TransactionSynchronizationFactoryBean applied to the PollerSpec:

/**
 * Specify the {@link TransactionSynchronizationFactory} to attach a
 * {@link org.springframework.transaction.support.TransactionSynchronization}
 * to the transaction around {@code poll} operation.
 * @param transactionSynchronizationFactory the TransactionSynchronizationFactory to use.
 * @return the spec.
 */
public PollerSpec transactionSynchronizationFactory(

这篇关于使用 Spring 集成实现 MongoDB 入站流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆