ClassCastException in spring-kafka-test using `merger()`


Problem Description


I want to test my Kafka Streams topology with a unit test using kafka-streams-test-utils. I have been using this library for quite a while, and I have already built some abstraction layers around my tests using TestNG. But since I added a merge(...) to my stream, I get the following exception:

 org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=my-topic-2, partition=0, offset=0
 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:393)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.MyKey / value type: com.MyValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 3 more
Caused by: java.lang.ClassCastException: class com.MyKey cannot be cast to class [B (com.MyValue is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 15 more

Here is the part where I build the stream with the StreamsBuilder used by the TopologyTestDriver:

// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
    "my-topic-2",
    consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
    (key, value) -> {
        List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
        // Do stuff and fill the list
        return list;
    })
 .through("tmp-topic");

// Block 2
KStream<MyKey, MyValue>[] branches = stream1
    .merge(stream2)
    ... business stuff
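
The consumedAs(...) helper isn't shown here; per its inline comment it supplies JSON serdes for the source topic, so it presumably looks something like the following. This is purely an assumed sketch (the AllowEmpty handling is omitted), included only to show where serdes are configured; the .through() call above does not get any such serdes and therefore falls back to the defaults.

import org.apache.kafka.streams.kstream.Consumed;
import org.springframework.kafka.support.serializer.JsonSerde;

// Assumed shape of the helper used above; the real implementation is not shown
// in the question. It builds a Consumed with JSON serdes for the given types.
private static <K, V> Consumed<K, V> consumedAs(Class<K> keyType, Class<V> valueType, AllowEmpty allowEmpty) {
    return Consumed.with(new JsonSerde<>(keyType), new JsonSerde<>(valueType));
}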

To produce messages on the source topic, I'm using TopologyTestDriver.pipeInput(...) initialized with JsonSerDes. The exception happens while casting to a byte array, but I don't know why the parameter expected by the ByteArraySerializer is the same class, just from a different module than the one the consumed class was loaded from. They might also be loaded by different ClassLoaders. But there is no Spring stack in the background, and everything should run synchronously.
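
For reference, the input is piped into the driver roughly like this. This is a minimal sketch, not the actual test code: the property values, the use of Spring Kafka's JsonSerde with a ConsumerRecordFactory, and the no-arg OtherKey/OtherValue constructors are all assumptions.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.springframework.kafka.support.serializer.JsonSerde;

// Build the driver from the topology under test (config values are placeholders).
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
TopologyTestDriver testDriver = new TopologyTestDriver(streamsBuilder.build(), props);

// Record factory that serializes test input for the source topic with JSON serializers.
ConsumerRecordFactory<OtherKey, OtherValue> factory = new ConsumerRecordFactory<>(
        "my-topic-2",
        new JsonSerde<>(OtherKey.class).serializer(),
        new JsonSerde<>(OtherValue.class).serializer());

// Pipe a single test record into my-topic-2.
testDriver.pipeInput(factory.create("my-topic-2", new OtherKey(), new OtherValue()));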

I'm really confused about this behavior.

The Apache Kafka dependencies are at version 2.0.1, and I'm using openjdk-11. Is it possible to align the class loading of the serializers? The error only occurs if I produce something on my-topic-2; the other topic of the merge works fine.

Solution

As mentioned by @bbejeck, you would need to use a different overload of .through(), the one that allows you to override the default (ByteArraySerde) serdes applied to K and V.

KStream<K,V> through(java.lang.String topic,
                     Produced<K,V> produced)

Materialize this stream to a topic and creates a new KStream from the topic using the Produced instance for configuration of the key serde, value serde, and StreamPartitioner. ... This is equivalent to calling to(someTopic, Produced.with(keySerde, valueSerde)) and StreamsBuilder#stream(someTopicName, Consumed.with(keySerde, valueSerde)).
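
Applied to the topology from the question, the fix is to hand the serdes to through() explicitly so the intermediate topic no longer falls back to the default ByteArraySerde. A rough sketch, assuming JSON serdes (here via Spring Kafka's JsonSerde) matching the ones the question already uses for its input:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.kafka.support.serializer.JsonSerde;

// Explicit serdes for the intermediate topic (assumed JSON serdes for MyKey/MyValue).
Serde<MyKey> keySerde = new JsonSerde<>(MyKey.class);
Serde<MyValue> valueSerde = new JsonSerde<>(MyValue.class);

KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
        "my-topic-2",
        consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE))
    .flatMap((key, value) -> {
        List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
        // Do stuff and fill the list
        return list;
    })
    // Pass serdes that match MyKey/MyValue instead of relying on the defaults.
    .through("tmp-topic", Produced.with(keySerde, valueSerde));

With the serdes supplied, the records written to tmp-topic are serialized by the JSON serializers rather than the default ByteArraySerializer, and the merge with stream1 also works for input piped into my-topic-2.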
