Spring Cloud Producer 异常 - java.lang.IllegalStateException:生产者关闭后无法执行操作 [英] Spring Cloud Producer exception - java.lang.IllegalStateException: Cannot perform operation after producer has been closed
问题描述
Spring Cloud Producer 抛出异常 - java.lang.IllegalStateException: Producer 关闭后无法执行操作
嗨我们给出了一个基于spring cloud的微服务应用.在该应用程序中,我们在向 Kafka 生成消息时遇到错误.我们正在使用以下 Spring Cloud 版本.
Spring Cloud 版本
格林威治.SR1
以下是我们的接口定义,供Kafka生产者使用
String INPUT = "jobmanager-in";String OUTPUT_C = "集合输出";String OUTPUT_P = "解析器输出";String OUTPUT_CMP = "比较输出";String OUTPUT_R = "报告输出";String OUTPUT_N = "通知输出";@输入(输入)SubscribableChannel inboundJobManager();@输出(输出_C)MessageChannel outboundCollections();@输出(输出_P)MessageChannel outboundParse();@输出(输出_CMP)MessageChannel outboundCompare();@输出(输出_R)MessageChannel outboundReport();@输出(输出_N)MessageChannel outboundNotification();
生产者代码是
public void sendCollectionTask(final MessageT message) {logger.info("发送采集任务::" + message.toString());MessageChannel messageChannel = collectionStream.outboundCollections();boolean l_bool = messageChannel.send(MessageBuilder.withPayload(message).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());logger.info("为收集任务发送消息后发送状态::" + message.getTask().getId() + " : "+ l_bool);}
当我们启动应用程序时,生产者正确生成消息,但一段时间后我们收到错误
2019-09-12 02:07:45,215 错误 com.ericsson.tmo.cm.ccm.jobmanager.util.JobOrchestrator [scheduling-1] 错误跟踪 ::org.springframework.messaging.MessageHandlingException:消息处理程序发生错误 [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@11328ab9];嵌套异常是 java.lang.IllegalStateException: 生产者关闭后无法执行操作在 org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
如果我们重新启动应用程序,问题就解决了.感谢您的帮助.
我们的 Spring Cloud 配置是
云:溪流:卡夫卡:粘合剂:经纪人:kafka自动创建主题:假配置:auto.offset.reset:最新最大请求大小:16777216缓冲内存:67108864绑定:收集输出:目的地:集合内容类型:应用程序/json组: 集合制作人:分区数:5自动添加分区:真
异常信息很清楚.
<块引用>生产者关闭后无法进行操作
您的生产者可能被一个线程关闭,而另一个线程上的生产者正在调用 send()
或生产者网络线程仍在发送消息.
由于您使用的是 Spring,我认为 KafkaProducer
对象是为您创建的,并通过依赖注入进行注入.
您需要做的就是找出使用相同KafkaProducer
对象的人以及close()
被调用的位置.>
可能的原因
有时,same
KafkaProducer
可能会被同一应用程序的几个其他组件使用(如果您没有在多个生产者之间进行划分)和其中之一可能已关闭它.更清楚,可能是您使用相同的生产者 bean 实例为同一实例中的不同组件以及它关闭的某处生成消息.
通常情况下,我们有不同的生产者生产不同的主题,因为我们有不同的生产者配置和不同类型的数据的主题.
当我添加一个关闭钩子时,我遇到了类似的异常,其中我正在调用
立>producer.close()
而另一个线程正在尝试生成.
Spring Cloud Producer throwing exception - java.lang.IllegalStateException: Cannot perform operation after producer has been closed
Hi We gave a spring cloud based microservice application. In that application we are getting error while producing the message to Kafka. We are using following Spring Cloud version.
Spring Cloud Version
Greenwich.SR1
Following are our interface defination which is used by the Kafka producer
String INPUT = "jobmanager-in";
String OUTPUT_C = "collection-out";
String OUTPUT_P = "parser-out";
String OUTPUT_CMP = "compare-out";
String OUTPUT_R = "report-out";
String OUTPUT_N = "notification-out";
@Input(INPUT)
SubscribableChannel inboundJobManager();
@Output(OUTPUT_C)
MessageChannel outboundCollections();
@Output(OUTPUT_P)
MessageChannel outboundParse();
@Output(OUTPUT_CMP)
MessageChannel outboundCompare();
@Output(OUTPUT_R)
MessageChannel outboundReport();
@Output(OUTPUT_N)
MessageChannel outboundNotification();
The producer code is
public void sendCollectionTask(final MessageT message) {
logger.info("Sending Collection Task :: " + message.toString());
MessageChannel messageChannel = collectionStream.outboundCollections();
boolean l_bool = messageChannel.send(MessageBuilder.withPayload(message)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
logger.info("After sending message for Collection Task send status :: " + message.getTask().getId() + " : "
+ l_bool);
}
When we start the application the producer produce the message properly but after some time we are receiving the error
2019-09-12 02:07:45,215 ERROR com.ericsson.tmo.cm.ccm.jobmanager.util.JobOrchestrator [scheduling-1] Error Trace ::
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@11328ab9]; nested exception is java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
The problem is resolve if we restart the application. Thanks for your help.
Our Spring Cloud Config is
cloud:
stream:
kafka:
binder:
brokers: kafka
autoCreateTopic: false
configuration:
auto.offset.reset: latest
max.request.size: 16777216
buffer.memory: 67108864
bindings:
collection-out:
destination: collection
contentType: application/json
group: collection
producer:
partitionCount: 5
autoAddPartitions: true
The exception message is clear.
Cannot perform operation after producer has been closed
It can happen that your producer might be closed by one thread while on the other thread the producer is calling send()
or the producer network thread is still sending the messages.
Since you are using Spring, I suppose that the KafkaProducer
object is created for you and injected through Dependency Injection.
All you need to do is to figure out whoever uses the same KafkaProducer
object and where the close()
is called.
Possible reasons
Sometimes, it may happen that same
KafkaProducer
might be used by several other components of your same application (if you did not demarcate between several producers) and one of it might have closed it.More clearly, may be you are using the same producer bean instance for producing messages for different components in the same instance and somewhere it is closed.
Typically, we have different producers producing to different topics since we have different producer configurations and topics for different types of data.
I have encountered a similar exception when I added a shutdown hook in which I was calling
producer.close()
while the other thread is trying to produce.
这篇关于Spring Cloud Producer 异常 - java.lang.IllegalStateException:生产者关闭后无法执行操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!