How to make Serdes work with multi-step Kafka streams


Question


I am new to Kafka and I'm building a starter project using the Twitter API as a data source. I have created a Producer which queries the Twitter API and sends the data to my Kafka topic, using the String serializer for both key and value. My Kafka Streams application reads this data and does a word count, but also groups by the date of the tweet. This part is done through a KTable called wordCounts to make use of its upsert functionality. The structure of this KTable is:


Key: {word: exampleWord, date: exampleDate}, Value: numberOfOccurrences


I then attempt to restructure the data in the KTable stream into a flat structure so I can later send it to a database. You can see this in the wordCountsStructured KStream object. This restructures the data to look like the structure below. The value is initially a JsonObject, but I convert it to a String to match the Serdes I set.

Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurrences}


However, when I try to send this to my second Kafka topic, I get the error below.


A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: com.google.gson.JsonObject / value type: com.google.gson.JsonObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.


I'm confused by this since the KStream I am sending to the topic is of type <String, String>. Does anyone know how I might fix this?

public class TwitterWordCounter {

private final JsonParser jsonParser = new JsonParser();

public Topology createTopology(){
    StreamsBuilder builder = new StreamsBuilder();


    KStream<String, String> textLines = builder.stream("test-topic2");
    KTable<JsonObject, Long> wordCounts = textLines
            //parse each tweet as a tweet object
            .mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
            //map each tweet object to a list of json objects, each of which containing a word from the tweet and the date of the tweet
            .flatMapValues(TwitterWordCounter::tweetWordDateMapper)
            //update the key so it matches the word-date combination so we can do a groupBy and count instances
            .selectKey((key, wordDate) -> wordDate)
            .groupByKey()
            .count(Materialized.as("Counts"));

    /*
        In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
        so we have to:
         1. take the columns which include the dimensional data and put this into the value of the stream.
         2. label the count with 'count' as the column name
     */
    KStream<String, String> wordCountsStructured = wordCounts.toStream()
            .map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));

    KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
            (key, value) -> System.out.println("key: " + key + " value: " + value)
    );

    wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
}

public static void main(String[] args) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    TwitterWordCounter wordCountApp = new TwitterWordCounter();

    KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
    streams.start();

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}

//this method is used for taking a tweet and transforming it to a representation of the words in it plus the date
public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
    try{

        List<String> words = Arrays.asList(tweet.tweetText.split("\\W+"));
        List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
        for(String word: words) {
            JsonObject tweetJson = new JsonObject();
            tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
            tweetJson.add("word", new JsonPrimitive(word));
            tweetsJson.add(tweetJson);
        }

        return tweetsJson;
    }
    catch (Exception e) {
        System.out.println(e);
        System.out.println(tweet.serialize().toString());
        return new ArrayList<JsonObject>();
    }

}

public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
    key.addProperty("count", countOfWord); //new JsonPrimitive(count));
    return key;
}

Answer


Because you are performing a key-changing operation (selectKey) before the grouping, Kafka Streams creates a repartition topic, and for that topic it relies on the default key and value serdes, which you have set to the String serde.
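The incompatibility can be illustrated outside of Kafka. The sketch below is a simplified stand-in (an assumption for illustration, not Kafka's actual source) for what StringSerializer effectively does: generics are erased at runtime, so the serializer casts whatever value it receives to String, and a JsonObject flowing into it fails with a ClassCastException, which Streams surfaces as the error above.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchDemo {
    // Simplified stand-in for Kafka's StringSerializer: the cast succeeds
    // only when the record value really is a String.
    public static byte[] stringSerialize(Object value) {
        return ((String) value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A real String serializes fine.
        System.out.println(stringSerialize("ok").length + " bytes");

        // Any non-String value (e.g. a JsonObject in the repartition topic)
        // triggers a ClassCastException at runtime.
        try {
            stringSerialize(new Object());
        } catch (ClassCastException e) {
            System.out.println("not a String: " + e);
        }
    }
}
```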


You can modify the groupByKey() call to supply serdes explicitly, e.g. groupByKey(Grouped.with(jsonSerde, jsonSerde)) (at that point in the topology both key and value are JsonObject), and this should help.
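Kafka does not ship a serde for Gson's JsonObject, so one has to be built. A minimal sketch using Serdes.serdeFrom(...) and Gson, assuming both libraries are on the classpath (the class name JsonObjectSerde and its serde() method are illustrative, not part of either library):

```java
import java.nio.charset.StandardCharsets;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class JsonObjectSerde {

    // Serializer and Deserializer each have a single abstract method,
    // so both sides of the serde can be lambdas: JSON text <-> UTF-8 bytes.
    public static Serde<JsonObject> serde() {
        return Serdes.serdeFrom(
                (topic, json) -> json == null
                        ? null
                        : json.toString().getBytes(StandardCharsets.UTF_8),
                (topic, bytes) -> bytes == null
                        ? null
                        : new JsonParser()
                                .parse(new String(bytes, StandardCharsets.UTF_8))
                                .getAsJsonObject());
    }
}
```

With this in place, the grouping step can declare its serdes, so the repartition topic no longer falls back to the String defaults: `.groupByKey(Grouped.with(JsonObjectSerde.serde(), JsonObjectSerde.serde()))`.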

