Kafka streams exactly once delivery


Question

My goal is to consume from topic A, do some processing and produce to topic B, as a single atomic action. To achieve this I see two options:

  1. Use a spring-kafka @KafkaListener and a KafkaTemplate, as described here.
  2. Use Streams EOS (exactly-once) functionality.

I have successfully verified option #1. By successfully, I mean that if my processing fails (an IllegalArgumentException is thrown), the consumed message from topic A keeps being re-consumed by the KafkaListener. This is what I expect, since the offset is not committed and the DefaultAfterRollbackProcessor is used.
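
For reference, option #1 boils down to something like the following sketch. The wiring is assumed, not shown: a producer factory with a transactionIdPrefix and a listener container configured with a KafkaTransactionManager, as in the spring-kafka reference linked above; the class and method names here are hypothetical.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class InvoiceTaxListener {

    private final KafkaTemplate<String, InvoiceEvents> kafkaTemplate;
    private final TaxService taxService;

    public InvoiceTaxListener(KafkaTemplate<String, InvoiceEvents> kafkaTemplate,
                              TaxService taxService) {
        this.kafkaTemplate = kafkaTemplate;
        this.taxService = taxService;
    }

    // Runs inside the container-managed Kafka transaction: if getTaxForInvoice()
    // throws, the transaction rolls back, the offset stays uncommitted, and the
    // DefaultAfterRollbackProcessor re-seeks so the record is redelivered.
    @KafkaListener(topics = "A")
    public void consumeProcessProduce(InvoiceEvents event) {
        int tax = taxService.getTaxForInvoice(event); // may throw IllegalArgumentException
        kafkaTemplate.send("B", event);               // participates in the same transaction
    }
}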

I expect to see the same behaviour if, instead of a KafkaListener, I use a stream to consume from topic A, process, and send to topic B (option #2). But even though an IllegalArgumentException is thrown while I process, the message is only consumed once by the stream. Is this the expected behaviour?

In the Streams case the only configuration I have is the following:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        // bootstrap.servers takes host:port pairs, without an http:// scheme
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");        
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

//required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
// see here: https://docs.spring.io/spring-kafka/reference/html/_reference.html#after-rollback
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}

My stream looks like this:

@Autowired
public SpecificAvroSerde<InvoiceEvents> eventSerde;

@Autowired
private TaxService taxService;

@Bean
public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {

    KStream<String, InvoiceEvents> kStream = builder.stream("A",
            Consumed.with(Serdes.String(), eventSerde));

    kStream
        .mapValues(v -> {
            // get tax from a possibly remote service;
            // getTaxForInvoice() throws IllegalArgumentException("Tax calculation failed")
            int tax = taxService.getTaxForInvoice(v);
            // create a TaxCalculated event
            InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder()
                    .setType(InvoiceEvent.TaxCalculated)
                    .setTax(tax)
                    .build();
            log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
            return taxCalculatedEvent;
        })
        .to("B", Produced.with(Serdes.String(), eventSerde));

    return kStream;
}

The happy path streams scenario works: if no exception is thrown while processing, the message appears properly in topic B.

My unit test:

@Test
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
    Mockito.when(taxService.getTaxForInvoice(any(InvoiceEvents.class)))
                        .thenThrow(new IllegalArgumentException("Tax calculation failed"));

    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));

    Properties producerConfig = new Properties();
    // bootstrap.servers takes host:port pairs, without an http:// scheme
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");

    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);

    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
    Thread.sleep(30000);
// ...
}
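
One detail when asserting on topic B in a test like this: under EXACTLY_ONCE, records written by aborted transactions remain in the log, and a consumer with the default isolation.level=read_uncommitted will see them. A verifying consumer should use read_committed. A sketch of the relevant properties (the group and client ids are hypothetical):

    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "unit-test-verifier");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
    // hide records written by transactions that were later aborted
    consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");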

Update: In my unit test I send 50 invoiceEvents (orderId=1,...,50), process them, and send them to a destination topic.

In my logs the behaviour I see is as follows:

invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IllegalArgumentException thrown
..new stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IllegalArgumentException thrown
...
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {**"id": "46"**, ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task.
..new stream starts..
invoiceEvent.**orderId = 46** → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed

Why, after the 2nd failure, does it re-consume from invoiceEvent.orderId = 46?

Answer

The key points to have option 2 (Streams Transactions) working are:

  • Assign a Thread.UncaughtExceptionHandler() so that you start a new StreamThread in case of any uncaught exception (by default the StreamThread dies - see the code snippet that follows). This can happen even when producing to the Kafka broker fails; it does not have to be related to your business logic code in the stream.
  • Consider setting a policy for handling deserialisation failures on consumed messages. Check DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG (javadoc). For example, should you ignore the record and consume the next message, or stop consuming from the relevant Kafka partition?
  • In the case of Streams, even if you set MAX_POLL_RECORDS_CONFIG=1 (one record per poll/batch), consumed offsets and produced messages are still not committed per message. That leads to situations like the one described in the question (see "Why after the 2nd failure, it re-consumes from invoiceEvent.orderId = 46?").
  • Kafka transactions simply do not work on Windows yet. The fix will be delivered in Kafka 1.1.1 (https://issues.apache.org/jira/browse/KAFKA-6052).
  • Consider checking how you handle serialisation exceptions (or, in general, exceptions during production) (here and here); a sketch of these handler and commit-interval settings follows this list.
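
To make the second, third, and fifth bullets concrete, here is a hedged sketch of extra entries for the same props map that kStreamsConfigs() (below) builds. LogAndContinueExceptionHandler, DefaultProductionExceptionHandler, and the two handler config keys are standard Kafka Streams classes and settings; choosing log-and-continue over log-and-fail is a policy decision, not a recommendation.

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

// skip (log and continue past) records that fail to deserialise on consume;
// use LogAndFailExceptionHandler instead to stop the application
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler.class);
// decide what happens when sending a record to the output topic fails (KIP-210, Kafka 1.1+)
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
        DefaultProductionExceptionHandler.class);
// under EXACTLY_ONCE, offsets and output are committed per transaction at
// commit.interval.ms boundaries (100 ms by default with EOS), never per record;
// that is why the failure at orderId=50 can replay everything back to orderId=46
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);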

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        // bootstrap.servers takes host:port pairs, without an http:// scheme
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }
}

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) 
{
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
    streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.debug("StopStartStreamsUncaughtExceptionHandler caught exception {}, stopping StreamsThread ..", e);
            streamsBuilderFactoryBean.stop();
            log.debug("creating and starting a new StreamsThread ..");
            streamsBuilderFactoryBean.start();
        }
    });
    return streamsBuilderFactoryBean;
}
