如何使用 Header 从一条 Kafka 消息中获取多个对象 [英] How to get multiple objects from one Kafka message using Header

查看:29
本文介绍了如何使用 Header 从一条 Kafka 消息中获取多个对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想从一条消息(来自同一主题)中反序列化不同的对象,并根据对象类型将其保存/更新到数据库中的相应表.正如加里提到的这里我可以使用标题来提供我的对象必须反序列化为哪种类型的信息.您能否提供有关如何实现这一目标的示例?

解决方案

参见 sample-02 此处.

<块引用>

示例 2

<块引用>

这个示例演示了一个简单的生产者和一个多方法的消费者;生产者发送 Foo1Bar1 类型的对象,消费者接收 Foo2Bar2 类型的对象(对象具有相同的字段,foo).

<块引用>

生产者使用一个JsonSerializer;消费者使用 ByteArrayDeserializerByteArrayJsonMessageConverter 一起转换为侦听器方法参数的所需类型.在这种情况下我们无法推断类型(因为类型用于选择要调用的方法).因此,我们在生产者和消费者端配置类型映射.请参阅生产者端的 application.yml 和消费者端的 converter bean.

<块引用>

MultiMethods @KafkaListener 有 3 个方法;一个用于每个已知对象,一个用于其他对象的后备默认方法.

<块引用>

运行应用程序并使用 curl 发送一些数据:

<块引用>

$ curl -X POST http://localhost:8080/send/foo/bar

<块引用>

$ curl -X POST http://localhost:8080/send/bar/baz

<块引用>

$ curl -X POST http://localhost:8080/send/unknown/xxx

<块引用>

控制台:

<块引用>

收到:Foo2 [foo=bar]

<块引用>

收到:Bar2 [bar=baz]

<块引用>

收到未知:xxx

I want to deserialize different objects from one message (from the same topic), and according to object type save/update it to the appropriate table in DB. As Gary mentioned here I can use headers to provide information to which type my object must be deserialized. Can you help with examples of how to achieve this?

解决方案

See sample-02 here.

Sample 2

This sample demonstrates a simple producer and a multi-method consumer; the producer sends objects of types Foo1 and Bar1 and the consumer receives objects of type Foo2 and Bar2 (the objects have the same field, foo).

The producer uses a JsonSerializer; the consumer uses a ByteArrayDeserializer, together with a ByteArrayJsonMessageConverter which converts to the required type of the listener method argument. We can't infer the type in this case (because the type is used to choose the method to call). We therefore configure type mapping on the producer and consumer side. See the application.yml for the producer side and the converter bean on the consumer side.

The MultiMethods @KafkaListener has 3 methods; one for each of the known objects and a fallback default method for others.

Run the application and use curl to send some data:

$ curl -X POST http://localhost:8080/send/foo/bar

$ curl -X POST http://localhost:8080/send/bar/baz

$ curl -X POST http://localhost:8080/send/unknown/xxx

Console:

Received: Foo2 [foo=bar]

Received: Bar2 [bar=baz]

Received unknown: xxx

这篇关于如何使用 Header 从一条 Kafka 消息中获取多个对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆