Apache Kafka: Exactly Once in Version 0.10

Question

To achieve exactly-once processing of messages with a Kafka consumer, I am committing one message at a time, like below:

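import java.util.Collections;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// `running`, `consumerConfigFactory` and `processingService` are fields of the enclosing class (not shown).
// process() declares InterruptedException, so this method must declare (or handle) it too.
public void commitOneRecordConsumer(long seconds) throws InterruptedException {
    KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();

    try {
        while (running) {
            // 0.10.x API: poll(long timeoutMs)
            ConsumerRecords<String, String> records = consumer.poll(1000);
            try {
                for (ConsumerRecord<String, String> record : records) {
                    // process() is @Async, so this call returns before the record is actually processed
                    processingService.process(record);

                    // Commit the offset of the next record to read from this partition
                    consumer.commitSync(Collections.singletonMap(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));

                    System.out.println("Committed Offset: " + record.offset());
                }
            } catch (CommitFailedException e) {
                // application specific failure handling
            }
        }
    } finally {
        consumer.close();
    }
}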

The above code delegates the processing of each message asynchronously to the class below:

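import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class ProcessingService {

    // Runs on a Spring task executor (async support must be enabled, e.g. with @EnableAsync),
    // so the caller does not wait for it to finish.
    @Async
    public void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(5000L); // simulate slow processing
        Map<String, Object> map = new HashMap<>();
        map.put("partition", record.partition());
        map.put("offset", record.offset());
        map.put("value", record.value());
        System.out.println("Processed: " + map);
    }
}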

However, this still does not guarantee exactly-once delivery: if processing fails, the consumer might still commit subsequent messages, so the earlier messages will never be processed or committed. What are my options here?

Answer

Original answer for 0.10.2 and earlier (for 0.11 and later, see the update below)

Currently, Kafka cannot provide exactly-once processing out of the box. You can either have at-least-once processing, if you commit messages after you have successfully processed them, or at-most-once processing, if you commit messages directly after poll(), before you start processing.
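
To make the difference concrete, here is a small stdlib-only Java simulation (not Kafka client code) of one partition. The class `CommitOrderingDemo`, the `committed` variable (standing in for the committed consumer offset) and the `crashAt` parameter are hypothetical. The consumer crashes once, between its "process" and "commit" steps for offset `crashAt`, then restarts from the committed offset; the result counts how often each record was processed.

```java
import java.util.Arrays;

// Hypothetical, stdlib-only model of a single partition; this is NOT the Kafka client API.
class CommitOrderingDemo {

    // Simulates the consumer process dying between its two steps for one record.
    static class SimulatedCrash extends RuntimeException {}

    // Returns how often each offset 0..logSize-1 was processed, across one crash
    // (between the two steps for offset crashAt) and one restart from the committed offset.
    // Pass crashAt = -1 for a run without a crash.
    static int[] run(int logSize, boolean commitBeforeProcessing, int crashAt) {
        int[] processedCount = new int[logSize];
        int committed = 0;      // committed offset = next offset to read after a restart
        boolean crashed = false;

        for (int attempt = 0; attempt < 2; attempt++) {   // first run, then one restart
            try {
                for (int offset = committed; offset < logSize; offset++) {
                    // Step 1: commit (at-most-once) or process (at-least-once)
                    if (commitBeforeProcessing) committed = offset + 1;
                    else processedCount[offset]++;

                    // Crash once, between the two steps
                    if (offset == crashAt && !crashed) {
                        crashed = true;
                        throw new SimulatedCrash();
                    }

                    // Step 2: the other half of the pair
                    if (commitBeforeProcessing) processedCount[offset]++;
                    else committed = offset + 1;
                }
                break;                                    // reached the end of the log
            } catch (SimulatedCrash e) {
                // "restart": the next attempt resumes from `committed`
            }
        }
        return processedCount;
    }

    public static void main(String[] args) {
        // Commit right after poll(), before processing: record 2 is lost
        System.out.println("at-most-once:  " + Arrays.toString(run(5, true, 2)));  // [1, 1, 0, 1, 1]
        // Commit after processing: record 2 is processed twice
        System.out.println("at-least-once: " + Arrays.toString(run(5, false, 2))); // [1, 1, 2, 1, 1]
    }
}
```

In the question's code, process() is annotated @Async, so (assuming Spring's async proxy is active, which it normally is for a call on an injected bean) the call returns immediately and commitSync() runs before the record is actually processed. A processing failure then behaves like the at-most-once run.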

(See also http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-commits)

However, the at-least-once guarantee is "good enough" if your processing is idempotent, i.e., the final result is the same even if you process a record twice. An example of idempotent processing is adding a message to a key-value store: even if you add the same record twice, the second insert just replaces the current key-value pair, and the KV store still holds the correct data.
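
A minimal sketch of why this matters under at-least-once delivery, assuming a HashMap stands in for the key-value store and a record is a (key, value) pair; `IdempotencyDemo`, `upsertAll` and `countAll` are hypothetical names. Replaying a record into the store leaves the result unchanged, while a counter (a non-idempotent update) double-counts it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a HashMap stands in for the KV store; a record is a (key, value) pair.
class IdempotencyDemo {

    // Idempotent: put() overwrites the value for a key, so replaying a record changes nothing.
    static Map<String, String> upsertAll(List<Map.Entry<String, String>> records) {
        Map<String, String> kvStore = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            kvStore.put(r.getKey(), r.getValue());
        }
        return kvStore;
    }

    // Not idempotent: each delivery increments a counter, so a replay double-counts.
    static Map<String, Integer> countAll(List<Map.Entry<String, String>> records) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, String> r : records) {
            counts.merge(r.getKey(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> once =
                List.of(Map.entry("user-1", "alice"), Map.entry("user-2", "bob"));
        // At-least-once: "user-2" is redelivered after a failure
        List<Map.Entry<String, String>> redelivered =
                List.of(Map.entry("user-1", "alice"), Map.entry("user-2", "bob"), Map.entry("user-2", "bob"));

        System.out.println(upsertAll(once).equals(upsertAll(redelivered))); // true
        System.out.println(countAll(once).equals(countAll(redelivered)));   // false
    }
}
```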

In your example code above, you update a HashMap, which would be an idempotent operation. You might end up with an inconsistent state in case of failure, for example if only two of the put calls are executed before the crash. However, this inconsistent state is fixed when the same record is reprocessed.
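
The repair-on-reprocessing argument can be sketched as follows, assuming (as the answer does) that the map is state that survives the crash; in the question it is actually a local variable. `PartialUpdateDemo`, `apply` and the `crashBeforeWrite` parameter are hypothetical. The same three writes as in `ProcessingService.process()` are applied, with an optional crash partway through.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the three put() calls from ProcessingService.process(), applied to a
// map that outlives a crash. crashBeforeWrite = 2 crashes after two puts; -1 means no crash.
class PartialUpdateDemo {

    static class SimulatedCrash extends RuntimeException {}

    static void apply(Map<String, Object> state, int partition, long offset, String value,
                      int crashBeforeWrite) {
        Object[][] writes = { {"partition", partition}, {"offset", offset}, {"value", value} };
        for (int i = 0; i < writes.length; i++) {
            if (i == crashBeforeWrite) {
                throw new SimulatedCrash();          // crash before write i
            }
            state.put((String) writes[i][0], writes[i][1]);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> expected = new HashMap<>();
        apply(expected, 0, 42L, "hello", -1);        // clean run

        Map<String, Object> state = new HashMap<>();
        try {
            apply(state, 0, 42L, "hello", 2);        // crash after two puts
        } catch (SimulatedCrash e) {
            System.out.println("after crash: " + state);  // inconsistent: "value" is missing
        }
        apply(state, 0, 42L, "hello", -1);           // reprocess the same record
        System.out.println(state.equals(expected));  // true
    }
}
```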

The call to println() is not idempotent, though, because it is an operation with a "side effect". But I guess the print is for debugging purposes only.

As an alternative, you would need to implement transaction semantics in your user code, which requires you to "undo" (partially executed) operations in case of failure. In general, this is a hard problem.
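
For purely local state, one common way to get "undo" is an undo log: before each write, remember the key's previous value, and if processing throws, restore those values in reverse order. The sketch below is a hypothetical, in-memory illustration (`UndoLogStore`, `Tx` and `inTransaction` are made-up names). It does not make the offset commit atomic with the state change, and a real implementation would also need the undo information to survive a process crash, which is where most of the difficulty lies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical user-level "transaction" over an in-memory map: each key's previous value is
// logged before it is overwritten, and all writes are rolled back if the body throws.
class UndoLogStore {
    private final Map<String, String> data = new HashMap<>();

    // One undo entry: the key and what it looked like before the write.
    private static final class UndoEntry {
        final String key;
        final boolean existed;
        final String previous;

        UndoEntry(String key, boolean existed, String previous) {
            this.key = key;
            this.existed = existed;
            this.previous = previous;
        }
    }

    // Write handle passed to the transaction body; logs the old value before each write.
    final class Tx {
        private final Deque<UndoEntry> undo = new ArrayDeque<>();

        void put(String key, String value) {
            undo.push(new UndoEntry(key, data.containsKey(key), data.get(key)));
            data.put(key, value);
        }
    }

    // Runs body; if it throws, undoes every write it made (newest first), then rethrows.
    void inTransaction(Consumer<Tx> body) {
        Tx tx = new Tx();
        try {
            body.accept(tx);
        } catch (RuntimeException e) {
            while (!tx.undo.isEmpty()) {
                UndoEntry u = tx.undo.pop();
                if (u.existed) data.put(u.key, u.previous);
                else data.remove(u.key);
            }
            throw e;
        }
    }

    Map<String, String> snapshot() {
        return new HashMap<>(data);
    }

    public static void main(String[] args) {
        UndoLogStore store = new UndoLogStore();
        store.inTransaction(tx -> tx.put("balance", "100"));
        try {
            store.inTransaction(tx -> {
                tx.put("balance", "50");
                tx.put("audit", "debit 50");
                throw new IllegalStateException("processing failed halfway");
            });
        } catch (IllegalStateException e) {
            System.out.println("rolled back: " + store.snapshot()); // rolled back: {balance=100}
        }
    }
}
```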

Update for Apache Kafka 0.11+ (for versions before 0.11, see the answer above)

Since 0.11, Apache Kafka supports idempotent producers, transactional producers, and exactly-once processing using Kafka Streams. It also adds a "read_committed" mode to the consumer so that it only reads committed messages (and drops/filters messages from aborted transactions).

  • https://kafka.apache.org/documentation/#semantics
  • https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  • https://www.confluent.io/blog/transactions-apache-kafka/
  • https://www.confluent.io/blog/enabling-exactly-kafka-streams/
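
As a rough sketch of what these 0.11+ features look like in configuration, the hypothetical helper below only builds `java.util.Properties`, so it compiles without kafka-clients. The config keys are the standard producer, consumer and Streams settings; the class name, host, group id, application id and transactional id are placeholders. The comments outline the consume-transform-produce call sequence on `KafkaProducer`, which is not compiled here.

```java
import java.util.Properties;

// Hypothetical helper that only builds configuration; no Kafka dependency is needed.
// "localhost:9092" and the ids passed in are placeholders.
//
// Outline of a consume-transform-produce loop with kafka-clients (not compiled here):
//   producer.initTransactions();
//   loop: records = consumer.poll(...);
//         producer.beginTransaction();
//         producer.send(...) for each output record;
//         producer.sendOffsetsToTransaction(offsets, groupId); // newer clients: consumer.groupMetadata()
//         producer.commitTransaction();                        // or producer.abortTransaction() on failure
class ExactlyOnceConfigs {

    static final String BOOTSTRAP = "localhost:9092";

    // Idempotent + transactional producer (Kafka 0.11+)
    static Properties transactionalProducer(String transactionalId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", BOOTSTRAP);
        p.put("enable.idempotence", "true");         // broker de-duplicates producer retries
        p.put("acks", "all");                        // required when idempotence is enabled
        p.put("transactional.id", transactionalId);  // keep stable across restarts so zombies get fenced
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    // Consumer that skips records from aborted transactions
    static Properties readCommittedConsumer(String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", BOOTSTRAP);
        p.put("group.id", groupId);
        p.put("isolation.level", "read_committed"); // default is read_uncommitted
        p.put("enable.auto.commit", "false");       // offsets go through sendOffsetsToTransaction
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    // Kafka Streams: a single setting enables exactly-once processing
    static Properties exactlyOnceStreams(String applicationId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", BOOTSTRAP);
        p.put("application.id", applicationId);
        // "exactly_once" since 0.11; Kafka 3.0+ deprecates it in favour of "exactly_once_v2"
        p.put("processing.guarantee", "exactly_once");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(readCommittedConsumer("orders-processor").getProperty("isolation.level")); // read_committed
    }
}
```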
