带有Avro的Spring Cloud Stream无法正确转换字符串消息 [英] Spring Cloud Stream with Avro cannot correctly convert String message
本文介绍了带有Avro的Spring Cloud Stream无法正确转换字符串消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
此问题发生在我设置Avro消息转换器之后。如果我删除Avro消息转换器并使用StreamListener处理消息转换,它将工作得很好。
源应用程序.属性
spring.cloud.stream.bindings.toGreeting.destination=greeting
spring.cloud.stream.bindings.toGreeting.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
接收器应用程序
server.port=8990
spring.cloud.stream.bindings.greeting.destination=greeting
消息转换器
@Configuration
@EnableSchemaRegistryClient
public class MessageConverterConfig {
@Bean
public MessageConverter topic1MessageConverter() throws IOException {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
应用程序类
@SpringBootApplication
@EnableSchemaRegistryClient
public class SourceApplication {
public static void main(String[] args) {
SpringApplication.run(SourceApplication.class, args);
}
}
@EnableSchemaRegistryServer
@EnableSchemaRegistryClient
@SpringBootApplication
public class SinkApplication {
public static void main(String[] args) {
SpringApplication.run(SinkApplication.class, args);
}
}
我是否缺少配置? 谢谢。
推荐答案
这里有一个简单的规则:
如果您只想拥有一个可以从Avro序列化/反序列化的消息转换器,并且您在配置GenericRecords时提供了架构位置,或者您的StreamListener方法具有一种类型为SpecificRecord的签名。然后选择AvroSchemaMessageConverter
,像您所做的那样进行设置,但使用avro/bytes
。我们保留application/*+avro
用于架构演进支持。
因此,如果您设置@EnableSchemaRegistryClient
,那么您将委托外部注册表来拥有您的架构。在这种情况下,您不仅需要注册中心,还需要在那里注册的模式。
默认情况下,如果启用了spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
,则生成器将自动注册任何规范记录/通用记录或Pojos类型的有效负载。
在这种情况下,假设您的主题是用户并且是第一个版本,则制作人实际上会将标题设置为类似application/vnd.user.v1+avro
的内容。
下游,如果您的使用者也配置了application/*+avro
Content Type,他们将能够读取此Content Type并推断其用于查询架构服务器和检索适当架构的主题/版本。
这篇关于带有Avro的Spring Cloud Stream无法正确转换字符串消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文