Spring cloud stream kafka transactions in producer side


Problem description


We have a Spring Cloud Stream app using Kafka. The requirement is that, on the producer side, a list of messages needs to be put on a topic within a transaction. There is no consumer for these messages in the same app. When I enabled transactions using the spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix property, I got an error saying there is no subscriber for the dispatcher and that the total number of partitions obtained from the topic is less than the transactions configured. The app is not able to obtain the partitions for the topic in transaction mode. Could you please tell me if I am missing anything? I will post detailed logs tomorrow.

Thanks

Solution

You need to show your code and configuration as well as the versions you are using.

Producer-only transactions are discussed in the documentation.

Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix to a non-empty value, e.g. tx-. When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction. When the listener exits normally, the listener container will send the offset to the transaction and commit it. A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.* properties; individual binding Kafka producer properties are ignored.
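
For reference, enabling this in a single-binder application is just a property setting. A minimal sketch of the relevant application.properties entries (the tx- value is illustrative; the prefix only needs to be non-empty):

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
# per-binding producer settings are ignored; configure the shared transactional
# producer via spring.cloud.stream.kafka.binder.transaction.producer.* instead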

If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. @Scheduled method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager bean using it.

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    // look up the (single) Kafka binder and reuse its transactional producer factory
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}

Notice that we get a reference to the binder using the BinderFactory; use null in the first argument when there is only one binder configured. If more than one binder is configured, use the binder name to get the reference. Once we have a reference to the binder, we can obtain a reference to the ProducerFactory and create a transaction manager.
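
With multiple binders, only the lookup changes. A sketch assuming a binder named kafka1 (the name is hypothetical and must match your configuration):

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    // "kafka1" must match the binder name configured under spring.cloud.stream.binders.*
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka1",
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}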

Then you can just use normal Spring transaction support, e.g. TransactionTemplate or @Transactional, for example:

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        // every send on this thread joins the single transaction started by @Transactional
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}
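
If you prefer programmatic transactions, for example from a @Scheduled method as mentioned above, a TransactionTemplate built on the same transaction manager can be used instead of @Transactional. This is only a sketch; the class, channel, and payloads are illustrative:

public static class ScheduledSender {

    private final MessageChannel output;

    private final TransactionTemplate transactionTemplate;

    public ScheduledSender(MessageChannel output, PlatformTransactionManager transactionManager) {
        this.output = output;
        // transactionManager is the KafkaTransactionManager bean defined earlier
        this.transactionTemplate = new TransactionTemplate(transactionManager);
    }

    @Scheduled(fixedDelay = 5000)
    public void sendBatch() {
        // all sends inside execute() share one producer-only Kafka transaction
        this.transactionTemplate.execute(status -> {
            Arrays.asList("foo", "bar").forEach(stuff -> this.output.send(new GenericMessage<>(stuff)));
            return null;
        });
    }

}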

If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager.
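
A minimal sketch of such a chained manager, assuming a JDBC DataSourceTransactionManager is also present in the context (the qualifier refers to the transactionManager bean defined above; the wiring here is illustrative):

@Bean
public ChainedTransactionManager chainedTransactionManager(
        @Qualifier("transactionManager") PlatformTransactionManager kafkaTransactionManager,
        DataSourceTransactionManager dataSourceTransactionManager) {
    // ChainedTransactionManager starts the transactions in the order given
    // and commits (or rolls back) them in reverse order
    return new ChainedTransactionManager(dataSourceTransactionManager, kafkaTransactionManager);
}

The sending method would then reference it, e.g. @Transactional("chainedTransactionManager").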

