How do I transform/fork a Kafka stream and send it over to a specific topic?

Question

I am trying to transform the string values in my original stream "textlines" into JSONObject messages in a new stream, newStream, using the function "mapValues". Then I stream whatever I get in newStream onto a topic called "testoutput". But every time a message actually goes through the transformation block, I get a NullPointerException with errors pointing only into the Kafka Streams libraries. I have no idea what is going on :((

P.S. When I fork/create a new Kafka stream from the original stream, does the new stream belong to the original builder? Since I need the builder to create the KafkaStreams object and start streaming, I am not sure whether I need to do anything else with the new stream other than specifying where it's going with .to("topic").

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.json.JSONObject; // assumption: the question does not say which library JSONObject comes from

//Testing a Kafka Stream Application
public class testStream {

    public static void main(String[] args) throws Exception {
        //Configurations
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        //Building Stream
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textlines = builder.stream("mytest2");

        //Printout The Inputs just for testing purposes
        textlines.foreach(new ForeachAction<String, String>() {
            public void apply(String key, String value) {
                for (int y = 0; y < value.length(); y++) {
                    System.out.print(value.charAt(y));
                }
                System.out.print("\n");
            }
        });

        //Transform String Records to JSON Objects
        KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String, JSONObject>() {
            @Override
            public JSONObject apply(String value) {

                JSONObject jsnobj = new JSONObject();

                //If the first 4 letters of the message is "xxxx" then parse it to a
                //JSON Object, otherwise create a dummy
                if (value.substring(0, 4).equals("xxxx")) {
                    jsnobj.put("Header_Title", value.substring(0, 4));
                    jsnobj.put("Data_Part", value.substring(4));
                } else {
                    jsnobj.put("Header_Title", "Not xxxx");
                    jsnobj.put("Data_Part", "None");
                }
                return jsnobj;
            }
        });

        //Specify target
        newStream.to("testoutput");

        //Off you go
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

Answer

From what I can tell your problem is this line:

newStream.to("testoutput");

newStream is of type KStream<String, JSONObject>.

However, your application is configured to use, by default, a String serde to serialize/deserialize record keys and record values:

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

This means that, when you do not provide explicit serdes in the to() call, Kafka Streams will attempt to write your newStream as KStream<String, String> (rather than KStream<String, JSONObject>) back to Kafka.

What you need to do is to provide explicit serdes in the to() call:

// Something like this, where myJsonSerde is a Serde<JSONObject> you provide yourself
newStream.to(Serdes.String(), myJsonSerde, "testoutput");

Unfortunately, Kafka doesn't include an out-of-the-box JSON serde yet (it's planned). Fortunately, you can look at (and copy) the example JSON serde included in Kafka's own demo applications for the Kafka Streams API: https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
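If you don't actually need JSONObject instances downstream, a simpler alternative to writing a JSON serde may be to keep the value type as String and emit the JSON text from mapValues, so that the default String serdes configured in props still match the stream's types. The sketch below is plain Java with no Kafka or JSON-library dependency, and it is only an illustration under stated assumptions: JsonValueMapperSketch and its methods are hypothetical names, the hand-rolled escaping only covers flat string fields, and it swaps the question's substring(0, 4) check for startsWith so that values shorter than four characters do not throw StringIndexOutOfBoundsException.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: builds the same two fields as the
// question's ValueMapper, but returns JSON text so the value type stays String.
final class JsonValueMapperSketch {

    private JsonValueMapperSketch() {}

    static String toJsonString(String value) {
        Map<String, String> fields = new LinkedHashMap<>(); // keeps field order stable
        // startsWith is safe for short values, unlike substring(0, 4)
        if (value != null && value.startsWith("xxxx")) {
            fields.put("Header_Title", "xxxx");
            fields.put("Data_Part", value.substring(4));
        } else {
            fields.put("Header_Title", "Not xxxx");
            fields.put("Data_Part", "None");
        }
        return toJsonObject(fields);
    }

    // Serializes a flat map of string fields as a JSON object.
    static String toJsonObject(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
        }
        return sb.append('}').toString();
    }

    // Quotes a string as a JSON string literal, escaping quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder().append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

Wired into the question's topology, this would look something like KStream<String, String> jsonLines = textlines.mapValues(JsonValueMapperSketch::toJsonString); followed by jsonLines.to("testoutput");. Because the values are Strings, the configured default String serdes apply and no explicit serdes are needed in to(). The trade-off is that consumers of "testoutput" receive JSON text and must parse it themselves, whereas a proper JSON serde keeps JSONObject as the value type inside the topology.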
