是否可以使用 Kafka Streams 访问消息头? [英] Is it possible to access message headers with Kafka Streams?

查看:26
本文介绍了是否可以使用 Kafka Streams 访问消息头?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

添加记录头(ProducerRecord & ConsumerRecord) 在 Kafka 0.11 中,使用 Kafka Streams 处理主题时是否可以获得这些标头?当在 KStream 上调用 map 之类的方法时,它提供记录的 keyvalue 的参数,但不提供我可以看到访问 headers 的方式.如果我们可以map覆盖ConsumerRecords,那就太好了.

With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map on a KStream it provides arguments of the key and the value of the record but no way I can see to access the headers. It would be nice if we could just map over the ConsumerRecords.

例如

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

这样的事情会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...

推荐答案

从 2.0.0 版本开始可以访问记录标题(参见 KIP-244 详情).

Records headers are accessible since versions 2.0.0 (cf. KIP-244 for details).

您可以通过处理器 API 访问记录元数据(即,通过 transform()transformValues()process()),通过给定的上下文"对象(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).

You can access record metadata via the Processor API (ie, via transform(), transformValues(), or process()), by the given "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).

更新

从 2.7.0 版本开始,处理器 API 得到改进(参见 KIP-478),添加一个新的类型安全的 api.Processor 类和 process(Record) 而不是process(K, V) 方法.对于这种情况,可以通过 Record 类访问标头(和记录元数据).

As of 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor class with process(Record) instead of process(K, V) method. For this case, headers (and record metadata) are accessible via the Record class).

这个新功能在 DSL 的PAPI 方法"中尚不可用(例如 KStream#process()KStream#transform() 和兄弟姐妹).

This new feature is not yet available in "PAPI method of the DSL though (eg. KStream#process(), KStream#transform() and siblings).

++++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳——但不公开在旧版本中读取时实际上由 Streams 丢弃的标头.

Prior to 2.0, the context only exposes topic, partition, offset, and timestamp---but not headers that are in fact dropped by Streams on read in those older versions.

尽管元数据在 DSL 级别不可用.但是,还有一些工作正在通过 KIP-159.

Metadata is not available at DSL level though. However, there is also work in progress to extend the DSL via KIP-159.

这篇关于是否可以使用 Kafka Streams 访问消息头?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆