卡夫卡消费者。 commitSync vs commitAsync [英] Kafka-consumer. commitSync vs commitAsync

查看:240
本文介绍了卡夫卡消费者。 commitSync vs commitAsync的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

来自 https的引用: //www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1


缺点是,虽然commitSync()将重试提交,直到
成功或遇到不可恢复的失败,commitAsync()
将不会重试。

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a nonretriable failure, commitAsync() will not retry.

这句话对我来说并不清楚。我想消费者会向代理发送提交请求,如果代理在某些超时内没有响应,则意味着提交失败。我错了吗?

This phrase is not clear for me. I suppose that consumer sends commit request to broker and in case if broker doesn't respond within some timeout it means that aommit failed. Am I wrong ?

你能澄清 commitSync commitAsync 详细信息?

另外,请提供我更喜欢哪种提交类型的用例。

Can you clarify difference of commitSync and commitAsync in details?
Also, please provide use cases when which commit type should I prefer.

推荐答案

正如API文档中所述:

As it is said in the API documentation:

  • commitSync

这是一个同步提交,将阻塞,直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛给调用者。)

这意味着, commitSync 是一种阻止方法。调用它会阻塞你的线程,直到它成功或失败。

That means, the commitSync is a blocking method. Calling it will block your thread until it either succeeds or fails.

例如,

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitSync();
    }
}

对于for循环中的每次迭代,只有在 consumer.commitSync()在抛出异常时成功返回或中断,您的代码将移至下一次迭代。

For each iteration in the for-loop, only after consumer.commitSync() successfully returns or interrupted with exception thrown, your code will move to the next iteration.

  • commitAsync

这是一个异步调用,不会阻止。遇到的任何错误都会传递给回调(如果提供)或被丢弃。

这意味着, commitAsync 是一种非阻塞方法。调用它不会阻止你的线程。相反,它将继续处理以下指令,无论它最终是成功还是失败。

That means, the commitAsync is a non-blocking method. Calling it will not block your thread. Instead, it will continue processing the following instructions, no matter whether it will succeed or fail eventually.

例如,类似于前面的例子,但这里我们使用 commitAsync

For example, similar to previous example, but here we use commitAsync:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitAsync(callback);
    }
}

对于for循环中的每次迭代,无论如何什么会发生在 consumer.commitAsync()最终,您的代码将转移到下一次迭代。并且,提交的结果将由您定义的回调函数处理。

For each iteration in the for-loop, no matter what will happen to consumer.commitAsync() eventually, your code will move to the next iteration. And, the result of the commit is going to be handled by the callback function you defined.

权衡:延迟与数据一致性


  • 如果您必须确保数据一致性,请选择 commitSync(),因为它会确保在执行任何操作之前进一步的操作,您将知道偏移提交是成功还是失败。但是因为它是同步和阻塞,你将花费更多的时间等待提交完成,这会导致高延迟。

  • 如果你确定某些数据不一致并想要具有低延迟,选择 commitAsync()因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自Kafka的响应(成功或失败),同时,您的代码将继续执行。

  • If you have to ensure the data consistency, choose commitSync() because it will make sure that, before doing any further actions, you will know whether the offset commit is successful or failed. But because it is sync and blocking, you will spend more time on waiting for the commit to be finished, which leads to high latency.
  • If you are ok of certain data inconsistency and want to have low latency, choose commitAsync() because it will not wait to be finished. Instead, it will just send out the commit request and handle the response from Kafka (success or failure) later, and meanwhile, your code will continue executing.

一般来说,实际行为将取决于您的实际代码以及您调用方法的位置。

This is all generally speaking, the actually behaviour will depend on your actual code and where you are calling the method.

这篇关于卡夫卡消费者。 commitSync vs commitAsync的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆