如何使用Java将JMS消息加入Oracle AQ [英] How to enqueue a JMS message into Oracle AQ using Java

查看:165
本文介绍了如何使用Java将JMS消息加入Oracle AQ的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个队列类型为SYS.AQ $ _JMS_TEXT_MESSAGE的Oracle AQ.我想做的是从Java应用程序将文本插入到提到的队列中.

I have an Oracle AQ with the queue type of SYS.AQ$_JMS_TEXT_MESSAGE. What I'm trying to do is to insert a text into the mentioned queue from a java application.

等效的SQL查询是

declare
 r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
 v_message_handle     RAW(16);
 o_payload            SYS.AQ$_JMS_TEXT_MESSAGE;
begin
 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());
 sys.dbms_aq.enqueue (
   queue_name         => 'QUEUE_NAME',
   enqueue_options    => r_enqueue_options,
   message_properties => r_message_properties,
   payload            => o_payload,
   msgid              => v_message_handle
 );
 commit;
end;
/

我使用本指南,但我被困在

 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());

该指南显示了如何使RAW消息入队,但是我需要将其作为JMS,否则数据类型与队列类型不匹配.

The guide shows how to enqueue a RAW message, but I need it to be JMS, otherwise the data type doesn't match the queue type.

任何帮助将不胜感激,因为即使有了全能的Google我也找不到解决此问题的方法.有没有办法使用oracle.jdbc.aq类来做到这一点,还是我只需要吸收它并使用SQL查询?

Any help would be appreciated, because even with the almighty google I am not able to find a solution to this problem. Is there a way to do it using the oracle.jdbc.aq classes, or do I just have to suck it up and use the SQL query?

推荐答案

只需复制粘贴此代码并尝试. (如果它对您有用)然后仔细阅读代码并理解.

Just copy paste this code and try. (if it works for you) Then carefully go through the code, and understand.

执行时,

  • 首先在主方法中取消对" createQueue()"行的注释.
  • First uncomment the 'createQueue()' line in the main method.

之后

  • 注释它,然后取消注释" sendMessage()"行,然后尝试发送消息.
  • Comment it and uncomment 'sendMessage()' line and try sending your message.

然后分别注释/取消注释每行并尝试一下.

Then comment/uncomment each line respectively and give a try.

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;

import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class OracleAQClient {

public static QueueConnection getConnection() {

    String hostname = "localhost";
    String oracle_sid = "xe";
    int portno = 1521;
    String userName = "jmsuser";
    String password = "jmsuser";
    String driver = "thin";
    QueueConnectionFactory QFac = null;
    QueueConnection QCon = null;
    try {
        // get connection factory , not going through JNDI here
        QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
        // create connection
        QCon = QFac.createQueueConnection(userName, password);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return QCon;
}

public static void createQueue(String user, String qTable, String queueName) {
    try {
        /* Create Queue Tables */
        System.out.println("Creating Queue Table...");
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        AQQueueTableProperty qt_prop;
        AQQueueTable q_table = null;
        AQjmsDestinationProperty dest_prop;
        Queue queue = null;
        qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");

        q_table = ((AQjmsSession) session).createQueueTable(user, qTable, qt_prop);

        System.out.println("Qtable created");
        dest_prop = new AQjmsDestinationProperty();
        /* create a queue */
        queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
        System.out.println("Queue created");
        /* start the queue */
        ((AQjmsDestination) queue).start(session, true, true);

    } catch (Exception e) {
        e.printStackTrace();
        return;
    }
}

public static void sendMessage(String user, String queueName,String message) {

    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        QCon.start();
        Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
        MessageProducer producer = session.createProducer(queue);
        TextMessage tMsg = session.createTextMessage(message);

        //set properties to msg since axis2 needs this parameters to find the operation
        tMsg.setStringProperty("SOAPAction", "getQuote");
        producer.send(tMsg);
        System.out.println("Sent message = " + tMsg.getText());

        session.close();
        producer.close();
        QCon.close();

    } catch (JMSException e) {
        e.printStackTrace();
        return;
    }
}

public static void browseMessage(String user, String queueName) {
    Queue queue;
    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

        QCon.start();
        queue = ((AQjmsSession) session).getQueue(user, queueName);
        QueueBrowser browser = session.createBrowser(queue);
        Enumeration enu = browser.getEnumeration();
        List list = new ArrayList();
        while (enu.hasMoreElements()) {
            TextMessage message = (TextMessage) enu.nextElement();
            list.add(message.getText());
        }
        for (int i = 0; i < list.size(); i++) {
            System.out.println("Browsed msg " + list.get(i));
        }
        browser.close();
        session.close();
        QCon.close();

    } catch (JMSException e) {
        e.printStackTrace();
    }

}

public static void consumeMessage(String user, String queueName) {
    Queue queue;
    try {
        QueueConnection QCon = getConnection();
        Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        QCon.start();
        queue = ((AQjmsSession) session).getQueue(user, queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage msg = (TextMessage) consumer.receive();
        System.out.println("MESSAGE RECEIVED " + msg.getText());

        consumer.close();
        session.close();
        QCon.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

public static void main(String args[]) {
    String userName = "jmsuser";
    String queue = "sample_aq";
    String qTable = "sample_aqtbl";
    //createQueue(userName, qTable, queue);
    //sendMessage(userName, queue,"<user>text</user>");
    //browseMessage(userName, queue);
    //consumeMessage(userName, queue);
}

}

您将需要将这些jars/libs从oracle DB安装目录复制到Java项目中

You will need to copy these jars/libs to your java project from your oracle DB setup directory

  1. ojdbc6.jar
  2. jta.jar
  3. jmscommon.jar
  4. aqapi.jar

本文应归功于Ratha [1].几乎没有什么要修改的东西,我只是对其进行了修改并提供了代码.

The credits should go to Ratha for this article[1]. There were few stuff to be amended, I just modified those and provided the code.

[1] http://wso2 .com/library/tutorials/2011/11/configuring-wso2-esb-with-oracle-as-messaging-media/

谢谢

这篇关于如何使用Java将JMS消息加入Oracle AQ的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆