Kafka connect consumer referencing offset and storing in message

Problem Description

If I am using kafka-connect to consume messages and store them to S3 (using the kafka-connect S3 connector), is there any way I can store the message offset along with the event payload? I would like to use this data to put some order on the messages and also to check whether there are any gaps or duplicates in the messages I have received (e.g. if my consumer offsets get accidentally clobbered and I restart kafka-connect). Is this possible, or should I write a custom subscriber for this type of functionality?

Answer

According to the documentation on the InsertField transformation, you could use offset.field:

Name            Description
offset.field    Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).

Overall, your single message transformation (SMT) configuration would look like this:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"

If this is not what you are looking for, there is always the option to create your own customised transformation.
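
If you do go down the custom route, a sink-side transformation can read the offset directly from the SinkRecord. The sketch below is only an illustration under simplifying assumptions, not a production implementation: the package, class name, and inserted field names are made up, and it only handles schemaless (Map) values, leaving out Struct/schema support, configuration options, and error handling.

package com.example.smt; // hypothetical package

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Minimal sketch of a custom SMT that copies the Kafka offset and partition
 * into schemaless (Map) record values on the sink side.
 */
public class InsertOffsetSketch<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public R apply(R record) {
        // The offset is only available on the sink side, so only SinkRecords
        // with Map values are handled; everything else passes through untouched.
        if (!(record instanceof SinkRecord) || !(record.value() instanceof Map)) {
            return record;
        }
        SinkRecord sinkRecord = (SinkRecord) record;

        @SuppressWarnings("unchecked")
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        value.put("kafkaOffset", sinkRecord.kafkaOffset());
        value.put("kafkaPartition", sinkRecord.kafkaPartition());

        return record.newRecord(
                record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                null, value,               // schemaless value, so no value schema
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}

You would package a class like this into a jar, drop it on the worker's plugin.path, and reference it from the connector config in the same way as the built-in transform, e.g. "transforms.InsertOffset.type": "com.example.smt.InsertOffsetSketch" (again, a hypothetical name).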
