使用 kafka 流根据消息密钥向主题发送消息 [英] send message to topic based on message key using kafka streams

查看:29
本文介绍了使用 kafka 流根据消息密钥向主题发送消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题.前任.Kafka 中的流包含名称作为键和记录作为值.我想根据记录的键将这些记录扇出到不同的主题

I want to be able to send all records in a Kafkastream to a different topic based on the key of the message key. Ex. A stream in Kafka contains name as key and record as value. I want to fan out these records to different topic based on the key of the record

data : (jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),预期

data : (jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}), expected

  • topic1 :name: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})
  • topic2 :sean-> (sean -> {seansRecord})
  • topic3 :mary -> (mary -> {marysRecord})

下面是我现在这样做的方式,但是由于名称列表是hudge,所以速度很慢.加上即使有几个名字的记录,我也需要遍历整个列表 请提出修复建议

Below is the way I am doing this right now, but since the list of names is hudge this is slow. Plus even if there are records a few names, I need to traverse the entire list Please suggest a fix

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 

推荐答案

我认为你应该使用 KStream::to(final TopicNameExtractor topicExtractor) 函数.它使您能够计算每条消息的主题名称.

I think you should use KStream::to(final TopicNameExtractor<K, V> topicExtractor) function. It gives you ability to calculate name of the topic for each message.

示例代码:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);

这篇关于使用 kafka 流根据消息密钥向主题发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆