Kafka Streams JsonSerde ObjectNode cannot be cast to class


Problem description

I'm working through an example by Stephane Maarek on Kafka Streams.

Basically, the code takes transactions and calculates the account balance. Unfortunately I get a ClassCastException, and I do not understand why or what is happening.

The exception:

Exception in thread "bank-balance-dz-application-8f38e2f1-fc8e-4bb9-bcb7-82958aa39aff-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.fasterxml.jackson.databind.node.ObjectNode). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:100)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:108)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.ClassCastException: class com.fasterxml.jackson.databind.node.ObjectNode cannot be cast to class [B (com.fasterxml.jackson.databind.node.ObjectNode is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:104)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:95)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)

While debugging I can see that it tries to use a ValueAndTimestampSerializer (image in the link). That fits the stack trace above: the state store wraps its value serializer in a ValueAndTimestampSerializer before delegating to the configured value Serde.

So I will try to explain the code; hopefully my understanding is right.

  1. bankTransactions reads the topic and tries to parse the key as a String and the value as a JsonNode.
  2. Then I group the entries by key and aggregate the values. (Do I have to pass the Serdes again? If I don't pass them, would they be serialized as String? See the sketch after this list.)
  3. I produce the table to the topic "bank-balance-exactly-once". (Do I have to call .toStream()?)
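
As far as I understand it, when an operation is not given Serdes explicitly, Kafka Streams falls back to the defaults configured in the StreamsConfig. A minimal sketch of how those defaults could be set (my code below never sets them, so the built-in default applies, which, judging by the exception, is the ByteArraySerde here):

// Sketch only, not part of my app: configuring default Serdes in StreamsConfig.
// Reuses the `properties` object from the code below.
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());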

My code:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Instant;
import java.util.Properties;

public class BankBalanceStream {


    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "0");
        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // JSON Serde built from Kafka Connect's JsonSerializer/JsonDeserializer
        final Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonNodeSerializer, jsonNodeDeserializer);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, JsonNode> bankTransactions = builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));
        
        // initial aggregate: zero transactions, zero balance, epoch-zero timestamp
        ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
        initialBalance.put("count", 0);
        initialBalance.put("balance", 0);
        initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

        // group by account key and fold each transaction into the running balance
        KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
                .aggregate(
                        () -> initialBalance,
                        (key, transaction, balance) -> newBalance(transaction, balance)
                );

        bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

        KafkaStreams stream = new KafkaStreams(builder.build(), properties);
        stream.cleanUp();
        stream.start();

        System.out.println(stream.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(stream::close));

    }

    private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
        ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
        newBalance.put("count", balance.get("count").asInt() + 1);
        newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

        Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
        Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
        Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
        newBalance.put("time", newBalanceInstant.toString());
        return newBalance;

    }
}
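
A transaction record that newBalance expects would look roughly like this (a sketch derived only from the fields the aggregator reads; the real records may carry more fields):

// Hypothetical input record: newBalance only reads an integer "amount"
// and an ISO-8601 "time" string.
ObjectNode transaction = JsonNodeFactory.instance.objectNode();
transaction.put("amount", 100);
transaction.put("time", "2020-01-01T00:00:00Z");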

Answer

In the aggregate step I had to provide the Serdes again:

KTable<String, JsonNode> bankBalance = bankTransactions
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .aggregate(
                () -> initialBalance,
                (key, transaction, balance) -> newBalance(transaction, balance),
                Named.as("bank-Transaction-Aggregate"),
                Materialized.with(Serdes.String(), jsonSerde)
        );
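
Without a Materialized argument, the state store behind the aggregation serializes its values with the default Serde from the StreamsConfig; as the exception shows, that was the ByteArraySerializer, which casts every value to byte[] and therefore fails on the ObjectNode. Materialized.with(Serdes.String(), jsonSerde) makes the store use the JSON Serde instead. An equivalent sketch that also names the state store (the store name is made up; it needs imports for org.apache.kafka.common.utils.Bytes and org.apache.kafka.streams.state.KeyValueStore):

// Equivalent sketch with an explicitly named store instead of Materialized.with(...)
Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(jsonSerde)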

