如何在Java提交时排队入Oracle AQ表并通过JMS客户端使用 [英] How to enqueue on Oracle AQ table on commit with Java and consume with a JMS client

查看:458
本文介绍了如何在Java提交时排队入Oracle AQ表并通过JMS客户端使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为企业级产品编写Java组件,并希望利用Oracle 11g数据库的特定功能Active Queues.我要完成的确切方案是- 1.提交时向oracle活动队列/队列表中写入一条消息 2.与JMS使用者从队列中读取该消息

I am writing a Java component for an enterprise level product and want to leverage a particular feature of Oracle 11g databases, Active Queues. The exact scenario i nwant to accomplish is - 1. write a message to the oracle active queue/queue table on commit 2. read that message from the queue with a JMS consumer

我在 http://docs.oracle.com/cd/B28359_01/java.111/b31224/streamsaq.htm

特别是,我想专注于代码的入队部分-

and in particular, I'd like to focus on the enqueue part of the code -

    // Create the actual AQMessage instance:
    AQMessage mesg = AQFactory.createAQMessage(msgprop);
    // and add a payload:
    byte[] rawPayload = new byte[500];
    for (int i = 0; i < rawPayload.length; i++) {
        rawPayload[i] = 'b';
    }

    mesg.setPayload(new RAW(rawPayload));

    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setRetrieveMessageId(true);
    opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
    opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);

    // execute the actual enqueue operation:
    conn.enqueue(queueName, opt, mesg);

这对我来说很好,因为我们要确保仅在事务提交后,消费者才能看到该消息.

This works just fine for me, because we want to make sure the message is only visible to the consumers when the transaction is committed.

问题-在演示中,我们创建有效负载类型RAW的队列

The problem - In the demo we create queues of payload-type RAW

doUpdateDatabase(conn,
           "BEGIN "+
           "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
           "   QUEUE_TABLE        =>  '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE',  "+
           "   QUEUE_PAYLOAD_TYPE =>  'RAW', "+
           "   COMPATIBLE         =>  '10.0'); "+
           "END; ");
doUpdateDatabase(conn,
           "BEGIN "+
           "DBMS_AQADM.CREATE_QUEUE( "+
           "    QUEUE_NAME     =>   '"+USERNAME+".RAW_SINGLE_QUEUE', "+
           "    QUEUE_TABLE    =>   '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
           "END;  ");
doUpdateDatabase(conn,
           "BEGIN "+
           "  DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
           "END; ");

通过使用RAW中创建的队列,我可以将消息排队到队列中,但是JMS使用者无法订阅该队列,并抛出(空指针)异常,其中使用者需要一个预期类型的​​参数.简而言之,这段代码在初始化时抛出了空指针异常.

by using queues created in RAW I am able to enqueue messages to the queue, however JMS consumers fail to subscribe to the queue throwing an (null pointer) exception where the consumer expects a parameter for the expected type. In short this code throws a null pointer exception on init.

Properties env = new Properties();
env.load(new FileInputStream(new File("jndi.properties")));
Context ctx = new InitialContext(env);
ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(connectionFactoryName);
Connection connection = connFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue);

JNDI.properties

JNDI.properties

java.naming.factory.initial = oracle.jms.AQjmsInitialContextFactory
java.naming.security.principal = username
java.naming.security.credentials = password
db_url = jdbc:oracle:thin:@host:port:dbname

尝试在Camel中设置使用者时,我遇到类似的异常.

I get a similar exception when trying to setup consumers in Camel.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <!-- this camel route will read incoming messages from Oracle -->
        <route>
            <from uri="oracleQueue:queue:RAW_SINGLE_QUEUE" />
            <to uri="WebSphereMQ:queue:myWebSphereQueue" />
        </route>
    </camelContext>

    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>oracle db URL</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>username</value>
        </property>
        <property name="password">
            <value>password</value>
        </property>
    </bean>

    <bean id="oracleQueue" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
    </bean>

通过一些研究,我认为可能是队列有效负载类型.因此,我更改了队列表创建脚本,并使用JMS消息作为有效负载类型

 doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
 + "   QUEUE_TABLE        =>  'RAW_SINGLE_QUEUE_TABLE',  "
 + "   QUEUE_PAYLOAD_TYPE =>  'SYS.AQ$_JMS_MESSAGE', " +
 "   COMPATIBLE         =>  '10.0'); " + "END; ");

在这种情况下,JMS使用者可以连接,但是排队代码现在失败- ORA-25215:user_data类型和队列类型不匹配

In this case the JMS Consumers are able to connect, but the enqueue code now fails - ORA-25215: user_data type and queue type do not match

问题是,如何才能使Java生产者中的消息(仅在提交时可见)排队,并能够与骆驼或通用JMS消费者一起使用?

The question is how can I enqueue messages, visible only on commit, from a Java producer and be able to consume with camel or generic JMS consumer?

约束(以过滤掉网上已经存在的一些答案)-无法使用PL/SQL,spring事务,JTA.我见过类似这样的示例的示例使用Java将JMS消息发送到Oracle AQ ,其中队列表是使用SYS.AQ $ _JMS_MESSAGE类型创建的,但是示例生产者是JMS MessageProducer,而不是oracle指南中的那个.我不是要排队JMS消息(AQJmsMessage),而是按照Oracle指南中的说明使用AQMessage类型,并使用visible on commit选项.

constraints (to filter out some of the answers already on the net) - Cannot use PL/SQL, spring transactions, JTA. I've seen examples like How to enqueue a JMS message into Oracle AQ using Java where the queue table is created with the SYS.AQ$_JMS_MESSAGE type but the example producer is a JMS MessageProducer rather than the one in the oracle guide. I am not trying to enqueue JMS messages (AQJmsMessage), rather use the AQMessage type as explained in the Oracle guide, and to use the visible on commit option.

我的感觉是,如果问题仅基于有效载荷类型的不匹配,则必须在用户端进行某种配置以指定有效载荷类型,或者在生产者端进行某种配置,以便能够以某种方式写入消息JMS消费者会理解.有没有办法做到这一点?

My feeling is that if the issue is based on a mismatch of payload types only, then there must be some configuration on the consumer side to specify the payload type, or on the producer side to be able to write messages in a way JMS consumers will understand. Is there a way to accomplish this?

推荐答案

我能够做到这一点-我不得不猜测Oracle API的许多部分,并从各种博客中收集提示.对于任何对此感兴趣的人来说,我都能正常工作- 1.我在Oracle Db上创建了一个Oracle对象 2.使用此Oracle对象,我创建了对象类型的队列表作为有效负载 3.现在,我可以使用STRUCT有效负载使AQMessage类型入队,其中包含对象数据 4.而且,我能够与了解ADT有效负载类型的JMS使用方出队(感谢

I was able to accomplish this - I had to guess around many parts of the Oracle API, and collecting hints from various blogs. For anyone interested here is way I got it working - 1. I created an Oracle Object on the Oracle Db 2. With this Oracle Object, I created queue tables of the object type as the payload 3. I am now able to enqueue AQMessage types with STRUCT payload, containing the object data 4. And I am able to dequeue with a JMS consumer that understands the ADT payload type (Thanks to the article at http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types)

以下是代码步骤- 创建Oracle

Here are the steps with code - Create the Oracle object. The object can have any primary data type fields like VARCHAR, TIMESTAMP etc and also BLOB, CLOB etc. In this case I provided one of the columns as a blob to make things more complicated.

create or replace type aq_event_obj as object
(
  id       varchar2(100),
  payload  BLOB
);
commit;

现在创建队列表.表的有效负载类型是oracle对象.

Now create the queue table. The payload type of the table is the oracle object.

private void setup(Connection conn) throws SQLException {
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
            + "   QUEUE_TABLE        =>  'OBJ_SINGLE_QUEUE_TABLE',  " + "   QUEUE_PAYLOAD_TYPE =>  'AQ_EVENT_OBJ', "
            + "   COMPATIBLE         =>  '10.0'); " + "END; ");
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE( " + "    QUEUE_NAME     =>   'OBJ_SINGLE_QUEUE', "
            + "    QUEUE_TABLE    =>   'OBJ_SINGLE_QUEUE_TABLE'); " + "END;  ");
    doUpdateDatabase(conn, "BEGIN " + "  DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); " + "END; ");
} 

您现在可以使用对象的结构实例将Java中的AQMessage类型排队

You can now enqueue AQMessage types in Java with a struct instance of the object

public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {
    // First create the message properties:
    AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties();
    aqMessageProperties.setCorrelation(correlationId);
    aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME);

    // Specify an agent as the sender:
    AQAgent aqAgent = AQFactory.createAQAgent();
    aqAgent.setName(SENDER_NAME);
    aqAgent.setAddress(QUEUE_NAME);
    aqMessageProperties.setSender(aqAgent);

    // Create the payload
    StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);
    Map<String, Object> payloadMap = new HashMap<String, Object>();
    payloadMap.put("ID", correlationId);
    payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));
    STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap);

    // Create the actual AQMessage instance:
    AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties);
    aqMessage.setPayload(struct);

    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
    opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);

    // execute the actual enqueue operation:
    conn.enqueue(QUEUE_NAME, opt, aqMessage);
}

blob字段需要特殊处理

The blob field needed special handling

public class OracleAQBLOBUtil {

    public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
        BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
        OutputStream outputStream = blob.setBinaryStream(1L);
        InputStream inputStream = new ByteArrayInputStream(payload);
        try {
            byte[] buffer = new byte[blob.getBufferSize()];
            int bytesRead = 0;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
            }
            return blob;
        }
        finally {
            outputStream.close();
            inputStream.close();
        }
    }

    public byte[] saveOutputStream(BLOB blob) throws Exception {
        InputStream inputStream = blob.getBinaryStream();
        int counter;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        while ((counter = inputStream.read()) > -1) {
            byteArrayOutputStream.write(counter);
        }
        byteArrayOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }

}

对于使用者,您需要提供一个ORADataFactory实例,以使使用者了解有效负载类型(您的自定义对象).

For the consumer, you need to provide an instance of ORADataFactory that lets the consumer understand the payload type (your custom object).

AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());

OracleAQObjORADataFactory的代码是

Where the code for OracleAQObjORADataFactory is

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;

import oracle.jdbc.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.BLOB;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
import oracle.sql.STRUCT;

public class OracleAQObjORADataFactory  implements ORAData, ORADataFactory {

    public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    protected MutableStruct _struct;

    protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };
    protected static ORADataFactory[] _factory = new ORADataFactory[2];
    protected static final OracleAQObjORADataFactory  _AqEventObjFactory = new OracleAQObjORADataFactory ();

    public static ORADataFactory getORADataFactory() {
        return _AqEventObjFactory;
    }

    /* constructors */
    protected void _init_struct(boolean init) {
        if (init)
            _struct = new MutableStruct(new Object[2], _sqlType, _factory);
    }

    public OracleAQObjORADataFactory () {
        _init_struct(true);
    }

    public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException {
        _init_struct(true);
        setId(id);
        setPayload(payload);
    }

    /* ORAData interface */
    public Datum toDatum(Connection c) throws SQLException {
        return _struct.toDatum(c, EVENT_OBJECT);
    }

    /* ORADataFactory interface */
    public ORAData create(Datum d, int sqlType) throws SQLException {
        return create(null, d, sqlType);
    }

    protected ORAData create(OracleAQObjORADataFactory  o, Datum d, int sqlType) throws SQLException {
        if (d == null)
            return null;
        if (o == null)
            o = new OracleAQObjORADataFactory ();
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;
    }

    public String getId() throws SQLException {
        return (String) _struct.getAttribute(0);
    }

    public void setId(String id) throws SQLException {
        _struct.setAttribute(0, id);
    }

    public byte[] getPayload() throws SQLException {
        BLOB blob = (BLOB) _struct.getAttribute(1);
        InputStream inputStream = blob.getBinaryStream();
        return getBytes(inputStream);
    }

    public byte[] getBytes(InputStream body) {
        int c;
        try {
            ByteArrayOutputStream f = new ByteArrayOutputStream();
            while ((c = body.read()) > -1) {
                f.write(c);
            }
            f.close();
            byte[] result = f.toByteArray();
            return result;
        }
        catch (Exception e) {
            System.err.println("Exception: " + e.getMessage());
            e.printStackTrace();
            return null;
        }
    }

    public void setPayload(byte[] payload) throws SQLException {
        _struct.setAttribute(1, payload);
    }

}

您可能在项目中使用了Camel或Spring,在这种情况下- 1.如果您使用的是Camel 2.10.2或更高版本,则可以使用自定义消息列表容器(CAMEL-5676)创建JMS使用者 2.如果您使用的是以前的版本,则可能无法使用端点方式(我无法弄清楚),但是可以使用JMS请求侦听器

You're probably using Camel or Spring in your project, in which case - 1. If you're on Camel 2.10.2 or upwards, you can create a JMS consumer with a custom message lister container (CAMEL-5676) 2. If you're on a previous version then you may not be able to use the endpoint way (i couldn't figure it out), but you can use a JMS request listener

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                        http://www.springframework.org/schema/jms
                        http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- this is just an example, you can also use a datasource as the ctor arg -->
    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>system</value>
        </property>
        <property name="password">
            <value>oracle</value>
        </property>
    </bean>

    <!-- Definitions for JMS Listener classes that we have created -->
    <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />

    <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="oracleQueueName" value="BOZ_SINGLE_QUEUE" />
    </bean>

    <!-- The Spring DefaultMessageListenerContainer configuration. This bean is automatically loaded when the JMS application context is started -->
    <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="destination" ref="aqEventQueue" />
        <property name="messageListener" ref="aqMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>

</beans>

自定义消息侦听器容器

public class AQMessageListenerContainer extends DefaultMessageListenerContainer {

    @Override
    protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        return ((AQjmsSession) session).createConsumer(destination, getMessageSelector(),
                OracleAQObjORADataFactory.getORADataFactory(), null, isPubSubNoLocal());
    }
}

和请求侦听器的onMessage方法

and the request listener onMessage method

public void onMessage(Message msg) {
    try {
        AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
        OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();

        System.out.println("Datetime: " + obj.getId());
        System.out.println("Payload: " + new String(obj.getPayload(), Charset.forName("UTF-8")));
    }
    catch (Exception jmsException) {
        if (logger.isErrorEnabled()) {
            logger.error(jmsException.getLocalizedMessage());
        }
    }
}

这篇关于如何在Java提交时排队入Oracle AQ表并通过JMS客户端使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆