Kafka Connect consumer: referencing the offset and storing it in the message
Question
If I am using Kafka Connect to consume messages and store them in S3 (using the Kafka Connect S3 sink connector), is there any way I can store the message offset along with the event payload? I would like to use this data to put the messages in order, and also to check for gaps or duplicates in the messages I have received (e.g. if my consumer offsets get accidentally clobbered and I restart Kafka Connect). Is this possible, or should I write a custom consumer for this kind of functionality?
Answer
According to the documentation on the InsertField transformation, you can use offset.field:
offset.field: Field name for the Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).
Overall, your single message transformation (SMT) configuration would look like this:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"
If this is not what you are looking for, there is always the option of creating your own custom transformation.
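Once the offset is embedded in each record, a downstream job that reads the files back from S3 can check for the gaps and duplicates the question asks about. A minimal sketch in Python, assuming the offsetColumn values for a single topic partition have already been extracted into a list (offsets are only comparable within one partition):

```python
def check_offsets(offsets):
    """Report duplicate offsets and gaps in a single partition's offset
    sequence. `offsets` is the list of offsetColumn values recovered
    from the records stored in S3 (order does not matter)."""
    seen = sorted(offsets)
    # A duplicate is any offset equal to its predecessor in sorted order.
    duplicates = [o for prev, o in zip(seen, seen[1:]) if o == prev]
    # A gap is any jump of more than 1 between consecutive offsets.
    gaps = [(prev + 1, o - 1) for prev, o in zip(seen, seen[1:]) if o - prev > 1]
    return duplicates, gaps

# Example: offset 4 appears twice and offsets 5-6 are missing.
print(check_offsets([3, 4, 4, 7, 8]))  # ([4], [(5, 6)])
```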