Wrong serializers used on aggregate


Question

I am working on a kafka-streams application in which I process log events. In this case I want to aggregate WorkflowInput types into a Workflow type. I am having issues getting the aggregate working.

final KStream<String, WorkflowInput> filteredStream = someStream;
final KTable<String, Workflow> aggregatedWorkflows = filteredStream
    .peek((k, v) -> {
        if (!(v instanceof WorkflowInput)) {
            throw new AssertionError("Type not expected");
        }
    })
    .groupByKey()
    .<Workflow>aggregate(Workflow::new, (k, input, workflow) -> workflow.updateFrom(input),
            Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(), new JsonDeserializer<Workflow>(Workflow.class))));

I get the following exception:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: workflowauditstreamer.WorkflowInput).

Two things to notice:

* The value serializer is a StringSerializer, while I configured something different using withValueSerde.
* The actual value type is WorkflowInput, while I expect Workflow since that is my aggregated value type.

I am new to kafka-streams so I might be missing something obvious, but I cannot figure it out. What am I missing here?

Answer

If you override the default Serdes from the config, the override is an in-place, per-operator override. It is not propagated downstream (as of Kafka 2.0 -- there is work in progress to improve this).

Thus, you will need to pass the Serdes you used in someStream = builder.stream(...) into .groupByKey(Serialized.with(...)) as well.
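
A minimal sketch of that fix, assuming the same JsonSerializer/JsonDeserializer classes from the question can also be instantiated for WorkflowInput (the workflowInputSerde below is an assumption, not code from the question):

// assumed: a JSON Serde for WorkflowInput, built the same way as the Workflow Serde above
final Serde<WorkflowInput> workflowInputSerde =
        Serdes.serdeFrom(new JsonSerializer<WorkflowInput>(),
                         new JsonDeserializer<WorkflowInput>(WorkflowInput.class));

final KTable<String, Workflow> aggregatedWorkflows = filteredStream
    // pass the key/value Serdes of the grouped stream explicitly,
    // since the Serdes used on someStream are not propagated downstream
    .groupByKey(Serialized.with(Serdes.String(), workflowInputSerde))
    .aggregate(Workflow::new,
            (k, input, workflow) -> workflow.updateFrom(input),
            Materialized.<String, Workflow, KeyValueStore<Bytes, byte[]>>as("worflow-cache")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.serdeFrom(new JsonSerializer<Workflow>(),
                                                 new JsonDeserializer<Workflow>(Workflow.class))));

(In Kafka Streams releases after 2.0, Grouped.with(...) replaces the deprecated Serialized.with(...), but the idea is the same.)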
