初始化异步JMS侦听器并使其无限运行的正确方法 [英] Correct approach to initialize an asynchronous JMS Listener and let it run infinitely

查看:205
本文介绍了初始化异步JMS侦听器并使其无限运行的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用以下应用程序连接到我的oracle数据库,注册队列侦听器并等待入队消息:

I use the following application to connect to my oracle database, register a queue listener and wait for enqueued messages:

package sample;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class MyThread extends Thread {

    private static final String QUEUE_NAME = "MY_QUEUE";
    private static final String QUEUE_USER = "myuser";
    private static final String QUEUE_PW = "mypassword";
    private boolean running;

    public MyThread() {
        this.running = true;
    }

    public static void main(String[] args) {
        MyThread mt = new MyThread();
        mt.start();
    }

    private QueueConnection getQueueConnection() throws JMSException {
        QueueConnectionFactory QFac = AQjmsFactory.getQueueConnectionFactory("xxx.xxx.xxx.xxx", "orcl", 1521, "thin");
        QueueConnection QCon = QFac.createQueueConnection(QUEUE_USER, QUEUE_PW);
        return QCon;
    }

    @Override
    public void interrupt() {
        this.running = false;
        super.interrupt();
    }

    @Override
    public void run() {
        try {
            QueueConnection queueConnection = getQueueConnection();
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = ((AQjmsSession) queueSession).getQueue(QUEUE_USER, QUEUE_NAME);

            while (running) {
                System.out.println("Starting...");

                queueConnection.start();
                MessageConsumer mq = ((AQjmsSession) queueSession).createReceiver(queue);
                MyListener listener = new MyListener();
                mq.setMessageListener(listener);

                System.out.println("... Done, now sleep a bit and redo");

                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("Closing Application");
            queueSession.close();
            queueConnection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消息排队后,onMessage函数会将消息追加到文本文件内容中:

Once a message got enqueued the onMessage function will append a message into a textfiles content:

package sample;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.Message;
import javax.jms.MessageListener;

public class MyListener implements MessageListener{

    @Override
    public void onMessage(Message arg0) {
        try {
            PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter("C:/temp/output/messages.txt", true)));
            out.println("New Message arrived");
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在运行时,我的控制台输出如下:

On runtime my console output looks like this:

开始...
...做完了,现在睡一会儿重做
正在启动...
...做完了,现在睡一会儿重做
正在启动...
...做完了,现在睡一会儿重做
正在启动...
...做完了,现在睡一会儿重做

Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo
Starting...
... Done, now sleep a bit an redo

如果有任何入队,文件将包含新消息.
因此:我可以使事件出队,此消息将触发onMessage事件.

If there where any enqueues the file will contain the new messages.
So: I can dequeue Events and the onMessage event will be triggered with this code.

现在要问我的问题:我很确定等待5秒钟再次注册侦听器(并调用queueConnection.start()来接收onMessage调用)不是正确的方法.但是,如果我不这样做,将不会有任何onMessage事件(文件保持为空).

Now to my question: I'm pretty sure waiting 5 seconds to register the Listener again (and call queueConnection.start() to receive the onMessage calls) is not the correct approach. But if I don't do this there won't be any onMessage events (File stays empty).

什么是在没有固定Thread.sleep()调用且即使没有任何事件的情况下也无需再次注册侦听器而无限地开始侦听队列的正确方法?

What is the correct approach to start listening to a queue infinitely without a fixed Thread.sleep() call and without the need to register the listener again even if there weren't any events?

数据库:Oracle 11g2
Java运行时:1.6
Maven依赖项:

Database: Oracle 11g2
Java Runtime: 1.6
Maven Dependencies:

  • oracle-jdbc(11.2.0.4.0)
  • xdb(1.0)
  • aqapi(1.3)
  • jmscommon(1.3.1_02)

推荐答案

没有理由运行线程来创建JMS使用者并设置其消息侦听器. JMS消息侦听器的全部重点是异步接收消息(出于某种原因,您似乎试图复制该功能).

There's no reason to run a thread to create a JMS consumer and set its message listener. The whole point of a JMS message listener is to receive message asynchronously (functionality which you appear to be trying to duplicate for some reason).

您只需要创建JMS使用方并设置消息侦听器,然后确保未关闭使用方即可.根据应用程序的编写方式,有时有时需要使用while循环以确保程序不会终止并因此关闭使用者.您的线程没有这样做.等待消息5秒钟后,它使使用者不在范围之内,这意味着它将被垃圾回收,并且我希望对于大多数JMS实现而言,它将被关闭.不过,可能会更糟.通过不明确关闭使用者并让其超出范围,您可能会泄漏使用者,这最终将使您的消息代理陷入困境.这不仅是草率的编程,而且对于尝试使用消息的其他用户来说可能是有问题的.

You simply need to create the JMS consumer and set the message listener and then ensure the consumer isn't closed. Depending on how the application is written it is sometimes necessary to have a while loop to make sure the program doesn't terminate and therefore close the consumer. Your thread isn't doing that. It's letting the consumer fall out of scope after waiting for messages for 5 seconds which means that it will be garbage collected and I expect for most JMS implementations that means it will be closed. It could be worse than that, though. By not explicitly closing the consumer and just letting it fall out of scope you could be leaking consumers which would eventually bog down your message broker. This is not only sloppy programming, but potentially problematic for other users trying to consume messages.

这篇关于初始化异步JMS侦听器并使其无限运行的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆