如何在 KStream 中获取偏移值 [英] How can I get the offset value in KStream

查看:36
本文介绍了如何在 KStream 中获取偏移值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Kafka Streams 开发 PoC.现在我需要在流消费者中获取偏移值并使用它为每条消息生成一个唯一的键 (topic-offset)->hash.原因是:生产者是 syslog,只有少数人有 ID.我无法在消费者中生成 UUID,因为在重新处理的情况下,我需要重新生成相同的密钥.

I'm developing a PoC with Kafka Streams. Now I need to get the offset value in the stream consumer and use it to generate a unique key (topic-offset)->hash for each message. The reason is: the producers are syslog and only few of them have IDs. I cannot generate a UUID in the consumer because in case of a reprocess I need to regenerate the same key.

我的问题是:org.apache.kafka.streams.processor.ProcessorContext 类公开了一个返回值的 .offset() 方法,但我使用 KStream 而不是 Processor,我找不到返回相同内容的方法.

My problem is: the org.apache.kafka.streams.processor.ProcessorContext class exposes an .offset() method that returns the value, but I'm using KStream instead of the Processor, and I couldn't find a method that returns the same thing.

有人知道如何从 KStream 中提取每一行的消费者值吗?提前致谢.

Anybody knows how to extract the consumer value for each row from a KStream? Thanks in advance.

推荐答案

您可以通过 process(...)transform(...)transformValues(...).

You can use mix-and-match DSL and Processor API via process(...), transform(...), and transformValues(...).

它允许您访问类似于普通处理器 API 的当前记录偏移量.在您的情况下,您似乎想使用 KStream#transform(...).

It allows you to access the current record offset similar to plain Processor API. In you case, it seems you want to use KStream#transform(...).

这篇关于如何在 KStream 中获取偏移值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆