是否可以使用 Kafka Streams 访问消息头? [英] Is it possible to access message headers with Kafka Streams?
问题描述
添加记录头(ProducerRecord & ConsumerRecord) 在 Kafka 0.11 中,使用 Kafka Streams 处理主题时是否可以获得这些标头?当在 KStream
上调用 map
之类的方法时,它提供记录的 key
和 value
的参数,但不提供我可以看到访问 headers
的方式.如果我们可以map
覆盖ConsumerRecord
s,那就太好了.
With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map
on a KStream
it provides arguments of the key
and the value
of the record but no way I can see to access the headers
. It would be nice if we could just map
over the ConsumerRecord
s.
例如
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
这样的事情会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
推荐答案
从 2.0.0 版本开始可以访问记录标题(参见 KIP-244 详情).
Records headers are accessible since versions 2.0.0 (cf. KIP-244 for details).
您可以通过处理器 API 访问记录元数据(即,通过 transform()
、transformValues()
或 process()
),通过给定的上下文"对象(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
You can access record metadata via the Processor API (ie, via transform()
, transformValues()
, or process()
), by the given "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
更新
从 2.7.0 版本开始,处理器 API 得到改进(参见 KIP-478),添加一个新的类型安全的 api.Processor
类和 process(Record)
而不是process(K, V)
方法.对于这种情况,可以通过 Record
类访问标头(和记录元数据).
As of 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor
class with process(Record)
instead of process(K, V)
method. For this case, headers (and record metadata) are accessible via the Record
class).
这个新功能在 DSL 的PAPI 方法"中尚不可用(例如 KStream#process()
、KStream#transform()
和兄弟姐妹).
This new feature is not yet available in "PAPI method of the DSL though (eg. KStream#process()
, KStream#transform()
and siblings).
++++++
在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳——但不公开在旧版本中读取时实际上由 Streams 丢弃的标头.
Prior to 2.0, the context only exposes topic, partition, offset, and timestamp---but not headers that are in fact dropped by Streams on read in those older versions.
尽管元数据在 DSL 级别不可用.但是,还有一些工作正在通过 KIP-159.
Metadata is not available at DSL level though. However, there is also work in progress to extend the DSL via KIP-159.
这篇关于是否可以使用 Kafka Streams 访问消息头?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!