Spring Cloud Kafka: Can't serialize data for output stream when two processors are active


Problem Description

I have a working setup for Spring Cloud Kafka Streams with functional programming style. There are two use cases, which are configured via application.properties. Both of them work individually, but as soon as I activate both at the same time, I get a serialization error for the output stream of the second use case:

Exception in thread "ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Error encountered sending record to topic outputActivities for task 0_0 due to:
...
Caused by: org.apache.kafka.common.errors.SerializationException:
Can't serialize data [com.example.connector.model.Activity@497b37ff] for topic [outputActivities]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Incompatible types: declared root type ([simple type, class com.example.connector.model.Material]) vs com.example.connector.model.Activity

The last line here is important: the "declared root type" refers to the Material class, not the Activity class, which is probably the source of the error.

Again, when I only activate the second use case before starting the application, everything works fine. So I assume that the "Materials" processor somehow interferes with the "Activities" processor (or its serializer), but I don't know when and where.


Setup

1.) use case: "Materials"

  • one input stream -> transformation -> one output stream

@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {...}

application.properties

spring.cloud.stream.kafka.streams.binder.functions.processMaterials.applicationId=MaterialsAppId
spring.cloud.stream.bindings.processMaterials-in-0.destination=inputMaterialsRaw
spring.cloud.stream.bindings.processMaterials-out-0.destination=outputMaterials

2.) use case: "Activities"

  • two input streams -> joining -> one output stream

@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {...}

application.properties

spring.cloud.stream.kafka.streams.binder.functions.processActivities.applicationId=ActivitiesAppId
spring.cloud.stream.bindings.processActivities-in-0.destination=inputActivitiesRaw
spring.cloud.stream.bindings.processActivities-in-1.destination=inputAssignees
spring.cloud.stream.bindings.processActivities-out-0.destination=outputActivities

The two processors are also declared as stream functions in application.properties: spring.cloud.stream.function.definition=processActivities;processMaterials

Thanks!

Update - Here's how I use the processors in the code:

Implementation

// Material model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MaterialRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Material {
    private String id;
    private String name;
}

// Material processor
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {
    return materialsRawStream -> materialsRawStream.map((recordKey, materialRaw) -> {
        // some transformation
        final var newId = materialRaw.getId() + "---foo";
        final var newName = materialRaw.getName() + "---bar";
        final var material = new Material(newId, newName);

        // output
        return new KeyValue<>(recordKey, material);
    });
}

// Activity model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ActivityRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Assignee {
    private String id;
    private String assignedAt;
}

/**
 * Combination of `ActivityRaw` and `Assignee`
 */
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Activity {
    private String id;
    private Integer number;
    private String assignedAt;
}

// Activity processor
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {
    return (activitiesRawStream, assigneesStream) -> { 
        final var joinWindow = JoinWindows.of(Duration.ofDays(30));

        final var streamJoined = StreamJoined.with(
            Serdes.String(),
            new JsonSerde<>(ActivityRaw.class),
            new JsonSerde<>(Assignee.class)
        );

        final var joinedStream = activitiesRawStream.leftJoin(
            assigneesStream,
            new ActivityJoiner(),
            joinWindow,
            streamJoined
        );

        final var mappedStream = joinedStream.map((recordKey, activity) -> {
            return new KeyValue<>(recordKey, activity);
        });

        return mappedStream;
    };
}
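The `ActivityJoiner` passed to `leftJoin` above is referenced but never shown in the post. Below is a hedged sketch of what it might look like; the field mapping is an assumption, and plain POJOs stand in for the Lombok-generated models so the sketch is self-contained (in the real app it would implement `org.apache.kafka.streams.kstream.ValueJoiner<ActivityRaw, Assignee, Activity>`).

```java
// Stand-in POJOs for the Lombok-generated models shown earlier.
class ActivityRaw {
    private final String id;
    private final String name;
    ActivityRaw(String id, String name) { this.id = id; this.name = name; }
    String getId() { return id; }
    String getName() { return name; }
}

class Assignee {
    private final String id;
    private final String assignedAt;
    Assignee(String id, String assignedAt) { this.id = id; this.assignedAt = assignedAt; }
    String getAssignedAt() { return assignedAt; }
}

class Activity {
    private final String id;
    private final Integer number;
    private final String assignedAt;
    Activity(String id, Integer number, String assignedAt) {
        this.id = id; this.number = number; this.assignedAt = assignedAt;
    }
    String getId() { return id; }
    String getAssignedAt() { return assignedAt; }
}

// Hypothetical joiner body: merges an ActivityRaw with an (optionally null)
// Assignee into an Activity. In the real app this would implement
// ValueJoiner<ActivityRaw, Assignee, Activity> from kafka-streams.
class ActivityJoiner {
    Activity apply(ActivityRaw activityRaw, Assignee assignee) {
        // leftJoin passes a null assignee when nothing matched inside the window
        String assignedAt = (assignee != null) ? assignee.getAssignedAt() : null;
        // the source of `number` is not visible in the post; null is a placeholder
        return new Activity(activityRaw.getId(), null, assignedAt);
    }
}
```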

Solution

This turns out to be an issue with the way the binder infers Serde types when there are multiple functions with different outbound target types, one with Activity and another with Material in your case. We will have to address this in the binder. I created an issue here.

In the meantime, you can follow this workaround.

Create a custom Serde class as below.

public class ActivitySerde extends JsonSerde<Activity> {}

Then, explicitly use this Serde for the outbound of your processActivities function using configuration.

For example:

spring.cloud.stream.kafka.streams.bindings.processActivities-out-0.producer.valueSerde=com.example.so65003575.ActivitySerde

Please change the package to the appropriate one if you are trying this workaround.

Here is another, recommended approach. If you define a bean of type Serde<Activity> (i.e. a Serde parameterized with the target type), it takes precedence, because the binder matches Serde beans against the KStream's type. With that, you can also do it without defining the extra class from the workaround above.

@Bean
public Serde<Activity> activitySerde() {
  return new JsonSerde<>(Activity.class);
}

Here are the docs that explain all these details.
