Spring Cloud Stream 中的事务 [英] Transaction in Spring cloud Stream

查看:71
本文介绍了Spring Cloud Stream 中的事务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

问题:我正在尝试逐行读取一个大文件并将消息放入 RabbitMQ.我想在文件末尾提交rabbitMQ.如果文件中有任何记录是坏的,那么我想撤消发布到队列中的消息.

Problem: I am trying to read a big file line by line and putting the message in a RabbitMQ. I want to commit to rabbitMQ at the end of the file. If any record in the file is bad, then I want to revoke the messages published to the queue.

技术:弹簧靴,春云流,兔MQ

Technologies: Spring boot, Spring cloud stream, RabbitMQ

你能帮我实现这个过渡的东西吗?我知道如何使用 Spring Cloud 流读取文件并发布到队列.

Could you please help me in implementing this transition stuff. I know how to read a file and publish to a queue using spring cloud stream.

  @Transactional
  public void sendToQueue(List<Data> dataList) {

      for(Data data:dataList)
      {
          this.output.send(MessageBuilder.withPayload(data).build());
          counter++; // I can see message getting published in the queue though management plugin
      }
      LOGGER.debug("message sent to Q2");

  }

这是我的配置:

spring: 
   cloud:    
      stream:
        bindings:
           # Q1 input channel
           tpi_q1_input:
            destination: TPI_Q1
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 output channel  
           tpi_q2_output:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 input channel
           tpi_q2_input:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService     
        binders:
          local_rabbit:
            type: rabbit
            environment:
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
                  virtual-host: /
          rabbit:
            bindings:
                  tpi_q2_output:
                    producer:
                          #autoBindDlq: true
                          transacted: true
                          #batchingEnabled: true
                  tpi_q2_input:  
                   consumer:
                        acknowledgeMode: AUTO
                        #autoBindDlq: true
                        #recoveryInterval: 5000
                        transacted: true       

spring.cloud.stream.default-binder: local_rabbit

Java 配置

@EnableTransactionManagement
public class QueueConfig {

  @Bean
  public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
  }
}

接收方

@StreamListener(JmsQueueConstants.QUEUE_2_INPUT)
  @Transactional
  public void receiveMesssage(Data data) {

    logger.info("Message Received in Q2:");
  }

推荐答案

  1. 配置生产者使用事务...producer.transacted=true

在事务范围内发布消息(使用 RabbitTransactionManager).

Publish the messages within the scope of a transaction (using the RabbitTransactionManager).

对 #2 使用普通的 Spring 事务机制(@Transacted 注释或 TransactionTemplate).

Use normal Spring transaction mechanisms for #2 (@Transacted annotation or a TransactionTemplate).

如果正常退出事务会提交,如果抛出异常则回滚.

The transaction will commit if you exit normally, or roll back if you throw an exception.

编辑

示例:

@SpringBootApplication
@EnableBinding(Source.class)
@EnableTransactionManagement
public class So50372319Application {

    public static void main(String[] args) {
        SpringApplication.run(So50372319Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output, RabbitTemplate template, AmqpAdmin admin,
            TransactionalSender sender) {
        admin.deleteQueue("so50372319.group");
        admin.declareQueue(new Queue("so50372319.group"));
        admin.declareBinding(new Binding("so50372319.group", DestinationType.QUEUE, "output", "#", null));
        return args -> {
            sender.send("foo", "bar");
            System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
            System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
            try {
                sender.send("baz", "qux");
            }
            catch (RuntimeException e) {
                System.out.println(e.getMessage());
            }
            System.out.println("Received: " + template.receive("so50372319.group", 3_000));
        };
    }

    @Bean
    public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

}

@Component
class TransactionalSender {

    private final MessageChannel output;

    public TransactionalSender(MessageChannel output) {
        this.output = output;
    }

    @Transactional
    public void send(String... data) {
        for (String datum : data) {
            this.output.send(new GenericMessage<>(datum));
            if ("qux".equals(datum)) {
                throw new RuntimeException("fail");
            }
        }
    }

}

spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

Received: foo
Received: bar
fail
Received: null

这篇关于Spring Cloud Stream 中的事务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆