Spring Cloud Producer 异常 - java.lang.IllegalStateException:生产者关闭后无法执行操作 [英] Spring Cloud Producer exception - java.lang.IllegalStateException: Cannot perform operation after producer has been closed

查看:56
本文介绍了Spring Cloud Producer 异常 - java.lang.IllegalStateException:生产者关闭后无法执行操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Spring Cloud Producer 抛出异常 - java.lang.IllegalStateException: Producer 关闭后无法执行操作

嗨我们给出了一个基于spring cloud的微服务应用.在该应用程序中,我们在向 Kafka 生成消息时遇到错误.我们正在使用以下 Spring Cloud 版本.

Spring Cloud 版本

格林威治.SR1

以下是我们的接口定义,供Kafka生产者使用

 String INPUT = "jobmanager-in";String OUTPUT_C = "集合输出";String OUTPUT_P = "解析器输出";String OUTPUT_CMP = "比较输出";String OUTPUT_R = "报告输出";String OUTPUT_N = "通知输出";@输入(输入)SubscribableChannel inboundJobManager();@输出(输出_C)MessageChannel outboundCollections();@输出(输出_P)MessageChannel outboundParse();@输出(输出_CMP)MessageChannel outboundCompare();@输出(输出_R)MessageChannel outboundReport();@输出(输出_N)MessageChannel outboundNotification();

生产者代码是

public void sendCollectionTask(final MessageT message) {logger.info("发送采集任务::" + message.toString());MessageChannel messageChannel = collectionStream.outboundCollections();boolean l_bool = messageChannel.send(MessageBuilder.withPayload(message).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());logger.info("为收集任务发送消息后发送状态::" + message.getTask().getId() + " : "+ l_bool);}

当我们启动应用程序时,生产者正确生成消息,但一段时间后我们收到错误

2019-09-12 02:07:45,215 错误 com.ericsson.tmo.cm.ccm.jobmanager.util.JobOrchestrator [scheduling-1] 错误跟踪 ::org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@11328ab9];嵌套异常是 java.lang.IllegalStateException: 生产者关闭后无法执行操作在 org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)

如果我们重新启动应用程序,问题就解决了.感谢您的帮助.

我们的 Spring Cloud 配置是

云:溪流:卡夫卡:粘合剂:经纪人:kafka自动创建主题:假配置:auto.offset.reset:最新最大请求大小:16777216缓冲内存:67108864绑定:收集输出:目的地:集合内容类型:应用程序/json组: 集合制作人:分区数:5自动添加分区:真

解决方案

异常信息很清楚.

<块引用>

生产者关闭后无法进行操作

您的生产者可能被一个线程关闭,而另一个线程上的生产者正在调用 send() 或生产者网络线程仍在发送消息.

<小时>

由于您使用的是 Spring,我认为 KafkaProducer 对象是为您创建的,并通过依赖注入进行注入.

您需要做的就是找出使用相同KafkaProducer对象的人以及close()被调用的位置.

可能的原因

  1. 有时,same KafkaProducer 可能会被同一应用程序的几个其他组件使用(如果您没有在多个生产者之间进行划分)和其中之一可能已关闭它.

    更清楚,可能是您使用相同的生产者 bean 实例为同一实例中的不同组件以及它关闭的某处生成消息.

    通常情况下,我们有不同的生产者生产不同的主题,因为我们有不同的生产者配置和不同类型的数据的主题.

  2. 当我添加一个关闭钩子时,我遇到了类似的异常,其中我正在调用 producer.close() 而另一个线程正在尝试生成.

Spring Cloud Producer throwing exception - java.lang.IllegalStateException: Cannot perform operation after producer has been closed

Hi We gave a spring cloud based microservice application. In that application we are getting error while producing the message to Kafka. We are using following Spring Cloud version.

Spring Cloud Version

Greenwich.SR1

Following are our interface defination which is used by the Kafka producer

    String INPUT = "jobmanager-in";
String OUTPUT_C = "collection-out";
String OUTPUT_P = "parser-out";
String OUTPUT_CMP = "compare-out";
String OUTPUT_R = "report-out";
String OUTPUT_N = "notification-out";

@Input(INPUT)
SubscribableChannel inboundJobManager();

@Output(OUTPUT_C)
    MessageChannel outboundCollections();

@Output(OUTPUT_P)
MessageChannel outboundParse();

@Output(OUTPUT_CMP)
MessageChannel outboundCompare();

@Output(OUTPUT_R)
MessageChannel outboundReport();

@Output(OUTPUT_N)
MessageChannel outboundNotification();

The producer code is

public void sendCollectionTask(final MessageT message) {

        logger.info("Sending Collection Task :: " + message.toString());
        MessageChannel messageChannel = collectionStream.outboundCollections();
        boolean l_bool = messageChannel.send(MessageBuilder.withPayload(message)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
        logger.info("After sending message for Collection Task send status :: " + message.getTask().getId() + " : "
                + l_bool);
    }

When we start the application the producer produce the message properly but after some time we are receiving the error

2019-09-12 02:07:45,215 ERROR com.ericsson.tmo.cm.ccm.jobmanager.util.JobOrchestrator [scheduling-1] Error Trace ::
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@11328ab9]; nested exception is java.lang.IllegalStateException: Cannot perform operation after producer has been closed
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)

The problem is resolve if we restart the application. Thanks for your help.

Our Spring Cloud Config is

cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          autoCreateTopic: false
          configuration:
            auto.offset.reset: latest
            max.request.size: 16777216
            buffer.memory: 67108864
      bindings:
        collection-out:
          destination: collection
          contentType: application/json
          group: collection
          producer:
            partitionCount: 5
            autoAddPartitions: true

解决方案

The exception message is clear.

Cannot perform operation after producer has been closed

It can happen that your producer might be closed by one thread while on the other thread the producer is calling send() or the producer network thread is still sending the messages.


Since you are using Spring, I suppose that the KafkaProducer object is created for you and injected through Dependency Injection.

All you need to do is to figure out whoever uses the same KafkaProducerobject and where the close() is called.

Possible reasons

  1. Sometimes, it may happen that same KafkaProducer might be used by several other components of your same application (if you did not demarcate between several producers) and one of it might have closed it.

    More clearly, may be you are using the same producer bean instance for producing messages for different components in the same instance and somewhere it is closed.

    Typically, we have different producers producing to different topics since we have different producer configurations and topics for different types of data.

  2. I have encountered a similar exception when I added a shutdown hook in which I was calling producer.close() while the other thread is trying to produce.

这篇关于Spring Cloud Producer 异常 - java.lang.IllegalStateException:生产者关闭后无法执行操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆