AQ

See Oracle Streams, How to jobs
Requirements
For propagation to take place, the system parameter JOB_QUEUE_PROCESSES must be at least 2 and AQ_TM_PROCESSES must be at least 1.
Grant EXECUTE ON DBMS_AQ and EXECUTE ON DBMS_AQADM to the database users between which the propagation has to take place.
Since 10.1 there is no need to set AQ_TM_PROCESSES explicitly: the QMNn background processes take care of it.

 

Concepts
MSG_STATE may be: WAITING, READY, PROCESSED, EXPIRED
MSG_STATE changes from READY to PROCESSED only when the dequeuer issues a COMMIT.
If the dequeuer issues a ROLLBACK (or fails with an exception), the message remains READY and RETRY_COUNT is increased.

 

Count and see the messages not yet dequeued
--Messages of MSG_QUEUE that are in READY state
select *
  from aq$prova_qt
 where msg_state = 'READY'
   and queue = 'MSG_QUEUE'

--To also see the messages that are being processed but are not yet committed,
--the dequeuer sessions must flag themselves using dbms_application_info.
--See an example in http://asktom.oracle.com/pls/asktom/f?p=100:11:0::::P11_QUESTION_ID:5011695395080

-- Tells whether MY_QUEUE still holds at least one message in READY state.
-- Returns 1 when such a message exists, 0 otherwise.
function queueHasData return number is
  hasReady number;
begin
  select case
           when exists (select 1
                          from AQ$QTABLE_QT
                         where queue = 'MY_QUEUE'
                           and msg_state = 'READY')
           then 1
           else 0
         end
    into hasReady
    from dual;
  return hasReady;
end;

 

Wait for the queue to be empty
-- Polls queueHasData until it no longer returns 1, sleeping between checks.
--   timeoutSecs: maximum number of seconds to keep waiting.
-- Raises ORA-20001 when the elapsed time exceeds timeoutSecs while the queue still has data.
procedure queueWaitForEmpty(timeoutSecs number) is
  pollSeconds number := 5;
  waitStarted timestamp := current_timestamp;
begin
  while queueHasData = 1 loop
    -- give up once the caller's time budget is spent
    if intervalToSeconds(systimestamp - waitStarted) > timeoutSecs then
      raise_application_error(-20001, 'Timeout waiting for the queue to be empty');
    end if;
    dbms_lock.sleep(pollSeconds);
  end loop;
end;

 

Multithreading(Jobs) using an Advanced Queue

Please see note 116833.1

Create the user
--Schema used by the example (the password equals the user name)
create user localaquser identified by localaquser;
--Roles
grant connect to localaquser;
grant resource to localaquser;
grant aq_administrator_role to localaquser;
--Privileges needed to create the payload type and to call the AQ packages
grant create type to localaquser;
grant execute on sys.dbms_aq to localaquser;
grant execute on sys.dbms_aqadm to localaquser;
--AQ system privileges, granted without admin option
begin
	dbms_aqadm.grant_system_privilege(privilege => 'ENQUEUE_ANY', grantee => 'localaquser', admin_option => FALSE);
	dbms_aqadm.grant_system_privilege(privilege => 'DEQUEUE_ANY', grantee => 'localaquser', admin_option => FALSE);
end;

Create and fill a table
--part of the application example, you'll change this on your needs
--Source table: one row per unit of work that will be enqueued
create table prova (
  id   number primary key,
  nome varchar2(50)
);

--Fill it with 56 sample rows (id 0..55, nome 'name0'..'name55') using a single
--set-based insert instead of a row-by-row PL/SQL loop
insert into prova (id, nome)
select level - 1, 'name' || (level - 1)
  from dual
connect by level <= 56;

--make the sample rows visible to the other sessions (e.g. the dequeuer jobs)
commit;

--Destination table, filled by the dequeuer processes
--(part of the application example, adapt it to your needs)
create table prova_dest (
  id number,
  primary key (id)
)

--Application errors raised while dequeuing are recorded here:
--since we are multithreading, the jobs will not stop on errors
create table prova_errors (
  id    number,
  d     date default sysdate not null,
  error varchar2(4000)
)


Prepare the queue
--Payload of every message: the id to process
create type message_type as object(id number);

--Queue table storing message_type payloads, with multiple consumers enabled
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'prova_qt',
    queue_payload_type => 'message_type',
    multiple_consumers => true
  );
end;

--Queue msg_queue, stored in the prova_qt queue table
begin
  dbms_aqadm.create_queue(
    queue_table => 'prova_qt',
    queue_name  => 'msg_queue'
  );
end;

--Start the queue
begin
  dbms_aqadm.start_queue(queue_name => 'msg_queue');
end;

Enqueue process
-- Enqueues one message for every id of prova not yet present in prova_dest,
-- then submits the jobs that will dequeue them in parallel.
--   jobs        : number of DBMS_JOB jobs to submit
--   mQName      : name of the queue to enqueue into
--   mRecipient  : consumer name the messages are addressed to
--   whatDequeue : PL/SQL text each job runs, e.g. 'begin ALDO.processQueue; end;'
create or replace procedure startProcesses(jobs number, mQName varchar2, mRecipient varchar2, whatDequeue varchar2) is
  -- enqueue_options_t and message_properties_t are pre-defined AQ types
  enqueue_options       dbms_aq.enqueue_options_t;
  message_properties    dbms_aq.message_properties_t;
  message_handle        RAW(16);
  recipients            DBMS_AQ.aq$_recipient_list_t;
  cursor c1 is
    select id from prova
    minus
    select id from prova_dest;
  mjob BINARY_INTEGER;
begin
  -- every message is addressed to the single recipient given by the caller
  recipients(1) := sys.aq$_agent(mRecipient, NULL, NULL);
  message_properties.recipient_list := recipients;
  for rc1 in c1 loop
    dbms_aq.enqueue(
      queue_name            => mQName,
      enqueue_options       => enqueue_options,
      message_properties    => message_properties,
      payload               => message_type(rc1.id),
      msgid                 => message_handle);
  end loop;
  -- one commit after the loop: the enqueue is all-or-nothing and no commit
  -- happens while the driving cursor is still open
  commit;
  for i in 1..jobs loop
    -- next_date, interval, no_parse and instance keep their DBMS_JOB defaults
    dbms_job.submit(
      job   => mjob,
      what  => whatDequeue,
      force => true);
    --Commit must be done immediately after the statement
    commit;
  end loop;
  --you may want to block here up to the queue is empty
end;
/

Dequeue procedure
-- Dequeues messages addressed to consumer ALDO from msg_queue and processes them
-- one per transaction, until no message arrives within the wait time.
-- A failure while processing a message is logged into prova_errors and the loop
-- continues with the next message.
CREATE OR REPLACE procedure ALDO.processQueue is
  -- dequeue_options_t and message_properties_t are pre-defined AQ types
  dequeue_options      dbms_aq.dequeue_options_t;
  message_properties   dbms_aq.message_properties_t;
  message_handle       RAW(16);
  -- message_type is the user-defined payload object
  message              message_type;
  -- ORA-25228 is mapped to this exception and is what ends the loop
  timeout    EXCEPTION;
  PRAGMA EXCEPTION_INIT (timeout, -25228);
  merror varchar2(4000);
begin
  dequeue_options.consumer_name := 'ALDO';
  dequeue_options.wait := 2;
  loop
    dbms_aq.dequeue(
      queue_name            => 'msg_queue',
      dequeue_options       => dequeue_options,
      message_properties    => message_properties,
      payload               => message,
      msgid                 => message_handle);
    -- marks the point where only the dequeue has been done in this transaction
    savepoint message_dequeued;
    begin
      --do your user operation here, you normally call your procedure here
      if message.id = 4 then
        --we generate a dummy error for this id
        raise_application_error(-20001, 'MY error');
      else
        insert into prova_dest (id) values (message.id);
      end if;
      --commit also remove the message from the queue
      commit;
    exception
      --since we are in multithreading here we log the exception and continue
      when others then
        -- capture the error text first; cut it to the size of merror / prova_errors.error
        merror := substrb(DBMS_UTILITY.FORMAT_ERROR_STACK ||
                          DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 4000);
        -- undo whatever the failed processing did, keeping the dequeue itself
        rollback to savepoint message_dequeued;
        insert into prova_errors (id, error) values (message.id, merror);
        -- commit now, so the error row and the removal of the failed message
        -- do not depend on a later message being processed successfully
        commit;
    end;
  end loop;
exception
  -- no message arrived within dequeue_options.wait seconds: nothing left to do
  when timeout then null;
end;
/

-- Purges all the messages (purge_condition is null) of a queue table.
--   queueTable : queue table to purge; defaults to the table this procedure
--                was originally hard-coded for, so existing callers are unaffected
create or replace procedure purgeQueue(
  queueTable varchar2 default 'MTS_HVIEW_ABN_HISTORY.POPULATIONS_QT'
) is
  mpurge_options DBMS_AQADM.aq$_purge_options_t;
begin
  mpurge_options.block := true;
  -- purge both persistent and buffered messages
  mpurge_options.delivery_mode := DBMS_AQADM.PERSISTENT_OR_BUFFERED;
  DBMS_AQADM.PURGE_QUEUE_TABLE(
    queue_table     => queueTable,
    purge_condition => null,
    purge_options   => mpurge_options);
  commit;
end;
/
  
--Empty the result and error tables of the previous run (whole-table deletes on purpose),
--then enqueue the pending ids and submit 4 dequeuer jobs
begin
  delete from prova_errors;
  delete from prova_dest;
  startProcesses(
    jobs        => 4,
    mQName      => 'msg_queue',
    mRecipient  => 'ALDO',
    whatDequeue => 'begin ALDO.processQueue; end;');
end;

--Jobs of the current user
select * from user_jobs

--Ids of prova that are not in prova_dest
select id from prova
minus 
select * from prova_dest

--Number of rows in the queue table
select count(*) from prova_qt

--NOTE(review): flushes the shared pool of the whole instance; why this example needs it is not evident here - confirm before running
alter system flush shared_pool

--Errors logged while dequeuing
select * from prova_errors

 

CLEAN UP
--Object names below are the ones created in the example above (msg_queue, prova_qt),
--assuming the example was run in the localaquser schema
--Stop the queue
begin
  DBMS_AQADM.STOP_QUEUE('localaquser.msg_queue');
end;
/
--Drop the queue
begin
  DBMS_AQADM.DROP_QUEUE('localaquser.msg_queue');
end;
/
--Drop the queue table
begin
  DBMS_AQADM.DROP_QUEUE_TABLE('localaquser.prova_qt');
end;
/
--Drop the message type
drop type localaquser.message_type;
--Drop the user; CASCADE also removes the objects the schema still owns (e.g. the example tables)
drop user localaquser cascade;

 

Propagation
Exactly recreate the source queue on the destination database

#Please replace the names used here with the ones of the example above
  -- Add the agent with address STRMADMIN.MTS_Q_B1@dmtaa as a subscriber of STRMADMIN.MTS_Q_B1
  DECLARE
    remote_subscriber sys.aq$_agent :=
      sys.aq$_agent('STRMADMIN', 'STRMADMIN.MTS_Q_B1@dmtaa', null);
  BEGIN
    dbms_aqadm.add_subscriber(
      queue_name => 'STRMADMIN.MTS_Q_B1',
      subscriber => remote_subscriber);
  END;
  /
  
  -- Schedule propagation of the queue with no destination specified, latency 0
  BEGIN
    dbms_aqadm.schedule_propagation(
      latency    => 0,
      queue_name => 'STRMADMIN.MTS_Q_B1');
  END;
  /
  
  -- Schedule propagation of the queue towards the destination dmtaa, latency 0
  BEGIN
    dbms_aqadm.schedule_propagation(
      destination => 'dmtaa',
      latency     => 0,
      queue_name  => 'STRMADMIN.MTS_Q_B1');
  END;

 

Verify queue types for propagation
--Run this when troubleshooting propagation errors
DECLARE
  types_match BINARY_INTEGER;
BEGIN
  /* Compare the payload type of the source queue STRMADMIN.Q_MTS with the one of
     the queue STRMADMIN.Q_MTS at the destination SMTAA.ST, then print the result code */
  DBMS_AQADM.VERIFY_QUEUE_TYPES(
    src_queue_name  => 'STRMADMIN.Q_MTS',
    dest_queue_name => 'STRMADMIN.Q_MTS',
    destination     => 'SMTAA.ST',
    rc              => types_match);
  DBMS_OUTPUT.PUT_LINE(types_match);
END;

--Contents of sys.aq$_message_types
select * from sys.aq$_message_types; 

A returned rc of 1 is good (the queue types match); 0 is not good (they do not match).