Kafka连接消费者引用偏移量并存储在消息中 [英] Kafka connect consumer referencing offset and storing in message

查看:42
本文介绍了Kafka连接消费者引用偏移量并存储在消息中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果我使用 kafka-connect 来消费消息并存储到 s3(使用 kafka-connect s3 连接器),那么我是否可以将消息偏移量与事件有效负载一起存储?我想要这些数据来对消息进行排序,并检查是否有任何差距或检查我收到的消息中是否有任何重复.(例如,如果我的消费者偏移被意外破坏并且我重新启动了 kafka-connect).这是可能的还是我应该为此类功能编写自定义订阅者?

If I am using kafka-connect to consume messages and store to s3 (using the kafka-connect s3 connector), is there anyway I can store the message offset along with the event payload? I would like to have this data to put some order on the messages and also to check if there could be any gaps or check if there were any duplicates in the messages I have received. (e.g. if my consumer offsets get accidentally clobbered and I restarted kafka-connect). Is this possible or should I write a custom subscriber for this type of functionality?

推荐答案

根据 插入字段 转换,你可以使用 offset.field:

According to the documentation on Insert Field transformation, you could use offset.field:

Name            Description
offset.field    Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).

总体而言,您的单消息转换 (SMT) 配置如下所示:

Overall, your single message transformation (SMT) configuration would look like this:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"

如果这不是您要找的,那么总是可以选择创建您的 自定义 转换

If this is not what you are looking for, then there is always the option to create your customised transformations

这篇关于Kafka连接消费者引用偏移量并存储在消息中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆