Kafka Streams JsonSerde ObjectNode cannot be cast to class


Question

I'm working on an example by Stephane Maarek regarding Kafka Streams.

Basically, the code takes transactions and calculates the account balance. Unfortunately, I get a ClassCastException, and I do not understand why or what is happening.

Exception:

Exception in thread "bank-balance-dz-application-8f38e2f1-fc8e-4bb9-bcb7-82958aa39aff-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.fasterxml.jackson.databind.node.ObjectNode). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:100)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:108)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.ClassCastException: class com.fasterxml.jackson.databind.node.ObjectNode cannot be cast to class [B (com.fasterxml.jackson.databind.node.ObjectNode is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:104)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:95)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)

While debugging, I can see that it tries to use a ValueAndTimestampSerializer (screenshot in the original link).

So I'll try to explain the code; hopefully my understanding is right.

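  1. bankTransactions reads the keys and values of the topic, parsing each key as a String and each value as a JsonNode.
  2. Then I group the entries by key and aggregate the values. (Do I have to pass the Serdes again? If I don't pass them, will the values be serialized as String?)
  3. I write the resulting table to the topic "bank-balance-exactly-once". (Do I have to call ".toStream()"?)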

"My" code:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Instant;
import java.util.Properties;

public class BankBalanceStream {


    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "0");
        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        final Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, JsonNode> bankTransactions = builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));
        
        ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
        initialBalance.put("count", 0);
        initialBalance.put("balance", 0);
        initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

        KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
                .aggregate(
                        () -> initialBalance,
                        (key, transaction, balance) -> newBalance(transaction, balance)
                );

        bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

        KafkaStreams stream = new KafkaStreams(builder.build(), properties);
        stream.cleanUp();
        stream.start();

        System.out.println(stream.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(stream::close));

    }

    private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
        ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
        newBalance.put("count", balance.get("count").asInt() + 1);
        newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

        Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
        Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
        Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
        newBalance.put("time", newBalanceInstant.toString());
        return newBalance;

    }
}

Recommended answer

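In the aggregate step I had to provide the serdes again. Without a Materialized argument, the state store behind the aggregation falls back to the default serdes from the StreamsConfig (here the ByteArraySerializer named in the exception), and that serializer cannot handle an ObjectNode.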

KTable<String, JsonNode> bankBalance = bankTransactions
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .aggregate(
                () -> initialBalance,
                (key, transaction, balance) -> newBalance(transaction, balance),
                Named.as("bank-Transaction-Aggregate"),
                // serdes for the aggregation's state store; without them the defaults are used
                Materialized.with(Serdes.String(), jsonSerde)
        );
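
To see why a missing serde shows up as a runtime ClassCastException to [B rather than as a compile error, here is a minimal plain-Java sketch. It is not Kafka code: the Serializer interface, PassThroughSerializer, MapSerializer and rawValue are hypothetical stand-ins for Kafka's Serializer, ByteArraySerializer, a JSON serializer and the state store's serialization step. The assumption is that the store picks its serializer from configuration at runtime, so the generic type is erased and the mismatch only surfaces when the value is cast.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

final class SerdeMismatchSketch {

    /** Hypothetical stand-in for Kafka's Serializer<T>. */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Mimics a byte-array serializer: it expects bytes and returns them unchanged. */
    static final class PassThroughSerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    /** Mimics a JSON-like serializer for the aggregate value (a map in this sketch). */
    static final class MapSerializer implements Serializer<Map<String, Integer>> {
        @Override
        public byte[] serialize(Map<String, Integer> data) {
            return data.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Mimics a state store that only learns its serializer at runtime.
     * The unchecked cast compiles, because generics are erased.
     */
    @SuppressWarnings("unchecked")
    static <V> byte[] rawValue(Serializer<?> configured, V value) {
        return ((Serializer<V>) configured).serialize(value);
    }

    /** Returns true if serializing the value fails with a ClassCastException. */
    static boolean failsWithClassCast(Serializer<?> configured, Object value) {
        try {
            rawValue(configured, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> balance = Collections.singletonMap("balance", 0);
        // Default-style serializer: the map is cast to byte[] and the cast fails.
        System.out.println(failsWithClassCast(new PassThroughSerializer(), balance)); // prints true
        // Matching serializer, analogous to passing Materialized.with(...): works.
        System.out.println(failsWithClassCast(new MapSerializer(), balance));         // prints false
    }
}
```

The second call corresponds to the fix above: once the store is given a serializer that matches the aggregate's value type, the cast inside the serializer no longer fails.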
