我有一个用于 kafka 连接的 kafka 管道(json 问题更新) [英] i have a kafka pipeline (json problem update) for kafka connect

查看:16
本文介绍了我有一个用于 kafka 连接的 kafka 管道(json 问题更新)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以我根据一些建议进行了更新.但流应用程序在一段时间后终止.没有表演.ide 显示的以下代码中没有错误.最后,我将数据发送到主题,因为键等于字符串,值作为 json 对象.还是不行.

so i updated according to some suggestions. but the streams application terminates after some time. without performing. no error in below code shown by ide. at last i'm sending data to topic as key equals string and value as a json object. still not working.

我猜它是一条线或其他东西,但不确定我是否正确.请.还附上了下面的错误截图.

i guess its a line or something but not sure if im right. please. also attached the error screenshot below.

 Serializer<JsonNode> jsonSerializer = new JsonSerializer();
            Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
            Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    
            JSONObject jsnObj = new JSONObject();
    
           ......(word count manipulationover part over here)
    
            KTable<Windowed<String>, Long> Ttable = TgroupedStream
                    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                    .count();
    
            Ttable
                    .toStream()
                    .selectKey((key, word) -> key.key())
                    .map((key, value) -> {
                                JSONParser par = new JSONParser();
                                StringWriter sw = new StringWriter();
    
                                KeyValue<String, JsonNode> kv = null;
                                try {
                                    ObjectMapper objectMapper = new ObjectMapper();
                                    JsonNode jsonNode = objectMapper.readTree("{ \"word\": \"" + key + "\" \",\" count: \"" + value + "\" }");
                                    KeyValue.pair(key.concat("s"), jsonNode);
                                    kv = KeyValue.pair(key.concat("s"), jsonNode);
    
                                } catch (JsonMappingException e) {
                                    e.printStackTrace();
                                } catch (JsonProcessingException e) {
                                    e.printStackTrace();
                                }
                                return kv;
                            }
                    )
                    .to("badliar", Produced.with(Serdes.String(), jsonSerde));
    
          
            KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
            streams.start();
    
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }

推荐答案

您正在使用包含您想要的确切数据的键值对.您不需要解析任何东西,只需创建 JsonNode 并返回即可.

You're consuming a key-value pair containing the exact data you want. You dont need to parse anything, just create the JsonNode and return it.

final ObjectMapper mapper = new ObjectMapper();

Ttable
        .toStream()
        .selectKey((key, word) -> key.key())
        .map((key, value) -> {
             ObjectNode rootNode = mapper.createObjectNode();

             rootNode.put("word", key);
             rootNode.put("count", value);
                        
             return new KeyValue.pair(key, jsonNode);           
        })

如果不修改键,也可以使用 mapValues 代替 map

You can also use mapValues instead of map if you aren't modifying the key

这篇关于我有一个用于 kafka 连接的 kafka 管道(json 问题更新)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆