Put() vs Flush() in Kafka Connector Sink Task


Problem Description

I am trying to send data in batches to a NoSQL database using a Kafka sink connector. I am following the documentation at https://kafka.apache.org/documentation/#connect and am confused about where the logic for sending records should be implemented. Please help me understand how the records are processed internally, and whether put() or flush() should be used to process records in a batch.

Recommended Answer

When a Kafka Connect worker is running a sink task, it consumes messages from the topic partition(s) assigned to that task. As it does so, it repeatedly passes batches of messages to the sink task through the put(Collection&lt;SinkRecord&gt;) method. This continues as long as the connector and its tasks are running.
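Here is a minimal sketch of a sink task in which put(...) writes each batch as it arrives. NoSqlClient, its connect/bulkInsert/close methods, and the nosql.connection.url property are hypothetical stand-ins for whatever client library your NoSQL store provides:

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class NoSqlSinkTask extends SinkTask {

    private NoSqlClient client; // hypothetical client for your NoSQL store

    @Override
    public void start(Map<String, String> props) {
        // Open a connection using the task's configuration.
        client = NoSqlClient.connect(props.get("nosql.connection.url"));
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Kafka Connect calls this repeatedly with the next batch of
        // records consumed from the task's assigned topic partitions.
        client.bulkInsert(records);
    }

    @Override
    public void stop() {
        client.close();
    }

    @Override
    public String version() {
        return "0.1.0";
    }
}
```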

Kafka Connect will also periodically record the progress of the sink tasks, namely the offset of the most recently processed message on each topic partition. This is called committing the offsets, and it is done so that if the connector stops unexpectedly and uncleanly, Kafka Connect knows where in each topic partition the task should resume processing messages. Just before Kafka Connect writes the offsets to Kafka, the worker gives the sink task an opportunity to do work at this stage via the flush(...) method.
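If your client buffers writes internally, the task can hook into this commit point by overriding flush(...), whose argument maps each topic partition to the offset about to be committed for it. Continuing the hypothetical NoSqlSinkTask sketch above (flushPendingWrites is likewise a made-up helper):

```java
// Inside NoSqlSinkTask from the previous sketch. Requires:
//   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
//   import org.apache.kafka.common.TopicPartition;
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    // Make everything handed to put(...) so far durable in the target
    // store before the worker commits these offsets to Kafka.
    client.flushPendingWrites(); // hypothetical helper on our client
}
```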

A particular sink connector might not need to do anything in flush(...) (if put(...) did all of the work), or it might use this opportunity to submit all of the messages already processed via put(...) to the data store. For example, Confluent's JDBC sink connector writes each batch of messages passed through put(...) using a transaction (whose size can be controlled via the connector's consumer settings), and thus its flush(...) method doesn't need to do anything. Confluent's Elasticsearch sink connector, on the other hand, simply accumulates the messages from a series of put(...) calls and writes them to Elasticsearch only during flush(...).
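A sketch of that second, accumulate-then-write style, again using the hypothetical NoSqlClient:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class BufferingNoSqlSinkTask extends SinkTask {

    private final List<SinkRecord> buffer = new ArrayList<>();
    private NoSqlClient client; // hypothetical client, as above

    @Override
    public void start(Map<String, String> props) {
        client = NoSqlClient.connect(props.get("nosql.connection.url"));
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Cheap: just accumulate the batch in memory.
        buffer.addAll(records);
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // One bulk write per offset-commit interval. If this throws, the
        // offsets are not committed and the records will be redelivered.
        client.bulkInsert(buffer);
        buffer.clear();
    }

    @Override
    public void stop() {
        client.close();
    }

    @Override
    public String version() {
        return "0.1.0";
    }
}
```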

The frequency with which the offsets are committed for source and sink connectors is controlled by the worker's offset.flush.interval.ms configuration property. The default is to commit offsets every 60 seconds, which is infrequent enough to improve performance and reduce overhead, yet frequent enough to cap the potential amount of reprocessing should a connector task die unexpectedly. Note that when the connector is shut down gracefully or fails with an exception, Kafka Connect will always have a chance to commit the offsets. It is only when the Kafka Connect worker is killed unexpectedly that it might not have a chance to commit the offsets identifying which messages have been processed. Thus, after restarting from such a failure, the connector may reprocess some of the messages it handled just prior to the failure. This is why messages can be seen at least once, and why processing them should be idempotent. Take all of this, plus your connector's behavior, into account when determining appropriate values for this setting.
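For example, you might lower the interval in the worker configuration (connect-distributed.properties or connect-standalone.properties); the value below is purely illustrative:

```properties
# Fragment of a Kafka Connect worker configuration.
# Commit offsets every 10 seconds instead of the default 60 seconds.
offset.flush.interval.ms=10000
```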

Have a look at the Confluent documentation for Kafka Connect, as well as open source sink connectors, for more examples and details.
