Spring Cloud Kafka: Can't serialize data for output stream when two processors are active


Problem description

I have a working setup of Spring Cloud Kafka Streams in functional programming style. There are two use cases, which are configured via application.properties. They both work individually, but as soon as I activate both at the same time, I get a serialization error for the output stream of the second use case:


Exception in thread "ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic outputActivities for task 0_0 due to: ...
Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [com.example.connector.model.Activity@497b37ff] for topic [outputActivities]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Incompatible types: declared root type ([simple type, class com.example.connector.model.Material]) vs com.example.connector.model.Activity

The last line here is important: the "declared root type" comes from the Material class, not the Activity class, which is probably the source of the error.

Again, when I activate only the second use case before starting the application, everything works fine. So I assume that the "Material" processor somehow interferes with the "Activities" processor (or its serializer), but I don't know when and where.


Setup

1.) use case: "Materials"

  • one input stream -> transformation -> one output stream

@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {...}

application.properties

spring.cloud.stream.kafka.streams.binder.functions.processMaterials.applicationId=MaterialsAppId
spring.cloud.stream.bindings.processMaterials-in-0.destination=inputMaterialsRaw
spring.cloud.stream.bindings.processMaterials-out-0.destination=outputMaterials

2.) use case: "Activities"

  • two input streams -> joining -> one output stream

@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {...}

application.properties

spring.cloud.stream.kafka.streams.binder.functions.processActivities.applicationId=ActivitiesAppId
spring.cloud.stream.bindings.processActivities-in-0.destination=inputActivitiesRaw
spring.cloud.stream.bindings.processActivities-in-1.destination=inputAssignees
spring.cloud.stream.bindings.processActivities-out-0.destination=outputActivities

The two processors are also declared as stream functions in application.properties: spring.cloud.stream.function.definition=processActivities;processMaterials

Thanks!

Update - Here's how I use the processors in the code:

Implementation

// Material model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MaterialRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Material {
    private String id;
    private String name;
}

// Material processor
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {
    return materialsRawStream -> materialsRawStream.map((recordKey, materialRaw) -> {
        // some transformation
        final var newId = materialRaw.getId() + "---foo";
        final var newName = materialRaw.getName() + "---bar";
        final var material = new Material(newId, newName);

        // output
        return new KeyValue<>(recordKey, material);
    });
}

// Activity model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ActivityRaw {
    private String id;
    private String name;
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Assignee {
    private String id;
    private String assignedAt;
}

/**
 * Combination of `ActivityRaw` and `Assignee`
 */
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Activity {
    private String id;
    private Integer number;
    private String assignedAt;
}

// Activity processor
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {
    return (activitiesRawStream, assigneesStream) -> { 
        final var joinWindow = JoinWindows.of(Duration.ofDays(30));

        final var streamJoined = StreamJoined.with(
            Serdes.String(),
            new JsonSerde<>(ActivityRaw.class),
            new JsonSerde<>(Assignee.class)
        );

        final var joinedStream = activitiesRawStream.leftJoin(
            assigneesStream,
            new ActivityJoiner(),
            joinWindow,
            streamJoined
        );

        final var mappedStream = joinedStream.map((recordKey, activity) -> {
            return new KeyValue<>(recordKey, activity);
        });

        return mappedStream;
    };
}
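The ActivityJoiner used in the leftJoin above is referenced but never shown in the question. A plausible sketch of its join logic, consistent with the model classes above, might look like the following; the field mapping is an assumption, and in the real code the class would implement org.apache.kafka.streams.kstream.ValueJoiner<ActivityRaw, Assignee, Activity> (omitted here so the sketch stays self-contained):

```java
// Minimal stand-ins for the question's model classes (fields as shown above)
class ActivityRaw {
    String id; String name;
    ActivityRaw(String id, String name) { this.id = id; this.name = name; }
}

class Assignee {
    String id; String assignedAt;
    Assignee(String id, String assignedAt) { this.id = id; this.assignedAt = assignedAt; }
}

class Activity {
    String id; Integer number; String assignedAt;
    Activity(String id, Integer number, String assignedAt) {
        this.id = id; this.number = number; this.assignedAt = assignedAt;
    }
}

// Hypothetical joiner; in the real topology this would implement
// ValueJoiner<ActivityRaw, Assignee, Activity> and be passed to leftJoin.
public class ActivityJoiner {
    public Activity apply(ActivityRaw raw, Assignee assignee) {
        // leftJoin semantics: assignee is null when no matching record
        // arrived within the join window
        String assignedAt = assignee == null ? null : assignee.assignedAt;
        return new Activity(raw.id, null, assignedAt);
    }
}
```

With a 30-day join window, unmatched ActivityRaw records still flow through with a null assignee, so the joiner must tolerate null on the right side.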

Solution

This turns out to be an issue with the way the binder infers Serde types when there are multiple functions with different outbound target types, one with Activity and another with Material in your case. We will have to address this in the binder. I created an issue here.

In the meantime, you can follow this workaround.

Create a custom Serde class as below.

public class ActivitySerde extends JsonSerde<Activity> {}

Then, explicitly use this Serde for the outbound of your processActivities function using configuration.

For e.g.,

spring.cloud.stream.kafka.streams.bindings.processActivities-out-0.producer.valueSerde=com.example.so65003575.ActivitySerde

Please change the package to the appropriate one if you are trying this workaround.
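Why does a trivial subclass help? Java erases type arguments on instances, but it does record them in a subclass's generic superclass declaration, which is how JsonSerde-style classes typically recover their target type reflectively. The stdlib-only sketch below demonstrates that mechanism; the class names are hypothetical stand-ins, not spring-kafka types:

```java
import java.lang.reflect.ParameterizedType;

public class ErasureDemo {

    // Hypothetical stand-in for a generic JSON serde base class
    static class JsonLikeSerde<T> {
        // Recover T from the subclass's generic superclass, if present
        Class<?> resolveTargetType() {
            var generic = getClass().getGenericSuperclass();
            if (generic instanceof ParameterizedType pt) {
                return (Class<?>) pt.getActualTypeArguments()[0];
            }
            return Object.class; // plain instance: the type argument is erased
        }
    }

    static class Activity {}

    // Mirrors the ActivitySerde workaround: the subclass pins the target type
    static class ActivitySerde extends JsonLikeSerde<Activity> {}

    public static void main(String[] args) {
        // A directly instantiated generic serde cannot see its own type argument
        System.out.println(new JsonLikeSerde<Activity>().resolveTargetType().getSimpleName()); // Object
        // The dedicated subclass can
        System.out.println(new ActivitySerde().resolveTargetType().getSimpleName()); // Activity
    }
}
```

This is why an empty subclass carries more runtime type information than a `new JsonSerde<Activity>()` instance ever could.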

Here is another recommended approach. If you define a bean of type Serde with the target type, that takes precedence as the binder will do a match against the KStream type. Therefore, you can also do it without defining that extra class in the above workaround.

@Bean
public Serde<Activity> activitySerde() {
  return new JsonSerde<>(Activity.class);
}
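If the binder also mis-infers the Material outbound in your setup, the same bean pattern would presumably apply per target type; the materialSerde bean below is an assumption for illustration, not part of the answer:

```java
@Bean
public Serde<Material> materialSerde() {
  return new JsonSerde<>(Material.class);
}
```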

Here are the docs that explain all these details.
