Kafka Streams exactly-once delivery


Question

My goal is to consume from topic A, do some processing, and produce to topic B, as a single atomic action. To achieve this I see two options:

  1. Use spring-kafka's @KafkaListener and KafkaTemplate, as described here.
  2. Use the Streams eos (exactly-once) capability.

I have successfully verified option #1. By successfully, I mean that if my processing fails (an IllegalArgumentException is thrown), the consumed message from topic A keeps being re-consumed by the KafkaListener. This is what I expect, as the offset is not committed and the DefaultAfterRollbackProcessor is used.
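The redelivery semantics of option #1 can be sketched without a broker or spring-kafka: when processing throws, the offset is not committed, so the next poll returns the same record again. This is a simplified model of that behaviour, not the actual library code:

```java
import java.util.List;

public class RollbackRedeliveryDemo {
    // Simulate a consumer that polls from the last committed offset and
    // commits only after processing succeeds.
    static int run() {
        List<String> topicA = List.of("invoiceCreated-1", "invoiceCreated-2");
        int committedOffset = 0;   // last committed consumer offset
        int deliveries = 0;        // total delivery attempts observed
        boolean failedOnce = false;

        while (committedOffset < topicA.size()) {
            String record = topicA.get(committedOffset); // poll from committed offset
            deliveries++;
            try {
                if (record.equals("invoiceCreated-1") && !failedOnce) {
                    failedOnce = true;
                    throw new IllegalArgumentException("Tax calculation failed");
                }
                committedOffset++; // commit only on success
            } catch (IllegalArgumentException e) {
                // rolled back: offset unchanged, so the record is redelivered
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // first record is delivered twice, second once
        System.out.println("deliveries=" + run()); // prints deliveries=3
    }
}
```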

I expect to see the same behaviour if, instead of a KafkaListener, I use a stream for consuming from topic A, processing, and sending to topic B (option #2). But even though an IllegalArgumentException is thrown during my processing, the message is only consumed once by the stream. Is this the expected behaviour?

In the Streams case, the only configuration I have is the following:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

//required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
// see here: https://docs.spring.io/spring-kafka/reference/html/_reference.html#after-rollback
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}

My stream is the following:

@Autowired
public SpecificAvroSerde<InvoiceEvents> eventSerde;

@Autowired
private TaxService taxService;

@Bean
public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {

    KStream<String, InvoiceEvents> kStream = builder.stream("A",
            Consumed.with(Serdes.String(), eventSerde));

      kStream
        .mapValues(v -> 
            {
                // get tax from possibly remote service
                // an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
                int tax = taxService.getTaxForInvoice(v);
                // create a TaxCalculated event
                InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
                log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
                return taxCalculatedEvent;
            })
        .to("B", Produced.with(Serdes.String(), eventSerde));

    return kStream;
}

The happy-path Streams scenario works: if no exception is thrown while processing, the message appears properly in topic B.

My unit test:

@Test
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
    Mockito.when(taxService.getTaxForInvoice(any(InvoiceEvents.class)))
                        .thenThrow(new IllegalArgumentException("Tax calculation failed"));

    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));

    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");

    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);

    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
    Thread.sleep(30000);
// ...
}

Update: In my unit test I send 50 invoiceEvents (orderId=1,...,50), process them, and send them to a destination topic.

In my logs the behaviour I see is as follows:

invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IllegalArgumentException thrown
..new stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IllegalArgumentException thrown
...
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {**"id": "46"**, ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task.
..new stream starts..
invoiceEvent.**orderId = 46** → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed

Why, after the 2nd failure, does it re-consume from invoiceEvent.orderId = 46?

Answer

The key points to get option 2 (Streams transactions) working are:

  • Assign a Thread.UncaughtExceptionHandler() so that you start a new StreamThread in case of any uncaught exception (by default the StreamThread dies - see the code snippet that follows). This can even happen if producing to the Kafka broker fails; it does not have to be related to the business logic in your stream.
  • Consider setting a policy for handling deserialization of messages (when you consume). Check DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG (javadoc). For example, should you ignore it and consume the next message, or stop consuming from the relevant Kafka partition?
  • In the case of Streams, even if you set MAX_POLL_RECORDS_CONFIG=1 (one record per poll/batch), consumed offsets and produced messages are still not committed per message. This leads to cases like the one described in the question (see "Why after the 2nd failure, it re-consumes from invoiceEvent.orderId = 46?").
  • Kafka transactions simply do not work on Windows yet. The fix will be delivered in Kafka 1.1.1 (https://issues.apache.org/jira/browse/KAFKA-6052).
  • Consider checking how you handle serialization exceptions (or exceptions during production in general) (here and here)
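The batch-level commit behaviour described above can be modelled without a broker: offsets are committed per poll batch, and an aborted transaction replays the whole batch from the last committed offset, not just the failing record. A minimal sketch, assuming a batch of 5 records and offsets for orderIds 1..45 already committed (matching the logs in the question):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchCommitDemo {
    // Simulate transactional Streams processing: commit per batch,
    // replay the whole batch on abort.
    static List<Integer> run() {
        List<Integer> consumed = new ArrayList<>();
        int committedOffset = 45;  // orderIds 1..45 already committed
        int lastOrderId = 50;
        boolean failedOnce = false;

        while (committedOffset < lastOrderId) {
            int batchStart = committedOffset + 1;
            int batchEnd = Math.min(batchStart + 4, lastOrderId); // batch of up to 5
            try {
                for (int orderId = batchStart; orderId <= batchEnd; orderId++) {
                    consumed.add(orderId);
                    if (orderId == 50 && !failedOnce) {
                        failedOnce = true;
                        throw new IllegalArgumentException("Tax calculation failed");
                    }
                }
                committedOffset = batchEnd; // commit happens per batch, not per record
            } catch (IllegalArgumentException e) {
                // transaction aborted: nothing in this batch is committed,
                // so on restart consumption resumes from orderId 46
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        // prints [46, 47, 48, 49, 50, 46, 47, 48, 49, 50]
        System.out.println(run());
    }
}
```

This is why the restart resumes at orderId = 46 rather than at the record that actually failed: 46 is simply the first record after the last committed batch.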

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) 
{
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}
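The stop/start pattern used by the factory bean above can be demonstrated with plain threads: an uncaught exception normally kills the thread, but a handler can start a replacement instead. A broker-free sketch (the names here are illustrative, not Kafka APIs):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RestartOnUncaughtDemo {
    static final AtomicInteger attempts = new AtomicInteger();
    static final CountDownLatch done = new CountDownLatch(1);

    // Start a worker thread; if it dies with an uncaught exception,
    // the handler starts a fresh one (like stop()/start() on the factory bean).
    static void startWorker() {
        Thread worker = new Thread(() -> {
            if (attempts.incrementAndGet() == 1) {
                // first attempt fails, mirroring a poison record or failed send
                throw new IllegalArgumentException("Tax calculation failed");
            }
            done.countDown(); // a later attempt succeeds
        });
        worker.setUncaughtExceptionHandler((t, e) -> startWorker());
        worker.start();
    }

    // Returns true once a restarted worker has completed successfully.
    static boolean demo() {
        startWorker();
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("finished=" + demo() + " attempts=" + attempts.get());
    }
}
```

Without the handler, the first exception would silently terminate the worker, which is exactly how a StreamThread dies by default.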
