如何使用 Header 从一条 Kafka 消息中获取多个对象 [英] How to get multiple objects from one Kafka message using Header
问题描述
我想从一条消息(来自同一主题)中反序列化不同的对象,并根据对象类型将其保存/更新到数据库中的相应表.正如加里提到的这里我可以使用标题来提供我的对象必须反序列化为哪种类型的信息.您能否提供有关如何实现这一目标的示例?
参见 sample-02 此处.
<块引用>示例 2
<块引用>
这个示例演示了一个简单的生产者和一个多方法的消费者;生产者发送 Foo1
和 Bar1
类型的对象,消费者接收 Foo2
和 Bar2
类型的对象(对象具有相同的字段,foo
).
<块引用>
生产者使用一个JsonSerializer
;消费者使用 ByteArrayDeserializer
和 ByteArrayJsonMessageConverter
一起转换为侦听器方法参数的所需类型.在这种情况下我们无法推断类型(因为类型用于选择要调用的方法).因此,我们在生产者和消费者端配置类型映射.请参阅生产者端的 application.yml
和消费者端的 converter
bean.
<块引用>
MultiMethods
@KafkaListener
有 3 个方法;一个用于每个已知对象,一个用于其他对象的后备默认方法.
<块引用>
运行应用程序并使用 curl 发送一些数据:
<块引用>
$ curl -X POST http://localhost:8080/send/foo/bar
<块引用>
$ curl -X POST http://localhost:8080/send/bar/baz
<块引用>
$ curl -X POST http://localhost:8080/send/unknown/xxx
<块引用>
控制台:
<块引用>
收到:Foo2 [foo=bar]
<块引用>
收到:Bar2 [bar=baz]
<块引用>
收到未知:xxx
I want to deserialize different objects from one message (from the same topic), and according to object type save/update it to the appropriate table in DB. As Gary mentioned here I can use headers to provide information to which type my object must be deserialized. Can you help with examples of how to achieve this?
See sample-02 here.
Sample 2
This sample demonstrates a simple producer and a multi-method consumer; the producer sends objects of types
Foo1
andBar1
and the consumer receives objects of typeFoo2
andBar2
(the objects have the same field,foo
).
The producer uses a
JsonSerializer
; the consumer uses aByteArrayDeserializer
, together with aByteArrayJsonMessageConverter
which converts to the required type of the listener method argument. We can't infer the type in this case (because the type is used to choose the method to call). We therefore configure type mapping on the producer and consumer side. See theapplication.yml
for the producer side and theconverter
bean on the consumer side.
The
MultiMethods
@KafkaListener
has 3 methods; one for each of the known objects and a fallback default method for others.
Run the application and use curl to send some data:
$ curl -X POST http://localhost:8080/send/foo/bar
$ curl -X POST http://localhost:8080/send/bar/baz
$ curl -X POST http://localhost:8080/send/unknown/xxx
Console:
Received: Foo2 [foo=bar]
Received: Bar2 [bar=baz]
Received unknown: xxx
这篇关于如何使用 Header 从一条 Kafka 消息中获取多个对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!