在 PL/SQL 中创建队列订阅者的语法是什么? [英] What is the syntax for creating a queue subscriber in PL/SQL?

查看:33
本文介绍了在 PL/SQL 中创建队列订阅者的语法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个队列和一个在消息排队时触发的回调,但我无法触发回调.我做错了什么?

I'm trying to create a queue and a callback that triggers when a message is queued, but I can't get the callback to trigger. What am I doing wrong?

我有一个将消息入队的触发器,我可以在队列消息表上看到它,我可以手动将它出队并处理它,我只是无法让回调在入队时触发.

I have a trigger that enqueues a message, and I can see it on the queue message table, and I can dequeue it by hand and process it, I just can't get the callback to fire on enqueue.

BEGIN    
DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table        => 'queue_message_table',
  queue_payload_type => 'queue_message_type',
  multiple_consumers => TRUE);

DBMS_AQADM.CREATE_QUEUE (
  queue_name  => 'message_queue',
  queue_table => 'queue_message_table');
DBMS_AQADM.START_QUEUE (queue_name => 'message_queue');
END;

CREATE OR REPLACE PROCEDURE queue_callback(
  context RAW, reginfo  SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload  RAW, payloadl NUMBER) AS

    queue_options       DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    my_message          queue_message_type;
    ret                 varchar2(200);
    message_id          RAW(16);
BEGIN
    DBMS_OUTPUT.PUT_LINE('Callback');
    queue_options.msgid := descr.msg_id;
    queue_options.consumer_name := descr.consumer_name;

    DBMS_AQ.DEQUEUE(
        queue_name => descr.queue_name,
        dequeue_options => queue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id );
    ret := handle_message(my_message);
    commit;
END;

BEGIN
  DBMS_AQADM.ADD_SUBSCRIBER (queue_name => 'message_queue',
    subscriber => SYS.AQ$_AGENT('queue_subscriber', 'message_queue',NULL));
  DBMS_AQ.REGISTER (
    SYS.AQ$_REG_INFO_LIST(
      SYS.AQ$_REG_INFO(
        'MESSAGE_QUEUE:QUEUE_SUBSCRIBER',
        DBMS_AQ.NAMESPACE_AQ,
        'plsql://QUEUE_CALLBACK',
        HEXTORAW('FF')
      )
    ), 1
  );
END;

推荐答案

需要注意数据库版本.报告了一些关于 Oracle Aq 问题 的错误.特别是我已经按照 this link 构建了我自己的示例,执行Oracle 11gR2 企业数据库中的演示.我能够入队、出队、清除队列,但使用 Dbms_Aq.Register 创建的侦听器不起作用.我运行了相同的示例,下载了一个 Oracle 11g R2 xe 数据库,它运行正常.

You need to be careful with the database version. some bugs has been reported about issues with Oracle Aq. In particular I've followed this link to built my own sample, executing the demo in a Oracle 11gR2 enterprise database. I was abled to enqueue, dequeue, purge the queue but the listener created with Dbms_Aq.Register didn't work. I ran the same example downloading a Oracle 11g R2 xe database and it worked.

同样的示例在 Oracle 10gR2 实例中运行,并且运行良好.

The same example was runned in a Oracle 10gR2 instance and it works perfectly.

在使用 aq 时需要注意一些事项:

There are some things that you need to be careful on using aq:

  • 使用适当的参数添加订阅者
  • 使用适当的命名空间向 Dbms_Aq.Register 注册侦听器
  • 使用多个消费者标志来声明队列表
  • 使用适当的权限来打包和处理队列
  • 如果不起作用,请在某些情况下使用队列的限定名称.

'首先创建架构

connect / as sysdba
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet)

-- create the user and permissions
create user aqadmin identified by aqadmin default tablespace users temporary tablespace temp;
GRANT create session TO aqadmin;
grant connect, resource to aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;

创建 ddl 对象

CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );

创建 aq 对象

create or replace type demo_queue_payload_type as object(message varchar2(4000)) ;
/

begin
  DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type',multiple_consumers => TRUE);
  DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue', queue_table => 'demo_queue_table');
  DBMS_AQADM.START_QUEUE('demo_queue');
end;
/

CREATE or replace PROCEDURE demo_queue_callback_procedure(
                 context  RAW,
                 reginfo  SYS.AQ$_REG_INFO,
                 descr    SYS.AQ$_DESCRIPTOR,
                 payload  RAW,
                 payloadl NUMBER
                 ) AS

   r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   r_dequeue_options.msgid := descr.msg_id;
   r_dequeue_options.consumer_name := descr.consumer_name;

   DBMS_AQ.DEQUEUE(
      queue_name         => descr.queue_name,
      dequeue_options    => r_dequeue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

   INSERT INTO demo_queue_message_table ( message )
   VALUES ( 'Message [' || o_payload.message || '] ' ||
            'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                                        'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
   COMMIT;

END;
/


BEGIN

   DBMS_AQADM.ADD_SUBSCRIBER (
      queue_name => 'demo_queue',
      subscriber => SYS.AQ$_AGENT(
                       'demo_queue_subscriber',
                       NULL,
                       NULL )
      );

    DBMS_AQ.REGISTER (
       SYS.AQ$_REG_INFO_LIST(
          SYS.AQ$_REG_INFO(
             'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
             DBMS_AQ.NAMESPACE_AQ,
             'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
             HEXTORAW('FF')
             )
          ),
       1
       );
END;
/

最后测试队列

DECLARE

   r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
   r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
   v_message_handle     RAW(16);
   o_payload            demo_queue_payload_type;

BEGIN

   o_payload := demo_queue_payload_type(
                   TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3' )
                   );

   DBMS_AQ.ENQUEUE(
      queue_name         => 'demo_queue',
      enqueue_options    => r_enqueue_options,
      message_properties => r_message_properties,
      payload            => o_payload,
      msgid              => v_message_handle
      );

  COMMIT;

END;
/

这篇关于在 PL/SQL 中创建队列订阅者的语法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆