是否可以使用Kafka Streams访问邮件头? [英] Is it possible to access message headers with Kafka Streams?

查看:166
本文介绍了是否可以使用Kafka Streams访问邮件头?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

添加标题的标题 ProducerRecord & ConsumerRecord )在使用Kafka Streams处理主题时是否可以获得这些标题?在 KStream 上调用 map 等方法时,它提供了键的参数和记录的但我无法看到访问标题。如果我们只能 map 而不是 ConsumerRecord ,那就太好了。

With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map on a KStream it provides arguments of the key and the value of the record but no way I can see to access the headers. It would be nice if we could just map over the ConsumerRecords.

ex。

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

这样的事情会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...


推荐答案

自Streams API的2.0版以来,可以访问记录标题。 (参见 KIP-244 了解详情。)

Records headers are accessible since versions 2.0 of Streams API. (Cf. KIP-244 for details.)

您可以通过处理器API访问记录元数据(即通过转换() transformValues(),或 process()),由给定的上下文对象(参见 https: //docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context )。

You can access record metadata via Processor API (ie, via transform(), transformValues(), or process()), by the given "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).

之前2.0,上下文只公开主题,分区,偏移量和时间戳---但不是那些在旧版本中读取时被Streams删除的标题。

Prior to 2.0, the context only exposes topic, partition, offset, and timestamp---but not headers that are in fact dropped by Streams on read in those older versions.

元数据虽然在DSL级别不可用。但是,还在进行扩展DSL的工作: https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

Metadata is not available at DSL level though. However, there is also work in progress to extend the DSL: https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

这篇关于是否可以使用Kafka Streams访问邮件头?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆