我有一个用于 kafka 连接的 kafka 管道(json 问题更新) [英] i have a kafka pipeline (json problem update) for kafka connect
问题描述
所以我根据一些建议进行了更新.但流应用程序在一段时间后终止.没有表演.ide 显示的以下代码中没有错误.最后,我将数据发送到主题,因为键等于字符串,值作为 json 对象.还是不行.
so i updated according to some suggestions. but the streams application terminates after some time. without performing. no error in below code shown by ide. at last i'm sending data to topic as key equals string and value as a json object. still not working.
我猜它是一条线或其他东西,但不确定我是否正确.请.还附上了下面的错误截图.
i guess its a line or something but not sure if im right. please. also attached the error screenshot below.
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
JSONObject jsnObj = new JSONObject();
......(word count manipulationover part over here)
KTable<Windowed<String>, Long> Ttable = TgroupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
Ttable
.toStream()
.selectKey((key, word) -> key.key())
.map((key, value) -> {
JSONParser par = new JSONParser();
StringWriter sw = new StringWriter();
KeyValue<String, JsonNode> kv = null;
try {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree("{ \"word\": \"" + key + "\" \",\" count: \"" + value + "\" }");
KeyValue.pair(key.concat("s"), jsonNode);
kv = KeyValue.pair(key.concat("s"), jsonNode);
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return kv;
}
)
.to("badliar", Produced.with(Serdes.String(), jsonSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
推荐答案
您正在使用包含您想要的确切数据的键值对.您不需要解析任何东西,只需创建 JsonNode 并返回即可.
You're consuming a key-value pair containing the exact data you want. You dont need to parse anything, just create the JsonNode and return it.
final ObjectMapper mapper = new ObjectMapper();
Ttable
.toStream()
.selectKey((key, word) -> key.key())
.map((key, value) -> {
ObjectNode rootNode = mapper.createObjectNode();
rootNode.put("word", key);
rootNode.put("count", value);
return new KeyValue.pair(key, jsonNode);
})
如果不修改键,也可以使用 mapValues
代替 map
You can also use mapValues
instead of map
if you aren't modifying the key
这篇关于我有一个用于 kafka 连接的 kafka 管道(json 问题更新)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!