Spring Cloud Stream Kafka transaction configuration


Question

I am following this template for Spring Cloud Stream Kafka, but I got stuck while making the producer method transactional. I have not used Kafka before, so I need help in case any configuration changes are needed in Kafka.

It works well when no transactional configuration is added, but once the transactional configuration is added it times out at startup:

2020-11-21 15:07:55.349 ERROR 20432 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

Below is my setup for Spring Cloud Stream.

pom.xml

<properties>
    <java.version>11</java.version>
    <spring-boot.version>2.3.3.RELEASE</spring-boot.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    <kafka-avro-serializer.version>5.2.1</kafka-avro-serializer.version>
    <avro.version>1.8.2</avro.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-dependencies</artifactId>
          <version>${spring-boot.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

transactionManager

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}

application.yml

spring:
  cloud:
    stream:
      default: 
         producer: 
          useNativeEncoding: true
         consumer:  
          useNativeEncoding: true     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
          binder:        
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081
             acks: all
             max.block.ms: 60000             
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true
           transaction:
            transactionIdPrefix: tx-
           producer:
             enable:
               idempotence: true
#             requiredAcks: all
           brokers:
           - localhost:9094

I am running Kafka in minikube; below is the configuration of my topic.

[2020-11-21 06:18:21,655] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
Topic: employee-details PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: employee-details Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Logs from the Kafka controller

TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2020-11-24 06:56:21,379] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController)
[2020-11-24 06:56:21,379] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)

Recommended answer

Look at the server logs.

Transactional producers will time out if there are fewer replicas of the transaction state log than required. By default, 3 replicas are required and at least 2 of them need to be in sync.

See the broker properties transaction.state.log.replication.factor and transaction.state.log.min.isr.
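
For a single-broker development setup like the minikube one described in the question (one broker, ReplicationFactor 1), those defaults cannot be met. A minimal sketch of the broker-side override, assuming the broker reads a plain server.properties file and that this is a non-production environment; how the values are actually passed in depends on the Kafka image or chart in use, which the question does not state:

```properties
# Broker configuration (server.properties), development use only.
# Assumption: a single broker, so the transaction state log can have
# only one replica.

# Replication factor for the internal transaction state log (default 3).
transaction.state.log.replication.factor=1

# Minimum in-sync replicas for the transaction state log (default 2).
# Must not exceed the replication factor above.
transaction.state.log.min.isr=1
```

On a multi-broker cluster the defaults are normally the safer choice, since lowering these values reduces the durability of transaction state.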
