What is the syntax for creating a queue subscriber in PL/SQL?
Problem description
I'm trying to create a queue and a callback that triggers when a message is queued, but I can't get the callback to trigger. What am I doing wrong?
I have a trigger that enqueues a message. I can see the message in the queue message table, and I can dequeue and process it by hand; I just can't get the callback to fire on enqueue.
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'queue_message_table',
queue_payload_type => 'queue_message_type',
multiple_consumers => TRUE);
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'message_queue',
queue_table => 'queue_message_table');
DBMS_AQADM.START_QUEUE (queue_name => 'message_queue');
END;
CREATE OR REPLACE PROCEDURE queue_callback(
context RAW, reginfo SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload RAW, payloadl NUMBER) AS
queue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
my_message queue_message_type;
ret varchar2(200);
message_id RAW(16);
BEGIN
DBMS_OUTPUT.PUT_LINE('Callback');
queue_options.msgid := descr.msg_id;
queue_options.consumer_name := descr.consumer_name;
DBMS_AQ.DEQUEUE(
queue_name => descr.queue_name,
dequeue_options => queue_options,
message_properties => message_properties,
payload => my_message,
msgid => message_id );
ret := handle_message(my_message);
commit;
END;
BEGIN
DBMS_AQADM.ADD_SUBSCRIBER (queue_name => 'message_queue',
subscriber => SYS.AQ$_AGENT('queue_subscriber', 'message_queue',NULL));
DBMS_AQ.REGISTER (
SYS.AQ$_REG_INFO_LIST(
SYS.AQ$_REG_INFO(
'MESSAGE_QUEUE:QUEUE_SUBSCRIBER',
DBMS_AQ.NAMESPACE_AQ,
'plsql://QUEUE_CALLBACK',
HEXTORAW('FF')
)
), 1
);
END;
Recommended answer
You need to be careful with the database version. Some bugs have been reported with Oracle AQ. In particular, I followed this link to build my own sample and ran the demo on an Oracle 11gR2 Enterprise database. I was able to enqueue, dequeue, and purge the queue, but the listener created with Dbms_Aq.Register didn't work. I ran the same example on a freshly downloaded Oracle 11g R2 XE database and it worked.
The same example was also run on an Oracle 10gR2 instance, where it worked perfectly.
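There are some things you need to be careful about when using AQ:
- Add the subscriber with the proper parameters
- Register the listener with Dbms_Aq.Register using the proper namespace
- Declare the queue table with the multiple consumers flag
- Grant the proper privileges on the AQ packages and for handling the queues
- If it doesn't work, try using the qualified name of the queue in some cases.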
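Several of the points in this checklist can be verified from the data dictionary once the demo objects below exist. The queries here are a minimal diagnostic sketch, assuming the demo names used later in this answer (DEMO_QUEUE, DEMO_QUEUE_TABLE) and a session connected as the queue owner. The last query also assumes the account is allowed to read V$PARAMETER, and the idea that a zero value for job_queue_processes could prevent PL/SQL notifications is something to verify for your version, not a diagnosis.

```sql
-- Is the queue table declared for multiple consumers? (RECIPIENTS should be MULTIPLE)
SELECT queue_table, recipients
  FROM user_queue_tables
 WHERE queue_table = 'DEMO_QUEUE_TABLE';

-- Is the queue started for both enqueue and dequeue?
SELECT name, enqueue_enabled, dequeue_enabled
  FROM user_queues
 WHERE name = 'DEMO_QUEUE';

-- Was the callback registration recorded, and with which names?
SELECT subscription_name, location_name
  FROM user_subscr_registrations;

-- Background-process settings that may matter for notification delivery
SELECT name, value
  FROM v$parameter
 WHERE name IN ('job_queue_processes', 'aq_tm_processes');
```

If the registration row is missing, or its subscription name does not match the queue and subscriber you created, that is a more likely culprit than the callback body itself.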
First, create the schema
connect / as sysdba
-- @?/rdbms/admin/dbmsaqad.sql --(install if you don't have aq installed yet)
-- create the user and permissions
create user aqadmin identified by aqadmin default tablespace users temporary tablespace temp;
GRANT create session TO aqadmin;
grant connect, resource to aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;
Create the DDL objects
CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000) );
Create the AQ objects
create or replace type demo_queue_payload_type as object(message varchar2(4000)) ;
/
begin
DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => 'demo_queue_table', queue_payload_type => 'demo_queue_payload_type',multiple_consumers => TRUE);
DBMS_AQADM.CREATE_QUEUE (queue_name => 'demo_queue', queue_table => 'demo_queue_table');
DBMS_AQADM.START_QUEUE('demo_queue');
end;
/
CREATE or replace PROCEDURE demo_queue_callback_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
DBMS_AQ.DEQUEUE(
queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
INSERT INTO demo_queue_message_table ( message )
VALUES ( 'Message [' || o_payload.message || '] ' ||
'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
COMMIT;
END;
/
BEGIN
DBMS_AQADM.ADD_SUBSCRIBER (
queue_name => 'demo_queue',
subscriber => SYS.AQ$_AGENT(
'demo_queue_subscriber',
NULL,
NULL )
);
DBMS_AQ.REGISTER (
SYS.AQ$_REG_INFO_LIST(
SYS.AQ$_REG_INFO(
'DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',
DBMS_AQ.NAMESPACE_AQ,
'plsql://DEMO_QUEUE_CALLBACK_PROCEDURE',
HEXTORAW('FF')
)
),
1
);
END;
/
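For the note above about qualified names, the registration can also be written with schema-qualified names. This is only a sketch, assuming the queue and the callback procedure are both owned by the AQADMIN user created earlier; it is meant as an alternative to the unqualified registration, not something to run in addition to it.

```sql
-- Assumption: AQADMIN owns both DEMO_QUEUE and DEMO_QUEUE_CALLBACK_PROCEDURE.
-- Run this instead of the unqualified DBMS_AQ.REGISTER call, not as well as it.
BEGIN
  DBMS_AQ.REGISTER (
    SYS.AQ$_REG_INFO_LIST(
      SYS.AQ$_REG_INFO(
        'AQADMIN.DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER',       -- schema.queue:consumer
        DBMS_AQ.NAMESPACE_AQ,
        'plsql://AQADMIN.DEMO_QUEUE_CALLBACK_PROCEDURE',  -- schema.procedure
        HEXTORAW('FF')                                    -- user context passed to the callback
      )
    ),
    1  -- number of entries in the registration list
  );
END;
/
```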
Finally, test the queue
DECLARE
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
BEGIN
o_payload := demo_queue_payload_type(
TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3' )
);
DBMS_AQ.ENQUEUE(
queue_name => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
COMMIT;
END;
/
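To see whether the callback actually fired, one option is to look at the table the callback writes to and at the view Oracle creates alongside the queue table. This is a minimal sketch, assuming the demo objects above and that the notification has had a few seconds to arrive, since delivery is asynchronous and happens in a background session.

```sql
-- Rows inserted by demo_queue_callback_procedure.
-- No rows means the callback has not run (or has not run yet).
SELECT message
  FROM demo_queue_message_table;

-- Messages still held in the queue table, per consumer.
-- A message that stays READY for DEMO_QUEUE_SUBSCRIBER suggests
-- it was enqueued but never dequeued by the callback.
SELECT queue, consumer_name, msg_state, enq_time
  FROM aq$demo_queue_table;
```

Because the callback runs in a background session, output from DBMS_OUTPUT.PUT_LINE inside it will not appear in your own session; writing to a table, as the demo does, is the more reliable way to observe it.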