See Oracle Streams, How to jobs
Requirements The value of the system parameter Job_queue_processes must be at least 2 and aq_tm_process must be at least 1, for propagation to take place. Grant the permissions EXECUTE ON DBMS_AQ and EXECUTE ON DBMS_AQADM for the database users between which the propagation has to take place. No need to set AQ_TM_PROCESSES since 10.1; QMNn will take care |
Concepts MSG_STATE may be: WAITING, READY, PROCESSED, EXPIRED MSG_STATE is changed from READY to PROCESSED only when the dequeuer issue the COMMIT If the dequeuer ROLLBACK(or an exception) the message remain READY and RETRY_COUNT is increased |
Count and see not dequeued messages select * from aq$prova_qt where queue='MSG_QUEUE' and MSG_STATE='READY' --If you want to see also those message in processing phase, not yet committed, then you must flag the --dequeuers sessions usin dbms_application_info. See an example in http://asktom.oracle.com/pls/asktom/f?p=100:11:0::::P11_QUESTION_ID:5011695395080 function queueHasData return number is result number; begin select count(*) into result from dual where exists (select 1 from AQ$QTABLE_QT where msg_state = 'READY' and queue='MY_QUEUE'); return result; end; |
Wait for the queue to be empty
procedure queueWaitForEmpty(timeoutSecs number) is
startTime timestamp;
sleepCicle number := 5;
begin
select current_timestamp into startTime from dual;
while(queueHasData=1) loop
if(intervalToSeconds(systimestamp-startTime)>timeoutSecs) then
raise_application_error(-20001, 'Timeout waiting for the queue to be empty');
else
dbms_lock.sleep(sleepCicle);
end if;
end loop;
end; |
| Multithreading(Jobs) using an Advanced Queue
Please see note 116833.1 Create the user
create user localaquser identified by localaquser;
grant connect, resource, aq_administrator_role to localaquser;
grant execute on sys.dbms_aqadm to localaquser;
grant execute on sys.dbms_aq to localaquser;
grant create type to localaquser;
begin
dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','localaquser',FALSE);
dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','localaquser',FALSE);
end;
Create and fill a table
--part of the application example, you'll change this on your needs
create table prova (
id number primary key,
nome varchar2(50)
)
declare
i number;
begin
for i in 0..55 loop
insert into prova(id, nome) values(i, 'name' || i);
end loop;
end;
--This table will be filled by dequeuer processes,
--part of the application example, you'll change this on your needs
create table prova_dest (
id number primary key
)
--this contains the application errors during the dequeing,
--Since we are multithreading, the jobs will not stop on errors
create table prova_errors(
id number,
d date default sysdate not null,
error varchar2(4000)
)
Prepare the queue
create type message_type as object(id number);
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'prova_qt', queue_payload_type => 'message_type',
multiple_consumers => true);
end;
begin
DBMS_AQADM.CREATE_QUEUE (queue_name => 'msg_queue', queue_table => 'prova_qt');
end;
begin
DBMS_AQADM.START_QUEUE (queue_name => 'msg_queue');
end;
Enqueue process
create or replace procedure startProcesses(jobs number, mQName varchar2, mRecipient varchar2, whatDequeue varchar2) is
enqueue_options dbms_aq.enqueue_options_t;
-- enqueue_options_t is pre-defined AQ type
message_properties dbms_aq.message_properties_t;
-- message_properties_t is pre-defined AQ type
message_handle RAW(16);
message message_type;
recipients DBMS_AQ.aq$_recipient_list_t;
i number;
cursor c1 is
select id from prova
minus
select id from prova_dest;
mjob BINARY_INTEGER;
begin
recipients(1) := sys.aq$_agent(mRecipient, NULL, NULL);
message_properties.recipient_list := recipients;
FOR rc1 in c1 LOOP
message := message_type(rc1.id);
dbms_aq.enqueue(
queue_name => mQName,
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
-- the actual message to enqueue, object_type or RAW
msgid => message_handle);
commit;
END LOOP;
for i in 0..jobs-1 loop
DBMS_JOB.SUBMIT (job => mJob,
what => whatDequeue,
--what => 'begin ALDO.processQueue; end;'
--next_date IN DATE DEFAULT sysdate,
--interval IN VARCHAR2 DEFAULT 'null',
--no_parse IN BOOLEAN DEFAULT FALSE,
--instance IN BINARY_INTEGER DEFAULT any_instance,
force => true
);
--Commit must be done immediately after the statement
commit;
end loop;
--you may want to block here up to the queue is empty
end;
Dequeue procedure
CREATE OR REPLACE procedure ALDO.processQueue is
dequeue_options dbms_aq.dequeue_options_t;
-- dequeue_options_t is pre-defined AQ type
message_properties dbms_aq.message_properties_t;
-- message_properties_t is pre-defined AQ type
message_handle RAW(16);
message message_type;
-- aq.message_type is user-defined object
timeout EXCEPTION;
PRAGMA EXCEPTION_INIT (timeout, -25228);
merror varchar2(4000);
begin
dequeue_options.consumer_name := 'ALDO';
dequeue_options.wait := 2;
while(true) loop
dbms_aq.dequeue(
queue_name => 'msg_queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
begin
--do your user operation here, you normally call your procedure here
if(message.id=4) then
--we generate a dummy error for this id
raise_application_error(-20001, 'MY error');
else
insert into prova_dest values(message.id);
end if;
--commit also remove the message from the queue
commit;
exception
--since we are in multithreading here we log the exception and continue
when others then
begin
merror := DBMS_UTILITY.FORMAT_ERROR_STACK ||
DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
insert into prova_errors(id, error) values(message.id, merror);
end;
end;
end loop;
exception
when timeout then null;
end;
/
create or replace procedure purgeQueue is
mpurge_options DBMS_AQADM.aq$_purge_options_t;
begin
mpurge_options.block := true;
mpurge_options.delivery_mode := DBMS_AQADM.PERSISTENT_OR_BUFFERED;
DBMS_AQADM.PURGE_QUEUE_TABLE(queue_table => 'MTS_HVIEW_ABN_HISTORY.POPULATIONS_QT', purge_condition => null,
purge_options => mpurge_options);
commit;
end;
begin
delete from prova_dest;
delete from prova_errors;
startProcesses(4, 'msg_queue', 'ALDO', 'begin ALDO.processQueue; end;');
end;
select * from user_jobs
select id from prova
minus
select * from prova_dest
select count(*) from prova_qt
alter system flush shared_pool
select * from prova_errors
|
CLEAN UP --Stop the queue
begin
DBMS_AQADM.STOP_QUEUE('localaquser.local_msg_queue');
end;
--Drop the queue
begin
DBMS_AQADM.DROP_QUEUE('localaquser.local_msg_queue');
end;
--Drop the queue table
begin
DBMS_AQADM.DROP_QUEUE_TABLE('localaquser.local_queue_table');
end;
Drop the message type drop type localaquser.message_type; Drop the user drop user localaquser; |
Propagation
Exactly recreate the source queue on the destination database
#Plz replace the names used here with the example above
BEGIN
dbms_aqadm.add_subscriber (
queue_name => 'STRMADMIN.MTS_Q_B1',
subscriber => sys.aq$_agent('STRMADMIN','STRMADMIN.MTS_Q_B1@dmtaa',null)
);
END;
/
BEGIN
dbms_aqadm.schedule_propagation(
queue_name => 'STRMADMIN.MTS_Q_B1',
latency => 0);
END;
/
BEGIN
dbms_aqadm.schedule_propagation(
queue_name => 'STRMADMIN.MTS_Q_B1',
destination => 'dmtaa',
latency => 0);
END;
|
Verify queue types for propagation
--Use this to troubleshoot error
DECLARE
rc BINARY_INTEGER;
BEGIN
/* Verify that the queues aquser.q1def and aquser.q2def in the local database
have the same payload type */
DBMS_AQADM.VERIFY_QUEUE_TYPES(
src_queue_name => 'STRMADMIN.Q_MTS',
dest_queue_name => 'STRMADMIN.Q_MTS',
destination => 'SMTAA.ST',
rc => rc);
DBMS_OUTPUT.PUT_LINE(rc);
END;
select * from sys.aq$_message_types;
return 1 is good, 0 no good
|