卡夫卡消费者.commitSync 与 commitAsync [英] Kafka-consumer. commitSync vs commitAsync

查看:507
本文介绍了卡夫卡消费者.commitSync 与 commitAsync的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

引自 https:/www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1

缺点是虽然 commitSync() 会重试提交,直到它要么成功,要么遇到不可重试的失败,commitAsync()不会重试.

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a non-retriable failure, commitAsync() will not retry.

这句话我不太清楚.我想消费者向代理发送提交请求,如果代理在某个超时内没有响应,则意味着提交失败.我错了吗?

This phrase is not clear to me. I suppose that consumer sends commit request to broker and in case if the broker doesn't respond within some timeout it means that the commit failed. Am I wrong?

能否详细说明commitSynccommitAsync的区别?
另外,请提供我应该选择哪种提交类型的用例.

Can you clarify the difference of commitSync and commitAsync in details?
Also, please provide use cases when which commit type should I prefer.

推荐答案

正如 API 文档中所说:

As it is said in the API documentation:

这是一个同步提交,将阻塞直到提交成功或遇到不可恢复的错误(在这种情况下它会被抛出给调用者).

这意味着,commitSync 是一种阻塞方法.调用它会阻塞你的线程,直到它成功或失败.

That means, the commitSync is a blocking method. Calling it will block your thread until it either succeeds or fails.

例如,

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitSync();
    }
}

对于 for 循环中的每次迭代,只有在 consumer.commitSync() 成功返回或被抛出异常中断后,您的代码才会移动到下一次迭代.

For each iteration in the for-loop, only after consumer.commitSync() successfully returns or interrupted with exception thrown, your code will move to the next iteration.

这是一个异步调用,不会阻塞.遇到的任何错误都会传递给回调(如果提供)或丢弃.

这意味着,commitAsync 是一种非阻塞方法.调用它不会阻塞你的线程.相反,它会继续处理下面的指令,不管它最终是成功还是失败.

That means, the commitAsync is a non-blocking method. Calling it will not block your thread. Instead, it will continue processing the following instructions, no matter whether it will succeed or fail eventually.

例如,类似于前面的例子,但这里我们使用commitAsync:

For example, similar to previous example, but here we use commitAsync:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitAsync(callback);
    }
}

对于 for 循环中的每次迭代,无论 consumer.commitAsync() 最终会发生什么,您的代码都会移动到下一次迭代.并且,提交的结果将由您定义的回调函数处理.

For each iteration in the for-loop, no matter what will happen to consumer.commitAsync() eventually, your code will move to the next iteration. And, the result of the commit is going to be handled by the callback function you defined.

权衡:延迟与数据一致性

  • 如果必须保证数据的一致性,请选择commitSync(),因为它可以确保在执行任何进一步操作之前,您将知道偏移量提交是成功还是失败.但由于它是同步和阻塞的,您将花费更多时间等待提交完成,从而导致高延迟.
  • 如果您可以接受某些数据不一致并希望获得低延迟,请选择 commitAsync(),因为它不会等待完成.相反,它只会发出提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时,您的代码将继续执行.
  • If you have to ensure the data consistency, choose commitSync() because it will make sure that, before doing any further actions, you will know whether the offset commit is successful or failed. But because it is sync and blocking, you will spend more time on waiting for the commit to be finished, which leads to high latency.
  • If you are ok of certain data inconsistency and want to have low latency, choose commitAsync() because it will not wait to be finished. Instead, it will just send out the commit request and handle the response from Kafka (success or failure) later, and meanwhile, your code will continue executing.

一般来说,实际行为将取决于您的实际代码以及调用方法的位置.

This is all generally speaking, the actually behaviour will depend on your actual code and where you are calling the method.

这篇关于卡夫卡消费者.commitSync 与 commitAsync的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆