spring kafka:同一主题上的不同 json 有效负载 [英] spring kafka : different json payload on the same topic

查看:18
本文介绍了spring kafka:同一主题上的不同 json 有效负载的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要在不使用父类的情况下在同一个 Kafka Topic(例如,Foo、Bar、Car ...)上发送不同的 JSON 负载

I need to send different JSON payload on the same Kafka Topic (for example, Foo, Bar, Car ...) without using parent class

基于 spring kafka 文档,我可以在类级别使用 @KafkaListener 并在方法级别指定 @KafkaHandler ( doc )

Based on spring kafka documentation, I can use @KafkaListener at the class-level and specify @KafkaHandler at the method level ( doc )

@KafkaListener(topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(Foo foo) {
        ...
    }

    @KafkaHandler
    public void listen(Bar bar) {
        ...
    }

    @KafkaHandler
    public void listen(Car car) {
        ...
    }
}

但我得到这个例外:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = myTopic, partition = 1, offset = 0, CreateTime = 1508859519287, checksum = 3297149058, serialized key size = -1, serialized value size = 13, key = null, value = {"foo":"foo"})
org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:147)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

我也尝试过这种方法,但它不起作用:

I have also tried this approach but it's not working:

   @KafkaListener(topics = "myTopic")
   public void receive(ConsumerRecord<String, ?> payload)
   {
      if (payload.value() instanceof Foo)
      {
         //
      }
      else if (payload.value() instanceof Car)
      {
         //
      }
   }

如何配置我的生产者和消费者使用 spring kafka 在同一个 Topic 上发送不同的 JSON Payload?

How I can configure my producer and consumer to send different JSON Payload on the same Topic using spring kafka?

推荐答案

使用 JSON 和多方法侦听器时有一个 catch-22;我们需要知道路由到正确方法的类型;我们无法从方法签名推断转换类型,因为我们还不知道要调用哪个方法.

There is a catch-22 when using JSON and multi-method listeners; we need to know the type to route to the right method; we can't infer the type for conversion from the method signature because we don't know which method we want to call yet.

您需要一个消息转换器,它可以根据数据或使用 Header(使用 Kafka 11)确定类型.

You need a message converter that can figure out the type, either from the data, or using a Header (with Kafka 11).

然后,在转换器转换为正确的类型后,我们就可以确定要调用的方法.

Then, after the converter has converted to the proper type, we can then figure out which method to call.

第二次尝试时,ConsumerRecord 将包含原始 JSON;您将需要使用 ObjectMapper 来进行转换,但是,同样,您需要一些关于要转换为哪种类型的提示(或使用蛮力进行转换,直到成功).

With your second attempt, the ConsumerRecord will contain the raw JSON; you will need to use an ObjectMapper to do the conversion but, again, you'll need some hint as to what type you want to convert to (or do it with brute force, until success).

这篇关于spring kafka:同一主题上的不同 json 有效负载的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆