Reactive Kafka: Exactly Once Processing with Transaction
Question
Initially triggered via an API call:
1. Service A produces m1 to topic1 (non-transactional send)
2. Service B consumes topic1 and does some processing
(begin tx)
3. Service B produces m2 to topic2
(commit tx)
4. Service A consumes topic2
(begin tx)
This is my producer configuration:
final Map<String, Object> props = Maps.newConcurrentMap();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");
This is my consumer configuration:
final Map<String, Object> props = Maps.newHashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
I read this sample scenario and tried to follow it, but I'm running into some issues:
This is my producer code:
@Override
public Mono<SenderResult<Void>> buy(Message msg) {
final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
return kafkaProducerTemplate.send(mytopic, msg);
}
My consumer code:
@Override
public void run(ApplicationArguments arg0) throws Exception {
final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = kafkaConfig.getKafkaConsumerTemplate(mytopic, Message.class);
final Flux<ConsumerRecord<String, Message>> flux = kafkaConsumerTemplate.receiveExactlyOnce(kafkaProducerTemplate.transactionManager())
.concatMap(receiverRecordFlux -> receiverRecordFlux );
flux.subscribe(record -> {
final Message message = record.value();
System.out.printf("received message: timestamp=%s key=%s value=%s\n",
dateFormat.format(new Date(record.timestamp())),
record.key(),
message);
transactionService.processAndSendToNextTopic(message)
.doOnSuccess(aVoid -> kafkaProducerTemplate.transactionManager().commit())
.subscribe();
});
}
I always get the following error when testing the happy path, producing and consuming messages on a single-partition topic:
Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
Can someone please show me how to correctly begin a transaction, and when to commit and close it?
Note that this all works fine if I don't use transactions, that is, with receive() instead of receiveExactlyOnce().
Recommended answer
You may have to configure the KafkaTransactionManager synchronization to SYNCHRONIZATION_ALWAYS.
Please check.
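To make the error message itself easier to read, here is a minimal sketch in plain Java of how a begin/commit state check can produce it. It is a toy model, not the Kafka client's or Reactor Kafka's code: `TxStateDemo`, `ToyTransactionManager` and `processBatches` are hypothetical names invented for this illustration, and only the two states relevant to the message are modeled. The assumption behind it is that a new transaction is begun for each received batch, so a batch whose commit never actually runs leaves the producer in IN_TRANSACTION when the next begin arrives. That would be consistent with the consumer code in the question, where the value returned by commit() inside doOnSuccess is never subscribed to, but this reading has not been confirmed against the original setup.

```java
public class TxStateDemo {

    /** Only the two states needed to reproduce the message (a real producer has more). */
    enum State { READY, IN_TRANSACTION }

    /** Hypothetical stand-in for a transactional producer's state check. */
    static final class ToyTransactionManager {
        private State state = State.READY;

        /** Opens a transaction; fails if one is already open. */
        void begin() {
            if (state == State.IN_TRANSACTION) {
                throw new IllegalStateException(
                        "Invalid transition attempted from state " + state
                                + " to state " + State.IN_TRANSACTION);
            }
            state = State.IN_TRANSACTION;
        }

        /** Closes the open transaction so that the next begin() is legal. */
        void commit() {
            if (state != State.IN_TRANSACTION) {
                throw new IllegalStateException("No open transaction to commit");
            }
            state = State.READY;
        }
    }

    /**
     * Begins one transaction per batch. With commitEachBatch == false the
     * commit is skipped, mimicking a commit that is declared but never executed.
     * Returns "ok" on success, otherwise the error message.
     */
    public static String processBatches(int batches, boolean commitEachBatch) {
        ToyTransactionManager tm = new ToyTransactionManager();
        try {
            for (int i = 0; i < batches; i++) {
                tm.begin();
                if (commitEachBatch) {
                    tm.commit();
                }
            }
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(processBatches(2, true));
        System.out.println(processBatches(2, false));
    }
}
```

In this model the second call fails on the second batch, because the first transaction is still open when begin() is called again. The practical takeaway, under the same assumption, is that each batch's commit has to complete as part of the pipeline before the next batch is processed, rather than being triggered from a side-effect callback.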