Spring Integration MQTT Subscriber (paho) stops processing messages

Problem description

We're experiencing an issue with one of our MQTT subscribers in Spring Integration (4.0.3.RELEASE running on Tomcat 7 with the Paho MQTT Client 0.4.0).

The issue is with a subscriber on a heavily used topic (lots of messages). The devices sending the messages to the topic are devices in the field connecting over GPRS.

Spring Integration and the broker (Mosquitto) are running on the same server.
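
For context, a subscriber like vdm-dev-live would be wired up roughly as in the sketch below. The question does not show the actual configuration, so the broker URL and the bean/channel names here are assumptions; the client id and topic filter are the ones that appear in the Mosquitto log further down.

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.MessageChannel;

    @Configuration
    @EnableIntegration
    public class LiveDataSubscriberConfig {

        // Channel the downstream handlers subscribe to (name is an assumption).
        @Bean
        public MessageChannel liveDataChannel() {
            return new DirectChannel();
        }

        // Inbound adapter for the vdm-dev-live subscriber; the client id and topic
        // filter are taken from the broker log, the URL is assumed.
        @Bean
        public MqttPahoMessageDrivenChannelAdapter liveDataAdapter() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(
                            "tcp://localhost:1873", "vdm-dev-live", "vdm/+/+/+/liveData");
            adapter.setQos(1);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setOutputChannel(liveDataChannel());
            return adapter;
        }
    }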

The issue seems to appear after doing a couple of redeploys on Tomcat without restarting the server. When the issue occurs, a restart of the Tomcat instance fixes it for a while.

Here's the chain of events, taken from the Mosquitto logs (the vdm-dev-live subscriber is the one with the issues):

When starting Spring Integration, we see all subscribers connecting to the various topics:

1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-live (c1, k60).
1409645645: Sending CONNACK to vdm-dev-live (0)
1409645645: Received SUBSCRIBE from vdm-dev-live
1409645645:     vdm/+/+/+/liveData (QoS 1)
1409645645: Sending SUBACK to vdm-dev-live
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmReq
1409645645:     vdm/+/+/+/firmware/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-cfgReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-cfgReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-cfgReq
1409645645:     vdm/+/+/+/config/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-cfgReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmStat (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmStat (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmStat
1409645645:     vdm/+/+/firmware/status (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmStat

We see messages going back and forth:

1409645646: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))

We see the ping requests from the various subscribers:

1409645705: Received PINGREQ from vdm-dev-update
1409645705: Sending PINGRESP to vdm-dev-update
1409645705: Received PINGREQ from vdm-dev-live
1409645705: Sending PINGRESP to vdm-dev-live
1409645705: Received PINGREQ from vdm-dev-fmReq
1409645705: Sending PINGRESP to vdm-dev-fmReq
1409645705: Received PINGREQ from vdm-dev-cfgReq
1409645705: Sending PINGRESP to vdm-dev-cfgReq
1409645705: Received PINGREQ from vdm-dev-fmStat
1409645705: Sending PINGRESP to vdm-dev-fmStat

But all of a sudden we see this:

1409645776: Socket error on client vdm-dev-live, disconnecting.

At that point the subscriber is dead. We no longer see any ping requests from it, and it no longer processes any messages from that topic. At the broker level everything is still fine: I have debug-log subscribers (written in NodeJS) that are still processing messages from that topic without any problem, so the issue is at the subscriber level.

In the Tomcat logs we also see this:

Sep 02, 2014 10:16:05 AM org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
SEVERE: vdm-dev-live: Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,409,645,705,714 lastInboundActivity=1,409,645,755,075

But Paho doesn't do any cleanup / restart of this subscriber.
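
For comparison, with a hand-rolled Paho client one would normally restart the subscription from the connectionLost() callback, roughly as sketched below. This is plain Paho, not the Spring Integration adapter's code; the topic and retry delay are assumptions, and in the deadlock analysed in the answer below this callback may never fire at all.

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;

    public class ReconnectingSubscriber implements MqttCallback {

        private final MqttClient client;

        public ReconnectingSubscriber(String brokerUrl, String clientId) throws MqttException {
            this.client = new MqttClient(brokerUrl, clientId);
            this.client.setCallback(this);
        }

        public void start() throws MqttException {
            client.connect();
            client.subscribe("vdm/+/+/+/liveData", 1);
        }

        // This is the "cleanup / restart" that Paho is not doing for us here.
        public void connectionLost(Throwable cause) {
            while (!client.isConnected()) {
                try {
                    start();
                } catch (MqttException e) {
                    try {
                        Thread.sleep(5000); // back off before retrying
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        public void messageArrived(String topic, MqttMessage message) {
            // hand the payload off quickly; a slow handler fills Paho's inbound queue
        }

        public void deliveryComplete(IMqttDeliveryToken token) {
            // not relevant for a subscribe-only client
        }
    }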

I'm also seeing this in the Tomcat logs:

SEVERE: The web application [/vdmapp] appears to have started a thread named [MQTT Snd: vdm-dev-live] but has failed to stop it. This is very likely to create a memory leak.

I also noticed a lot of threads for that subscriber that are stuck during shutdown:

"MQTT Snd: vdm-dev-live" daemon prio=10 tid=0x00007f1b44781800 nid=0x3061 in Object.wait() [0x00007f1aa7bfa000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1258)
    - locked <0x00000007ab13e218> (a java.lang.Thread)
    at java.lang.Thread.join(Thread.java:1332)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.stop(CommsReceiver.java:77)
    - locked <0x00000007ab552730> (a java.lang.Object)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:294)
    at org.eclipse.paho.client.mqttv3.internal.CommsSender.handleRunException(CommsSender.java:154)
    at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:131)
    at java.lang.Thread.run(Thread.java:722)

Any idea what is causing this and how it can be prevented?

Answer

Following up from my comments in @Artem's answer...

There appears to be a deadlock in the Paho client. See line 573 of your Gist: the Snd thread is waiting for the Rec thread to terminate. At line 586, the Rec thread is blocked because the inbound queue is full (its capacity is 10). In all the cases that look like this, there is no Call thread, so the queue-full condition will never be cleared. Notice that at line 227 the trifecta of threads is working fine (presumably a reconnection after a redeploy?).

With the dead threads, there is no Call thread.

I think the problem is in the Paho client: in the CommsCallback.run() method there is a catch on Throwable which closes the connection, but because the queue is full, the Rec thread is never notified (and so never cleans up). So it seems that message delivery is throwing an exception which, when the queue is full, causes this deadlock.
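
In other words, the pattern is the classic wait/notify gap sketched below. This is a toy illustration, not Paho's actual code: the receiving side waits for space in a bounded queue, and the only thread that would ever signal that space has become available is the one that just died.

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class QueueFullDeadlockSketch {

        private final Object spaceAvailable = new Object();
        private final Queue<Object> inboundQueue = new ArrayDeque<Object>();
        private static final int CAPACITY = 10; // the Paho inbound queue is also bounded at 10

        // "Rec" side: blocks while the queue is full.
        public void enqueue(Object message) throws InterruptedException {
            synchronized (spaceAvailable) {
                while (inboundQueue.size() >= CAPACITY) {
                    spaceAvailable.wait(); // never woken if the "Call" side is gone
                }
                inboundQueue.add(message);
            }
        }

        // "Call" side: the only place the waiter is released. If this thread dies
        // on an exception without calling notifyAll(), enqueue() blocks forever.
        public Object dequeue() {
            synchronized (spaceAvailable) {
                Object next = inboundQueue.poll();
                spaceAvailable.notifyAll();
                return next;
            }
        }
    }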

The Paho client needs a fix, but in the meantime we can figure out what the exception is.

If the exception is downstream of the inbound gateway, you should see a log...

        logger.error("Unhandled exception for " + message.toString(), e);

Since this log is produced in MqttCallback.messageArrived(), if you are not seeing such errors, the problem may be in the Paho client itself.
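
For reference, the code "downstream of the inbound gateway" is whatever handler is subscribed to the adapter's output channel; a hypothetical example of where such an exception would originate (the class and channel names are assumptions) looks like this:

    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;

    // Hypothetical handler subscribed to the adapter's output channel. Any
    // RuntimeException thrown here propagates back into the adapter's
    // messageArrived(), producing the "Unhandled exception for ..." log above
    // before being rethrown into Paho's callback thread.
    public class LiveDataHandler implements MessageHandler {

        public void handleMessage(Message<?> message) throws MessagingException {
            // parse / persist the liveData payload here
        }
    }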

The exception handling in CommsCallback looks like this...

        } catch (Throwable ex) {
            // Users code could throw an Error or Exception e.g. in the case
            // of class NoClassDefFoundError
            // @TRACE 714=callback threw exception
            log.fine(className, methodName, "714", null, ex);
            running = false;
            clientComms.shutdownConnection(null, new MqttException(ex));
        }

(that is where they should call spaceAvailable.notifyAll() to wake the (dying) Rec thread).

So, turning on FINE logging for the Paho client should tell you where/what the exception is.
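
Paho logs through java.util.logging (the SEVERE entry in the Tomcat log above already comes from it), so one way to capture those FINE trace points (such as "714=callback threw exception" in the snippet above) is to raise the level for the Paho package programmatically. The logger name below is an assumption based on Paho's package naming; an equivalent entry in logging.properties works just as well.

    import java.util.logging.ConsoleHandler;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class PahoTraceLogging {

        // Keep a strong reference so the LogManager does not discard the setting.
        private static final Logger PAHO_LOGGER =
                Logger.getLogger("org.eclipse.paho.client.mqttv3");

        public static void enable() {
            PAHO_LOGGER.setLevel(Level.FINE);

            ConsoleHandler handler = new ConsoleHandler();
            handler.setLevel(Level.FINE); // the default console handler only passes INFO and above
            PAHO_LOGGER.addHandler(handler);
        }
    }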
