Spring Kafka:同一主题上的不同json负载 [英] spring kafka : different json payload on the same topic

查看:49
本文介绍了Spring Kafka:同一主题上的不同json负载的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要在不使用父类的情况下,在同一Kafka主题(例如Foo,Bar,Car ...)上发送不同的JSON负载

I need to send different JSON payload on the same Kafka Topic (for example, Foo, Bar, Car ...) without using parent class

基于spring kafka文档,我可以在类级别使用 @KafkaListener ,并在方法级别指定 @KafkaHandler (

Based on spring kafka documentation, I can use @KafkaListener at the class-level and specify @KafkaHandler at the method level ( doc )

@KafkaListener(topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(Foo foo) {
        ...
    }

    @KafkaHandler
    public void listen(Bar bar) {
        ...
    }

    @KafkaHandler
    public void listen(Car car) {
        ...
    }
}

但是我得到了这个例外:

but I get this exception:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = myTopic, partition = 1, offset = 0, CreateTime = 1508859519287, checksum = 3297149058, serialized key size = -1, serialized value size = 13, key = null, value = {"foo":"foo"})
org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:147)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

我也尝试过这种方法,但是不起作用:

I have also tried this approach but it's not working:

   @KafkaListener(topics = "myTopic")
   public void receive(ConsumerRecord<String, ?> payload)
   {
      if (payload.value() instanceof Foo)
      {
         //
      }
      else if (payload.value() instanceof Car)
      {
         //
      }
   }

如何使用Spring Kafka配置生产者和使用者在同一主题上发送不同的JSON有效载荷?

How I can configure my producer and consumer to send different JSON Payload on the same Topic using spring kafka?

推荐答案

使用JSON和多方法侦听器时有一个问题22.我们需要知道要路由到正确方法的类型;我们无法从方法签名中推断要转换的类型,因为我们尚不知道要调用哪个方法.

There is a catch-22 when using JSON and multi-method listeners; we need to know the type to route to the right method; we can't infer the type for conversion from the method signature because we don't know which method we want to call yet.

您需要一个消息转换器,可以从数据或使用 Header (对于Kafka 11)中找出类型.

You need a message converter that can figure out the type, either from the data, or using a Header (with Kafka 11).

然后,在将转换器转换为正确的类型之后,我们可以找出要调用的方法.

Then, after the converter has converted to the proper type, we can then figure out which method to call.

第二次尝试, ConsumerRecord 将包含原始JSON;您将需要使用 ObjectMapper 进行转换,但是,再次,您将需要有关要转换为哪种类型的提示(或以蛮力操作,直到成功).

With your second attempt, the ConsumerRecord will contain the raw JSON; you will need to use an ObjectMapper to do the conversion but, again, you'll need some hint as to what type you want to convert to (or do it with brute force, until success).

这篇关于Spring Kafka:同一主题上的不同json负载的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆