在PL / SQL中创建队列用户的语法是什么? [英] What is the syntax for creating a queue subscriber in PL/SQL?

查看:236
本文介绍了在PL / SQL中创建队列用户的语法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个队列和一个回调,当消息排队时触发,但我无法获得触发的回调。我做错了什么?

I'm trying to create a queue and a callback that triggers when a message is queued, but I can't get the callback to trigger. What am I doing wrong?

我有一个触发器来排队一个消息,我可以在队列消息表上看到它,我可以用手拿出队列

I have a trigger that enqueues a message, and I can see it on the queue message table, and I can dequeue it by hand and process it, I just can't get the callback to fire on enqueue.

BEGIN    
DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table        => 'queue_message_table',
  queue_payload_type => 'queue_message_type',
  multiple_consumers => TRUE);

DBMS_AQADM.CREATE_QUEUE (
  queue_name  => 'message_queue',
  queue_table => 'queue_message_table');
DBMS_AQADM.START_QUEUE (queue_name => 'message_queue');
END;

CREATE OR REPLACE PROCEDURE queue_callback(
  context RAW, reginfo  SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload  RAW, payloadl NUMBER) AS

    queue_options       DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    my_message          queue_message_type;
    ret                 varchar2(200);
    message_id          RAW(16);
BEGIN
    DBMS_OUTPUT.PUT_LINE('Callback');
    queue_options.msgid := descr.msg_id;
    queue_options.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(
        queue_name => descr.queue_name,
        dequeue_options => queue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id );
    ret := handle_message(my_message);
    commit;
END;

BEGIN
  DBMS_AQADM.ADD_SUBSCRIBER (queue_name => 'message_queue',
    subscriber => SYS.AQ$_AGENT('queue_subscriber', 'message_queue',NULL));
  DBMS_AQ.REGISTER (
    SYS.AQ$_REG_INFO_LIST(
      SYS.AQ$_REG_INFO(
        'MESSAGE_QUEUE:QUEUE_SUBSCRIBER',
        DBMS_AQ.NAMESPACE_AQ,
        'plsql://QUEUE_CALLBACK',
        HEXTORAW('FF')
      )
    ), 1
  );
END;


推荐答案

您需要小心数据库版本。已报告有关 Oracle Aq的问题的一些错误。
特别是我遵循此链接建立我的自己的示例,在Oracle 11gR2企业数据库中执行演示。我被abled到enqueue,dequeue,清除队列,但使用Dbms_Aq.Register创建的侦听器没有工作。
我运行同样的例子下载一个Oracle 11g R2 xe数据库,并且它工作。

You need to be careful with the database version. some bugs has been reported about issues with Oracle Aq. In particular I've followed this link to built my own sample, executing the demo in a Oracle 11gR2 enterprise database. I was abled to enqueue, dequeue, purge the queue but the listener created with Dbms_Aq.Register didn't work. I ran the same example downloading a Oracle 11g R2 xe database and it worked.

同样的例子在一个Oracle 10gR2实例中运行,它工作完美。

The same example was runned in a Oracle 10gR2 instance and it works perfectly.

使用aq时需要注意以下几点:

There are some things that you need to be careful on using aq:


  • 使用适当的参数添加订阅者

  • 使用适当的命名空间使用Dbms_Aq.Register注册侦听器

  • 使用多个consumer标记声明队列表

  • 使用适当的权限打包和处理队列

  • 在某些情况下,如果队列无效,请使用队列的限定名称。

  • use the appropriate parameters adding the subscriber
  • use the appropriate namespace registering the listener with Dbms_Aq.Register
  • use the multiple consumers flag declaring the queue table
  • use the appropriate permissions to packages and to handle the queues
  • use the qualified name of the queues in some cases if it didn't works.

'
首先创建模式

' First create the schema

connect / as sysdba
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet)

-- create the user and permissions
create user aqadmin identified by aqadmin default tablespace users temporary tablespace temp;
GRANT create session TO aqadmin;
grant connect, resource to aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;

创建ddl对象

CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );

创建aq-objects

Create the aq-objects

create or replace type demo_queue_payload_type as object(message varchar2(4000)) ;
/

begin
  DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type',multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue', queue_table => 'demo_queue_table');
  DBMS_AQADM.START_QUEUE('demo_queue');
end;
/

CREATE or replace PROCEDURE demo_queue_callback_procedure(
                 context  RAW,
                 reginfo  SYS.AQ$_REG_INFO,
                 descr    SYS.AQ$_DESCRIPTOR,
                 payload  RAW,
                 payloadl NUMBER
                 ) AS

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   r_dequeue_options.msgid := descr.msg_id;
   r_dequeue_options.consumer_name := descr.consumer_name;

   DBMS_AQ.DEQUEUE(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

   INSERT INTO demo_queue_message_table ( message )
   VALUES ( 'Message [' || o_payload.message || '] ' ||
            'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                                        'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
   COMMIT;

END;
/


BEGIN

   DBMS_AQADM.ADD_SUBSCRIBER (
      queue_name => 'demo_queue',
      subscriber => SYS.AQ$_AGENT(
                       'demo_queue_subscriber',
                       NULL,
                       NULL )
      );

    DBMS_AQ.REGISTER (
       SYS.AQ$_REG_INFO_LIST(
          SYS.AQ$_REG_INFO(
             'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
             DBMS_AQ.NAMESPACE_AQ,
             'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
             HEXTORAW('FF')
             )
          ),
       1
       );
END;
/

最后测试队列

DECLARE

   r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   o_payload := demo_queue_payload_type(
                   TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3' )
                   );

   DBMS_AQ.ENQUEUE(
      queue_name         => 'demo_queue',
      enqueue_options    => r_enqueue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

  COMMIT;

END;
/

这篇关于在PL / SQL中创建队列用户的语法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆