在RabbitMQ中对接收到的消息进行分组,最好使用Spring AMQP? [英] Group received messages in RabbitMQ, preferably using Spring AMQP?

查看:140
本文介绍了在RabbitMQ中对接收到的消息进行分组,最好使用Spring AMQP?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在从服务(S)接收消息,该服务将每个单独的属性更改作为单独的消息发布到实体.一个人为的例子就是这样的实体:

I'm receiving messages from a service (S) that publishes each individual property change to an entity as a separate message. A contrived example would be an entity like this:

Person {
    id: 123
    name: "Something",
    address: {...}
}

如果名称和地址在同一事务中更新,则(S)将发布两条消息,PersonNameCorrectedPersonMoved.问题出在接收方,我正在存储此Person实体的投影,并且每个属性更改都会导致对数据库的写入.因此,在此示例中,将有两次写入数据库的操作,但是如果我可以在短时间内批处理消息并将其按ID分组,则只需要对数据库进行一次写入操作即可.

If name and address are updated in the same transaction then (S) will publish two messages, PersonNameCorrected and PersonMoved. The problem is on the receiving side where I'm storing a projection of this Person entity and each property change causes a write to the database. So in this example there would be two writes to the database but if I could batch messages for a short period of time and group them by id then I would only have to make a single write to the database.

在RabbitMQ中通常如何处理? Spring AMQP是否提供更简单的抽象?

How does one typically handle this in RabbitMQ? Does Spring AMQP provide an easier abstraction?

请注意,我已经简要查看了预取,但是我不确定是否这是要走的路.如果我正确理解,则预取也是基于每个连接的.我试图在每个队列的基础上实现这一目标,因为如果采用批处理(并因此增加了延迟)的方式,我不想将此延迟添加到我消耗的所有队列中服务(但仅适用于需要按ID分组"功能的用户).

Note that I have looked briefly at prefetch but I'm not sure if this is the way to go. Also prefetch, if I understand it correctly, is per connection basis. I'm trying to achieve this on a per-queue basis, because if batching (and thus added latency) is the way to go I wouldn't like to add this latency to ALL queues consumed by my service (but only to those that need the "group-by-id" features).

推荐答案

在这种情况下,预取无济于事.

Prefetch won't help for a case like this.

考虑使用 Spring集成,该适配器的适配器位于春季AMQP;它还提供了一个聚集器,可用于将消息分组在一起,然后再将其发送到管道的下一个阶段.

Consider using Spring Integration which has adapters that sit on top of Spring AMQP; it also provides an aggregrator which can be used to group messages together before sending them on to the next stage in the pipeline.

编辑

这是一个用于演示...的快速启动应用程序

Here's a quick boot app to demostrate...

@SpringBootApplication
public class So42969130Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42969130Application.class, args)
            .close();
    }

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Handler handler;

    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("so9130", new PersonNameChanged(123));
        this.template.convertAndSend("so9130", new PersonMoved(123));
        this.handler.latch.await(10, TimeUnit.SECONDS);
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                        .messageConverter(converter()))
                .aggregate(a -> a
                        .correlationExpression("payload.id")
                        .releaseExpression("false") // open-ended release, timeout only
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(2000))
                .handle(handler())
                .get();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Handler handler() {
        return new Handler();
    }

    @Bean
    public Queue queue() {
        return new Queue("so9130", false, false, true);
    }

    public static class Handler {

        private final CountDownLatch latch = new CountDownLatch(1);

        @ServiceActivator
        public void handle(Collection<?> aggregatedData) {
            System.out.println(aggregatedData);
            this.latch.countDown();
        }

    }

    public static class PersonNameChanged {

        private int id;

        PersonNameChanged() {
        }

        PersonNameChanged(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonNameChanged [id=" + this.id + "]";
        }

    }

    public static class PersonMoved {

        private int id;

        PersonMoved() {
        }

        PersonMoved(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonMoved [id=" + this.id + "]";
        }

    }

}

Pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so42969130</artifactId>
    <version>2.0.0-BUILD-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so42969130</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

结果:

2017-03-23 09:56:57.501  INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
    Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]

这篇关于在RabbitMQ中对接收到的消息进行分组,最好使用Spring AMQP?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆