Oracle Streams

See Advanced Queue
Troubleshooting http://www.acs.ilstu.edu/docs/Oracle/server.101/b10727/strms_tr.htm#1006156
See note 290605.1 to configure strmmon
My Forum, see note 265933.1 to merge more tables from source in to one dest table(not possible)
        , see http://forums.oracle.com/forums/thread.jspa?messageID=2828984&#2828984 
          for Streams on RAC and listener
See Implementing an history database for a real life example and requirement using Streams

Requirements
Officially global_names=true is required but referring to metalink note 344239.1 it should work even if false
Archive log required on source db
I recommend a FORCE LOGGING on db otherwise you lose the NOLOGGING sql DML
Premises for this document and references to untested issues
As usual every line of code was personally tested and verified.
Streams errors encountered are included in Oracle Managed Errors, simply search it.
This is not a document were you just copy and paste to have a Streams environment working, instead for every statement 
altering the Streams structure I'll provide detailed methods for troubleshooting and analyzing to see what happens behind the scene
I just remember the basic Streams structure:

Source DB                   
 _________                   Dest DB
| Capture |                  _______
|    |    |                 |       |
|  Queue ------> Propagate -> Queue |
|_________|                 |   |   |
                            | Apply |
                            |_______|

You may also have everything on a single database:

   DB
 _________
| Capture |
|    |    |
|  Queue  |
|    |    |
|  Apply  |
|_________|

Basically for each apply there is a queue.
STRMADMIN user is a DBA user and should not contain logic procedures, you may drop it and recreate on demand.

Anyway the examples below are based from a simple application retaining historical data changes on the Streams destination,
this is a good real life example to start.
Source table: 
CREATE TABLE TEST.SOURCETABLE (
  NOME  VARCHAR2(50 BYTE),
  ID    NUMBER primary key
)

Dest table:
CREATE TABLE TEST.SOURCETABLE (
  ID            NUMBER,
  NOME          VARCHAR2(50 BYTE),
  NOME_OLD      VARCHAR2(50 BYTE),
  ACTION        VARCHAR2(30 CHAR),
  DATE_LOG      DATE                            DEFAULT sysdate               NOT NULL,
  USERNAME_LOG  VARCHAR2(100 CHAR)
)

Using different owner/table_name between instances
Using a different owner/table_name for the destination is not supported and you get several errors during environment set up,
however you can do it since you'll use a custom dml handler to apply changes. Simply create a table with one column on destination 
with the same owner/table_name of source and then drop it, if you want to avoid errors on alert log on destination.
! Its a very complicated matter applying ddl if the destination owner/table_name is different since on the ddl handler
! you only have the row ddl text to use.
#Untested
  See DBMS_STREAMS_ADM.RENAME_SCHEMA
  Hub and Spoke on the forum http://forums.oracle.com/forums/thread.jspa?threadID=683801&tstart=30
Different OS platform
Is possible, check note UNTESTED 418755.1, 224255.1
Compare and determine schema differences UNTESTED
10g DBMS_RECTIFIER_DIFF.DIFFERENCES Note: 1062732.6
11g DBMS_COMPARISON.COMPARE and DBMS_COMPARISON.CONVERGE Note: 463295.1
My Forum - Accessing others LCR$_ROW_RECORD in same transaction from DML handler
http://forums.oracle.com/forums/thread.jspa?messageID=2784542

 

User creation and grants
create user strmadmin identified by strmadmin
  default tablespace stream
  quota unlimited on stream;

grant connect,resource,dba to strmadmin;

BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
      grantee          => 'strmadmin',
      grant_privileges => true);
END;
/
 
Db link setup (so basic... I could omit this...)
10g, strmadmin>--create the database link from source to dest
CREATE DATABASE LINK AL12 CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING 'al12';
#test
select * from dual@AL12
11g, strmadmin>--create the database link from dest to source
CREATE DATABASE LINK DB10 CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING 'DB10';
#test
select * from dual@DB10

Supplemental logging

Source data - Setup supplemental logging and grant
#From 10.2 primary/unique key are logged by default, if no key then all columns are logged (see Asynchronous CDC Cookbook)
#always set suppl. logg. at very source db
#not recommended
alter database add supplemental log data (all) columns;
#recommended
ALTER TABLE test.sourceTable ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

if dont use the clause (ALL) you may get this depending on your application, due to missing columns data :
ORA-01400: cannot insert NULL into ("MTS_HDATA"."AGENTS"."PARTY_BIC")
ORA-06512: at "SYS.LCR$_ROW_RECORD", line 419
ORA-06512: at "MTS_APPLY.TABLE_HANDLERS", line 52
ORA-06512: at "MTS_APPLY.TABLE_HANDLERS", line 55
ORA-06512: at line 1

UNTESTED
You may also use DBMS_CAPTURE_ADM.prepare_table_instantiation(supplemental_logging=>'all')

 

Check supplemental logging
SELECT always, table_name, log_group_type FROM dba_log_groups order by table_name;

Queue

Queue
#Sharing the queue
#A single queue may be used for many tables

--Create the queue with STRMADMIN user
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'strmadmin.q_sourcetable',
    queue_name   => 'strmadmin.q_sourcetable',
    queue_user => 'STRMADMIN');
END;

#Untested, manual dequeue http://forums.oracle.com/forums/thread.jspa?threadID=611832&tstart=240
 
Queue troubleshooting
SELECT * FROM DBA_QUEUES where owner = 'STRMADMIN'

How to remove it?
begin
  DBMS_STREAMS_ADM.REMOVE_QUEUE(queue_name => 'STRMADMIN.STREAMS_QUEUE',
    cascade => true, drop_unused_queue_table => true);
end;

Any data sitting in the queue? UNTESTED
select * from streams_QUEUE_TABLE;
SELECT * FROM AQ$streams_queue_table;

Rules

Need to moodify rules?
#A single rule is more performing http://forums.oracle.com/forums/thread.jspa?messageID=1774910&#1774910
#See below for how to select the rule name.
#The rule may relate to any Streams process
#I recommend to use a single rule for capture, propagation and apply even for many tables,
#to accomplish this I normally create a real dummy rule for each process and then I alter the rule.
#Never rely then on DBA_STREAMS_RULES
--Valid example condition:
--'(:dml.get_object_owner() = ''MY_OWNER'' and (:dml.get_object_name() in (''TABLE1'', ''TABLE2'')))'
begin
  DBMS_RULE_ADM.ALTER_RULE(rule_name => 'MY_RULE', condition => '(((:dml.get_object_owner() = ''ALDO'' and :dml.get_object_name() = ''PROVA'')) and :dml.is_null_tag() = ''Y'' and :dml.get_source_database_name() = ''AL12'')'
    --evaluation_context IN VARCHAR2 DEFAULT NULL,--no need to change
    --remove_evaluation_context IN BOOLEAN DEFAULT false,
    --action_context IN SYS.RE$NV_LIST DEFAULT NULL,
    --remove_action_context IN BOOLEAN DEFAULT false,
    --rule_comment IN VARCHAR2 DEFAULT NULL,
    --remove_rule_comment IN BOOLEAN DEFAULT false
    );
end;

Need extra columns? (Since 10.2)
select * from DBA_STREAMS_ADD_COLUMN

#Usefull to create columns at Capture time
Detect the rule name as above. Column will be added when rule evaluates true. 

BEGIN 
  DBMS_STREAMS_ADM.ADD_COLUMN(
    rule_name    => 'MY_RULE',
    table_name   => 'ALDO.PROVA',
    column_name  => 'birth_date', 
    column_value => ANYDATA.ConvertDate(NULL),
    value_type   => 'NEW',
    step_number  => 0,
    operation    => 'ADD');
END;

#To remove this rule use:
BEGIN 
  DBMS_STREAMS_ADM.ADD_COLUMN (
    rule_name    => 'MY_RULE',
    table_name   => 'ALDO.PROVA',
    column_name  => 'birth_date', 
    column_value => ANYDATA.ConvertDate(NULL),
    value_type   => 'NEW',
    step_number  => 0,
    operation => 'REMOVE');
END;

Need a customized function? 
See http://docs.huihoo.com/oracle/docs/B19306_01/server.102/b14229/strms_mtransform.htm#i1006552,
http://forums.oracle.com/forums/thread.jspa?messageID=1421560
Caution deterministic function needed... Function is called outside originated transaction context
Sometime people ignore that a deterministic function cache results and my be called by system from outside any context
See for example this thread http://forums.oracle.com/forums/thread.jspa?messageID=45218

Remove a generic rule from DBA_STREAMS_RULES
begin
  DBMS_STREAMS_ADM.REMOVE_RULE(rule_name=>'BANK_CHARGES_VOL_BREAKS290', streams_type=>'APPLY', 
    streams_name=>'APPLY_MTS', drop_unused_rule=>TRUE, inclusion_rule=>TRUE);
end;

 

Using the DBMS_STREAMS.ADD_TABLE_RULES procedure
--Creates capture, propagations and apply.
--Here is how to use to have a single rule

declare
  mdml_rule_name VARCHAR2(200);
  mddl_rule_name VARCHAR2(200);
  m_condition varchar2(32000) := '(:dml.get_object_owner() = ''ALDO'' and (:dml.get_object_name() in (''PERSONE'', ''INDIRIZZI'')))';
begin
  --creates a dummy rule
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'ALDO.PALLO',
    --APPLY or CAPTURE
    streams_type   => 'apply',
    streams_name   => 'APPLY_MTS' ,
    queue_name     => 'STRMADMIN.Q_MTS',
    include_dml    =>  true,
    include_ddl    =>  false,
    inclusion_rule =>  true,
    dml_rule_name  =>  mdml_rule_name,
    ddl_rule_name  =>  mddl_rule_name,
    source_database => '&2');
  --alter capture
  DBMS_RULE_ADM.ALTER_RULE(rule_name => mdml_rule_name, condition => m_condition);
end;
/

Capture

Capture
#Untested - adding rules to capture process http://forums.oracle.com/forums/thread.jspa?threadID=656049&tstart=75
#Sharing the capture process
#You may use a single capture process for more than one table
#Use the name of an existing capture to add a new table

begin
  --Must run at least once on SOURCE db
  DBMS_CAPTURE_ADM.BUILD;
end;
/

--ADD the capture rules using the DBMS_STREAMS.ADD_TABLE_RULES procedure, see above

--The user changing source data will be available to the Streams
declare 
  capture_name varchar2(200) := 'capture_test';
BEGIN
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'username', include => true);
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'row_id', include => true);
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'serial#', include => true);
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'session#', include => true);
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'thread#', include => true);
  DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'tx_name', include => true);
END;

begin
  dbms_capture_adm.start_capture(capture_name => 'capture_test');
end;

Remove capture rules
select 'execute DBMS_STREAMS_ADM.REMOVE_RULE(rule_name=>''' || rule_name || ''', streams_type=>''' || streams_type
  || ''', streams_name=>''' || streams_name || ''', drop_unused_rule=>TRUE, inclusion_rule=>TRUE);' 
  from dba_streams_rules
where streams_type = 'CAPTURE'
 
Capture troubleshooting
--View capture status
SELECT c.capture_name, SUBSTR (s.program, INSTR (s.program, '(') + 1, 4) process_name, c.SID,
       c.serial#, c.state, c.total_messages_captured,
       c.total_messages_enqueued, c.enqueue_time last_enqueue, sysdate
FROM v$streams_capture c, v$session s
WHERE c.SID = s.SID AND c.serial# = s.serial#;

Capture rules
--Do not rely on DBA_STREAMS_TABLE_RULES, if you uncleanely dropped 
--STRMADMIN user or something, old unused rules will be there
select DBA_RULES.* from dba_capture, DBA_RULE_SET_RULES, DBA_RULES 
where DBA_RULE_SET_RULES.rule_set_name (+)= dba_capture.rule_set_name
and DBA_RULES.rule_name (+)= DBA_RULE_SET_RULES.rule_name

#Create this function to have a clear idea of the SCN order
CREATE OR REPLACE function sortScn(startScn number, capturedScn number, 
  appliedScn number, firstScn number, sourceResetlogsScn number, 
  maxCheckpointScn number, requiredCheckpointScn number, lastEnqueuedScn number) 
  return varchar2 is
  type tRec is record(name varchar2(50), value number);
  r tRec;
  TYPE nestTable IS TABLE OF tRec;
  n nestTable;
  tmpvalue tRec;
  i number;
  result varchar2(1000) := '';
begin
  n := nestTable();
  --Add start SCN
  r.name := 'Start SCN                '; r.value := startScn;
  n.extend(); n(n.count()) := r;
  --Add captured SCN
  r.name := 'Captured SCN             '; r.value := capturedScn;
  n.extend(); n(n.count()) := r;
  --Add applied SCN
  r.name := 'Applied SCN              '; r.value := appliedScn;
  n.extend(); n(n.count()) := r;
  --Add first SCN
  r.name := 'First SCN                '; r.value := firstScn;
  n.extend(); n(n.count()) := r;
  --Add source resetlogs SCN
  r.name := 'Source resetlogs SCN     '; r.value := sourceResetlogsScn;
  n.extend(); n(n.count()) := r;
  --Add start SCN
  r.name := 'Max Checkpoint SCN       '; r.value := maxCheckpointScn;
  n.extend(); n(n.count()) := r;
  --Add required checkpoint SCN
  r.name := 'Required checkpoint SCN  '; r.value := requiredCheckpointScn;
  n.extend(); n(n.count()) := r;
  --Add last enqueued SCN SCN
  r.name := 'Last enqueued SCN        '; r.value := lastEnqueuedScn;
  n.extend(); n(n.count()) := r;
  --Add current SCN
  r.name := 'Current SCN              '; r.value := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  n.extend(); n(n.count()) := r;
  begin
    --Sort 
    i := n.first;
    while(i<n.last-1) loop
      --DBMS_OUTPUT.PUT_LINE(i);
      --for i in n.first..n.last-1 loop
      if(n(i).value>n(i+1).value) then
        --DBMS_OUTPUT.PUT_LINE('found');
        tmpvalue := n(i);
        n(i) := n(i+1);
        n(i+1) := tmpvalue;
        i := n.first;
      else i := i+1;
      end if;
    end loop;
  end;
  for i in n.first..n.last loop
    result := result || (n(i).name || ' ' || n(i).value);
    if(i<n.last) then
      result := result || chr(13) || chr(10);
    end if;
  end loop;
  return result;
end;

#Query DBA_STREAMS_RULES to lookup RULE_SET_NAME
select c.*, sortScn(start_scn, captured_Scn, applied_Scn, first_Scn, 
  source_Resetlogs_Scn, max_Checkpoint_Scn, required_Checkpoint_Scn, 
  last_Enqueued_Scn) scn_order from dba_capture c

#Normally the result of the scn_order column is:
Source resetlogs SCN      33636153271
Start SCN (or First SCN)  33636310564
First SCN (or Start SCN)  33636310564
Required checkpoint SCN   33636697161
Captured SCN              33636770454
Applied SCN               33636770454
Max Checkpoint SCN        33636770454
Last enqueued SCN         33636782303
Current SCN               33636783363

#Is there a capture history?
select * from DBA_HIST_STREAMS_CAPTURE

#Tracing capture - UNTESTED!
http://forums.oracle.com/forums/thread.jspa?threadID=687265&tstart=30

Propagation

Propagation
#Sharing the propagation
#A single propagation process may be used for many tables

#First create a queue on destination(see above)

declare
  mdml_rule_name VARCHAR2(200);
  mddl_rule_name VARCHAR2(200);
  m_condition varchar2(32000) := '(:dml.get_object_owner() = ''ALDO'' and (:dml.get_object_name() in (''PERSONE'', ''INDIRIZZI'')))';
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(table_name => 'MTS_OWNER.BANK_CHARGES', 
    streams_name => 'PROPAGATION_MTS', 
    source_queue_name => 'strmadmin.Q_MTS', 
    destination_queue_name => 'STRMADMIN.Q_MTS@SMTAA.ST', 
    include_dml => true, 
    include_ddl => false, 
    dml_rule_name  =>  mdml_rule_name,
    ddl_rule_name  =>  mddl_rule_name,
    source_database => 'SMTGA.ST');
  DBMS_RULE_ADM.ALTER_RULE(rule_name => mdml_rule_name, condition => m_condition);
END;
/

 

Propagation
select * from dba_propagation where propagation_name = 'PROPAGATION_TEST'

Propagation rules
select DBA_RULES.* from dba_propagation, DBA_RULE_SET_RULES, DBA_RULES 
where DBA_RULE_SET_RULES.rule_set_name (+)= dba_propagation.rule_set_name
and DBA_RULES.rule_name (+)= DBA_RULE_SET_RULES.rule_name

BEGIN
  DBMS_PROPAGATION_ADM.STOP_PROPAGATION(
    propagation_name => 'prop1',
    --clean statistics
    force => true);
END;
/

BEGIN
DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
  propagation_name => 'PROPAGATION_MTS',
  drop_unused_rule_sets => TRUE);
END;
/

BEGIN
  DBMS_PROPAGATION_ADM.START_PROPAGATION(
    propagation_name => 'prop1');
END;

 

Instantiation number
        
Concept:
The instantiation informations are recorded in the destination database     
     
#Now we are going to start the apply process but the scn must be aligned with source
#the dblink is the destination of the apply process
#you only use the source table name here even if the destination table is different

create or replace procedure prepareInstantiation(source_table_name varchar2) is
  source_scn  NUMBER;
  --This is the global_name of the source
  source_db_name varchar2(100) := 'DMTGA.DE';
BEGIN
  select global_name into source_db_name from global_name;
  source_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  --use the dblink if you are propagating to a remote queue
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@DMTAA(
    source_object_name      => source_table_name,
    source_database_name    => source_db_name,
    instantiation_scn       => source_scn);
  dbms_output.put_line(source_scn);
  --on source database
  dbms_capture_adm.prepare_table_instantiation(source_table_name, supplemental_logging=>'all');
END;

begin
  prepareInstantiation('TEST.SOURCETABLE');
end;

#What does this do?
select first_scn from dba_capture where capture_name = 'CAPTURE_TEST'

#Query DBA_STREAMS_RULES to lookup RULE_SET_NAME
select decode(sign(difference_scn), -1, '!!CHANGES NOT BEING APPLIED, LOGS LOOSING!!', 'Normal') status,
b.* from (
  select curr_scn - instantiation_scn difference_scn, a.* from (
    select DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() curr_scn, i.* 
    from DBA_APPLY_INSTANTIATED_OBJECTS@al12 i
  ) a
) b

Apply

Apply
#Sharing the apply
#For every apply process the apply order will be mantained, 
#you'll likely use a single apply process for a schema
        
--here we create the apply process on destination
--ADD the capture rules using the DBMS_STREAMS.ADD_TABLE_RULES procedure, see above

begin 
  --make your reflections on this value, see DBA_APPLY_PARAMETERS for existing values
  DBMS_APPLY_ADM.SET_PARAMETER(apply_name => 'APPLY_TEST',parameter  => 'disable_on_error', value => 'y');
end;

BEGIN
  DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_test');
END;

 

Apply troubleshooting

Apply Parameters
select * from DBA_APPLY_PARAMETERS

Apply rules
select DBA_RULES.* from dba_apply, DBA_RULE_SET_RULES, DBA_RULES 
where DBA_RULE_SET_RULES.rule_set_name (+)= dba_apply.rule_set_name
and DBA_RULES.rule_name (+)= DBA_RULE_SET_RULES.rule_name

Apply status
SELECT dba_apply.*, sysdate FROM DBA_apply where apply_name = 'APPLY_TEST'
select * from dba_rules where upper(rule_condition) like upper('%TESTONA%')

How to delete it?
execute DBMS_APPLY_ADM.STOP_APPLY(apply_name  => 'apply_test');
execute DBMS_APPLY_ADM.DROP_APPLY(apply_name => 'apply_test');

BEGIN
  DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_test');
END;

Errors in apply? Is the apply process running? Remember to re-execute pending errors before restarting the apply
--First check sequence
select * from dba_apply
select * from dba_apply_error
execute DBMS_APPLY_ADM.EXECUTE_All_ERRORS;
select * from dba_apply_error
execute DBMS_APPLY_ADM.START_APPLY(apply_name => 'APPLY_MTS');
select * from DBA_APPLY_PROGRESS

#Check the handler exists
SELECT * FROM DBA_APPLY_DML_HANDLERS WHERE APPLY_DATABASE_LINK IS NULL ORDER BY OBJECT_OWNER, OBJECT_NAME;

#What is on alert log if the apply stop due to an exception?
Wed Aug 27 01:01:45 2008
Streams Apply Reader AS05 for APPLY_TEST2 with pid=38 OS id=12691 stopped

#You also find a trace in bdump in the form al12_as02_10503.trc <db>_as##_####.trc
#You may resubmit the failed apply using:
begin
 DBMS_APPLY_ADM.EXECUTE_ERROR(local_transaction_id => '24.21.1447');
end;
#or
execute DBMS_APPLY_ADM.EXECUTE_All_ERRORS;

#The apply process stopped for a full tbs, now the tbs is ok how many messages are now to be dequeued?
SELECT s.SUBSCRIBER_NAME, q.QUEUE_SCHEMA, q.QUEUE_NAME, s.LAST_DEQUEUED_SEQ,
       s.NUM_MSGS, s.TOTAL_SPILLED_MSG
FROM V$BUFFERED_QUEUES q, V$BUFFERED_SUBSCRIBERS s, DBA_APPLY a
WHERE q.QUEUE_ID = s.QUEUE_ID AND s.SUBSCRIBER_ADDRESS IS NULL AND s.SUBSCRIBER_NAME = a.APPLY_NAME;
subscriber_name queue_schema queue_name last_dequeued_seq num_msgs total_spilled_msg
APPLY_TEST2 STRMADMIN STREAMS_QUEUE 183 43 50
APPLY_TEST STRMADMIN STREAMS_QUEUE 189 0 0
#Tere are 43 messages to dequeue #If the apply process as stopped due to an exception #YOU MUST EXECUTE ERRORS BEFORE STARTING IT AGAIN #the error queue must be be empty oterwhise new changes are applied and lead to incinsistency #Data on destination has been manually repaired, how to clear to error queue? BEGIN --CAUTION!!! Any pending LCR will be deleted delete from SYS.apply$_error; commit; execute immediate('alter system flush shared_pool'); DBMS_APPLY_ADM.START_APPLY(apply_name => 'APPLY_TEST'); END; #Is there an apply history(10g)? select * from DBA_HIST_STREAMS_APPLY_SUM

 

DML Handler
grant all on sys.dbms_system to test;

SYS.LCR$_ROW_RECORD.COMMAND_TYPE = INSERT(NEW values only)
SYS.LCR$_ROW_RECORD.COMMAND_TYPE = UPDATE(NEW and OLD values)
SYS.LCR$_ROW_RECORD.COMMAND_TYPE = DELETE(OLD values only)

CREATE OR REPLACE PROCEDURE TEST.aldo_prova_handl(in_any IN ANYDATA) IS
  lcr          SYS.LCR$_ROW_RECORD;
  rc           PLS_INTEGER;
  command      VARCHAR2(30);
  old_values   SYS.LCR$_ROW_LIST;
  mID          SYS.AnyData;
  destOwner    varchar2(50) := 'TEST';
  destTable    varchar2(50) := 'DESTTABLE';
BEGIN
  rc := in_any.GETOBJECT(lcr);
  command:=lcr.GET_COMMAND_TYPE();
  sys.dbms_system.ksdwrt(2,command);
  --We always insert since this table keeps track of historical data 
  lcr.SET_COMMAND_TYPE('INSERT');
  lcr.set_object_owner(destOwner);
  lcr.SET_OBJECT_NAME(destTable);
  IF command IN ('UPDATE') THEN
    mID := lcr.get_value('old', 'ID');
    lcr.add_column('new', 'ID',  mID);
    lcr.add_column('new', 'NOME_OLD',  lcr.get_value('old', 'nome'));
    lcr.SET_VALUES('old', NULL);
  elsif command in ('DELETE') then
    --Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    lcr.SET_VALUES('new', old_values);
    lcr.SET_VALUES('old', NULL);
  END IF;
  lcr.ADD_COLUMN('new', 'USERNAME_LOG', lcr.GET_EXTRA_ATTRIBUTE('USERNAME') );
  lcr.ADD_COLUMN('new', 'ACTION', ANYDATA.ConvertVarChar2(command));
  lcr.EXECUTE(true);
  --Do not execute COMMIT or ROLLBACK statements. Doing so may endanger the
  --consistency of the transaction that contains the LCR.
exception
  when others then
  begin
    sys.dbms_system.ksdwrt(3, SQLERRM );
    sys.dbms_system.ksdwrt(3, DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );
    raise;
  end;
end;
/

grant all on TEST.aldo_prova_handl to strmadmin;

--Object name is the source table, even if the destination table is different because you use a 
--Remember that the relative error handler is removed if exists
CREATE OR REPLACE procedure STRMADMIN.prepareDmlHandler(mOwner varchar2, mTableName varchar2, ownProcName varchar2) is
  applyName varchar2(50) := 'APPLY_TEST';
begin
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => mOwner||'.'||mTableName,
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => ownProcName,
    apply_database_link => NULL,
    apply_name          => applyName);
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => mOwner||'.'||mTableName,
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => ownProcName,
    apply_database_link => NULL,
    apply_name          => applyName);
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => mOwner||'.'||mTableName,
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => ownProcName,
    apply_database_link => NULL,
    apply_name          => applyName);
END;
/

begin
  prepareDmlHandler('TEST', 'SOURCETABLE', 'TEST.aldo_prova_handl');
end;
#Remember that you may also have operation_name=>'LOB_UPDATE'

#What does this do?
SELECT * FROM DBA_APPLY_DML_HANDLERS

 

Error Handler
#Using an error handler discards the normal dml handler and changes are automatically applied

--grant select, references on DBA_APPLY_ERROR to test;
create or replace PROCEDURE TEST.aldo_prova_handl_err(in_any IN ANYDATA, error_record IN DBA_APPLY_ERROR%ROWTYPE,
  error_message_number IN NUMBER, messaging_default_processing IN OUT BOOLEAN, out_any OUT ANYDATA) is
  LCR sys.lcr$_row_record;
  RC pls_integer;
begin
  sys.dbms_system.ksdwrt(3, 'Processing error');
  messaging_default_processing:=TRUE;
  out_any:=in_any;
  sys.dbms_system.ksdwrt(3, 'Streams error due to ORA-' || error_record.ERROR_NUMBER);
  /*if error_record.ERROR_NUMBER=1403 then
    rc:=in_any.getobject(lcr); 
    lcr.set_values('NEW',lcr.get_values('OLD'));
    lcr.set_values('OLD',NULL);
    lcr.set_command_type('INSERT');
    out_any := ANYDATA.ConvertObject(lcr);
  else;*/
exception
  when others then
  begin
    sys.dbms_system.ksdwrt(3, 'Streams error due to ORA-' || error_record.ERROR_NUMBER);
    sys.dbms_system.ksdwrt(3, SQLERRM );
    sys.dbms_system.ksdwrt(3, DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );
    raise;
  end;
end;

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name => 'TEST.SOURCETABLE',
    object_type => 'TABLE',
    operation_name => 'INSERT',
    error_handler => true,
    user_procedure => 'test.aldo_prova_handl_err',
    --user_procedure => null,
    apply_database_link => NULL,
    apply_name => 'APPLY_TEST');
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name => 'TEST.SOURCETABLE',
    object_type => 'TABLE',
    operation_name => 'UPDATE',
    error_handler => true,
    user_procedure => 'test.aldo_prova_handl_err',
    --user_procedure => null,
    apply_database_link => NULL,
    apply_name => 'APPLY_TEST');
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name => 'TEST.SOURCETABLE',
    object_type => 'TABLE',
    operation_name => 'DELETE',
    error_handler => true,
    user_procedure => 'test.aldo_prova_handl_err',
    --user_procedure => null,
    apply_database_link => NULL,
    apply_name => 'APPLY_TEST');
END;
/

#Remember that you may also have operation_name=>'LOB_UPDATE'

 

DDL Handler
CREATE OR REPLACE PROCEDURE TEST.apply_ddl_handler(in_any IN ANYDATA) IS
  lcr SYS.LCR$_DDL_RECORD;
  rc PLS_INTEGER;
  ddl_text CLOB;
BEGIN
  rc := in_any.GETOBJECT(lcr);
  DBMS_LOB.CREATETEMPORARY(ddl_text, true);
  lcr.GET_DDL_TEXT(ddl_text);
  sys.dbms_system.ksdwrt(2, 'DDL text:' || ddl_text);
  DBMS_LOB.FREETEMPORARY(ddl_text);
  lcr.EXECUTE();
exception
  when others then
  begin
    sys.dbms_system.ksdwrt(2, SQLERRM );
    sys.dbms_system.ksdwrt(2,DBMS_UTILITY.FORMAT_ERROR_STACK  );
    sys.dbms_system.ksdwrt(2, DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );
    raise;
  end;
END;
/

#You need this specific grant
grant all on test.apply_ddl_handler to strmadmin;

BEGIN
DBMS_APPLY_ADM.ALTER_APPLY(apply_name  => 'apply_test', ddl_handler => 'test.apply_ddl_handler');
END;

Print LCR procedures

My custom function Anydata to Varchar2
    
CREATE OR REPLACE function anydataToVarchar2(data IN SYS.AnyData) return varchar2 IS
  tn  VARCHAR2(61);
  str VARCHAR2(4000);
  chr VARCHAR2(1000);
  num NUMBER;
  dat DATE;
  rw  RAW(4000);
  res NUMBER;
  result varchar2(4000);
BEGIN
  IF data IS NULL THEN
    result := 'null';
  END IF;
  tn := data.GETTYPENAME();
  IF tn = 'SYS.VARCHAR2' THEN
    res := data.GETVARCHAR2(str);
    result := (SUBSTR(str,0,253));
  ELSIF tn = 'SYS.CHAR' then
    res := data.GETCHAR(chr);
    result := (SUBSTR(chr,0,253));
  ELSIF tn = 'SYS.VARCHAR' THEN
    res := data.GETVARCHAR(chr);
    result := chr;
  ELSIF tn = 'SYS.NUMBER' THEN
    res := data.GETNUMBER(num);
    result := num;
  ELSIF tn = 'SYS.DATE' THEN
    res := data.GETDATE(dat);
    result := dat;
  ELSIF tn = 'SYS.RAW' THEN
    -- res := data.GETRAW(rw);
    -- DBMS_OUTPUT.PUT_LINE(SUBSTR(DBMS_LOB.SUBSTR(rw),0,253));
    result := 'BLOB Value';
  ELSIF tn = 'SYS.BLOB' THEN
    result := 'BLOB Found';
  ELSE
    result := 'typename is ' || tn;
  END IF;
  return result;
END;

 

Print LCR utilities
grant all on sys.dbms_lob to strmadmin;

-- procedure that prints the value in a SYS.AnyData Object
CREATE OR REPLACE PROCEDURE STRMADMIN.print_any(data IN SYS.AnyData) IS
  tn  VARCHAR2(61);
  str VARCHAR2(4000);
  chr VARCHAR2(1000);
  num NUMBER;
  dat DATE;
  rw  RAW(4000);
  res NUMBER;
BEGIN
  IF data IS NULL THEN
    DBMS_OUTPUT.PUT_LINE('NULL value');
    RETURN;
  END IF;
  tn := data.GETTYPENAME();
  IF tn = 'SYS.VARCHAR2' THEN
    res := data.GETVARCHAR2(str);
    DBMS_OUTPUT.PUT_LINE(SUBSTR(str,0,253));
  ELSIF tn = 'SYS.CHAR' then
    res := data.GETCHAR(chr);
    DBMS_OUTPUT.PUT_LINE(SUBSTR(chr,0,253));
  ELSIF tn = 'SYS.VARCHAR' THEN
    res := data.GETVARCHAR(chr);
    DBMS_OUTPUT.PUT_LINE(chr);
  ELSIF tn = 'SYS.NUMBER' THEN
    res := data.GETNUMBER(num);
    DBMS_OUTPUT.PUT_LINE(num);
  ELSIF tn = 'SYS.DATE' THEN
    res := data.GETDATE(dat);
    DBMS_OUTPUT.PUT_LINE(dat);
  ELSIF tn = 'SYS.RAW' THEN
    -- res := data.GETRAW(rw);
    -- DBMS_OUTPUT.PUT_LINE(SUBSTR(DBMS_LOB.SUBSTR(rw),0,253));
    DBMS_OUTPUT.PUT_LINE('BLOB Value');
  ELSIF tn = 'SYS.BLOB' THEN
    DBMS_OUTPUT.PUT_LINE('BLOB Found');
  ELSE
    DBMS_OUTPUT.PUT_LINE('typename is ' || tn);
  END IF;
END print_any;
/


-- procedure that prints a specified LCR
CREATE OR REPLACE PROCEDURE STRMADMIN.print_lcr(lcr IN SYS.ANYDATA) IS
  typenm    VARCHAR2(61);
  ddllcr    SYS.LCR$_DDL_RECORD;
  proclcr   SYS.LCR$_PROCEDURE_RECORD;
  rowlcr    SYS.LCR$_ROW_RECORD;
  res       NUMBER;
  newlist   SYS.LCR$_ROW_LIST;
  oldlist   SYS.LCR$_ROW_LIST;
  ddl_text  CLOB;
  ext_attr  SYS.AnyData;
BEGIN
  typenm := lcr.GETTYPENAME();
  DBMS_OUTPUT.PUT_LINE('type name: ' || typenm);
  IF (typenm = 'SYS.LCR$_DDL_RECORD') THEN
    res := lcr.GETOBJECT(ddllcr);
    DBMS_OUTPUT.PUT_LINE('source database: ' || 
                         ddllcr.GET_SOURCE_DATABASE_NAME);
    DBMS_OUTPUT.PUT_LINE('owner: ' || ddllcr.GET_OBJECT_OWNER);
    DBMS_OUTPUT.PUT_LINE('object name: ' || ddllcr.GET_OBJECT_NAME);
    DBMS_OUTPUT.PUT_LINE('object type: ' || ddllcr.GET_OBJECT_TYPE);
    DBMS_OUTPUT.PUT_LINE('schema: ' || ddllcr.GET_CURRENT_SCHEMA);
    DBMS_OUTPUT.PUT_LINE('transaction id: ' || ddllcr.GET_TRANSACTION_ID);
    DBMS_OUTPUT.PUT_LINE('SCN: ' || ddllcr.GET_SCN);
    DBMS_LOB.CREATETEMPORARY(ddl_text, true);
    ddllcr.GET_DDL_TEXT(ddl_text);
    DBMS_OUTPUT.PUT_LINE('ddl: ' || ddl_text);    
  ELSIF (typenm = 'SYS.LCR$_ROW_RECORD') THEN
    res := lcr.GETOBJECT(rowlcr);
    DBMS_OUTPUT.PUT_LINE('source database: ' || 
                         rowlcr.GET_SOURCE_DATABASE_NAME);
    DBMS_OUTPUT.PUT_LINE('owner: ' || rowlcr.GET_OBJECT_OWNER);
    DBMS_OUTPUT.PUT_LINE('object: ' || rowlcr.GET_OBJECT_NAME);
    --DBMS_OUTPUT.PUT_LINE('object type: ' || ddllcr.GET_OBJECT_TYPE);
    --DBMS_OUTPUT.PUT_LINE('schema: ' || ddllcr.GET_CURRENT_SCHEMA);
    --DBMS_OUTPUT.PUT_LINE('transaction id: ' || ddllcr.GET_TRANSACTION_ID);
    --DBMS_OUTPUT.PUT_LINE('SCN: ' || ddllcr.GET_SCN);
    DBMS_OUTPUT.PUT_LINE('command_type: ' || rowlcr.GET_COMMAND_TYPE); 
    oldlist := rowlcr.GET_VALUES('old');
    FOR i IN 1..oldlist.COUNT LOOP
      IF oldlist(i) IS NOT NULL THEN
        DBMS_OUTPUT.PUT_LINE('old(' || i || '): ' || oldlist(i).column_name);
        print_any(oldlist(i).data);
      END IF;
    END LOOP;
    newlist := rowlcr.GET_VALUES('new', 'n');
    FOR i in 1..newlist.count LOOP
      IF newlist(i) IS NOT NULL THEN
        DBMS_OUTPUT.PUT_LINE('new(' || i || '): ' || newlist(i).column_name);
        print_any(newlist(i).data);
      END IF;
    END LOOP;         
  ELSE
    DBMS_OUTPUT.PUT_LINE('Non-LCR Message with type ' || typenm);
  END IF;
END print_lcr;
/


-- procedure that prints all the LCRs in the Error queue
CREATE OR REPLACE PROCEDURE STRMADMIN.print_errors IS
  CURSOR c IS
    SELECT LOCAL_TRANSACTION_ID,
           SOURCE_DATABASE,
           MESSAGE_NUMBER,
           MESSAGE_COUNT,
           ERROR_NUMBER,
           ERROR_MESSAGE
      FROM DBA_APPLY_ERROR
      ORDER BY SOURCE_DATABASE, SOURCE_COMMIT_SCN;
  i      NUMBER;
  txnid  VARCHAR2(30);
  source VARCHAR2(128);
  msgno  NUMBER;
  msgcnt NUMBER;
  errnum NUMBER := 0;
  errno  NUMBER;
  errmsg VARCHAR2(255);
  lcr    SYS.AnyData;
  r      NUMBER;
BEGIN
  FOR r IN c LOOP
    errnum := errnum + 1;
    msgcnt := r.MESSAGE_COUNT;
    txnid  := r.LOCAL_TRANSACTION_ID;
    source := r.SOURCE_DATABASE;
    msgno  := r.MESSAGE_NUMBER;
    errno  := r.ERROR_NUMBER;
    errmsg := r.ERROR_MESSAGE;
    DBMS_OUTPUT.PUT_LINE('*************************************************');
    DBMS_OUTPUT.PUT_LINE('----- ERROR #' || errnum);
    DBMS_OUTPUT.PUT_LINE('----- Local Transaction ID: ' || txnid);
    DBMS_OUTPUT.PUT_LINE('----- Source Database: ' || source);
    DBMS_OUTPUT.PUT_LINE('----Error in Message: '|| msgno);
    DBMS_OUTPUT.PUT_LINE('----Error Number: '||errno);
    DBMS_OUTPUT.PUT_LINE('----Message Text: '||errmsg);
    FOR i IN 1..msgcnt LOOP
      DBMS_OUTPUT.PUT_LINE('--message: ' || i);
        lcr := DBMS_APPLY_ADM.GET_ERROR_MESSAGE(i, txnid);
        print_lcr(lcr);
    END LOOP;
  END LOOP;
END print_errors;
/

 

Remove/Uninstall Streams
#10g
begin
  DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION;
end;

 

Views, monitoring and troubleshooting
    select capture_name, queue_name, ERROR_NUMBER, ERROR_MESSAGE 
from dba_capture where status != 'ENABLED'  
  
          select * FROM DBA_CAPTURE;
          select * from dba_propagation;
               
          SELECT r.CONSUMER_NAME,
               r.SOURCE_DATABASE,
               r.SEQUENCE#,
               r.NAME,
               r.DICTIONARY_BEGIN,
               r.DICTIONARY_END
          FROM DBA_REGISTERED_ARCHIVED_LOG r, DBA_CAPTURE c
          WHERE r.CONSUMER_NAME = c.CAPTURE_NAME;
         
          #May safely remove these logs
          SELECT * FROM DBA_LOGMNR_PURGED_LOG;
         
          SELECT * FROM DBA_CAPTURE_PARAMETERS;
          SELECT * FROM DBA_CAPTURE_EXTRA_ATTRIBUTES
         
          SELECT * FROM DBA_QUEUE_TABLES order by owner, queue_table
          SELECT * FROM DBA_QUEUES order by owner
         
          SELECT * FROM DBA_APPLY_DML_HANDLERS

          select * from DBA_QUEUE_SCHEDULES
          select * from DBA_STREAMS_COLUMNS;
          select * from DBA_STREAMS_ADMINISTRATOR;
          select * from DBA_STREAMS_RULES;
          select * from DBA_STREAMS_TABLE_RULES;
          select * from SYS.DBA_STREAMS_UNSUPPORTED;

 

Untested
Remove stream Metalink note 276648.1
Multi version data dictionary refer to Metalink note 212044.1

 

Troubleshooting NO DATA FOUND in apply for update/delete
    
ORA-01403: no data found
ORA-01403: no data found
ORA-06512: at "SYS.LCR$_ROW_RECORD", line 419
ORA-06512: at "MTS_APPLY.TABLE_HANDLERS", line 32
ORA-06512: at line 1

Remember that primary key columns on destination are always evaluated.
You'll get your job very easy using this query:

select tc.owner, tc.table_name, tc.column_name,
a.COMPARE_OLD_ON_DELETE, a.COMPARE_OLD_ON_UPDATE, decode(k.column_name, null, 'N', 'Y') manual_key_column
from dba_tab_columns tc, DBA_APPLY_TABLE_COLUMNS a, dba_apply_key_columns k
where 
a.OBJECT_OWNER (+)= tc.owner and a.OBJECT_NAME (+)= tc.TABLE_NAME and a.COLUMN_NAME (+)= tc.COLUMN_NAME
and k.OBJECT_OWNER (+)= tc.owner and k.OBJECT_NAME (+)= tc.TABLE_NAME and k.COLUMN_NAME (+)= tc.COLUMN_NAME
and owner='MTS_OWNER'
and table_name='AGENTS'

#set pk manually, use null on column_list to reset
execute DBMS_APPLY_ADM.SET_KEY_COLUMNS(object_name => 'MTS_OWNER.AGENTS', column_list => 'ID,NAME');
select * from dba_apply_key_columns

#set old values to be compared, default is true for all columns
execute DBMS_APPLY_ADM.COMPARE_OLD_VALUES(object_name => 'MTS_OWNER.AGENTS', column_list => '*', operation => '*', compare => false);
select * from DBA_APPLY_TABLE_COLUMNS order by 1, 2, 3

 

On the job - Project duration estimation
    
Functional - Analysis
  Determine tables needed for replication
  Measure the global volume(rows and size) of source tables to be streamed
  Considerations on global volume for source tables for initial alignment(CTAS, RMAN, TTS, Datapump)
  Determine apply rules(overview only)
Administration - Environment Setup
  Verify Streams requiremements on databases and tablespace creation for STRMADMIN and functional user
  Test network connection between nodes and detect lag
  Creation of STRMADMIN and setup of empty Streams environment
  Dummy Streams test to certify the correct environment
  Creation of functional user with limited grants
Administration - Complete Streams setup for all tables in log mode(no physical apply on destination)
  Queue creations on source and destination
  Capture processes creations
  Propagation processes creations
  Functional apply procedures(DML and DDL) creation on functional user(log only at the moment)
  Apply processes creation on destination
  Test the Streams environment in log mode
Functional - Development
  Data population and alignment of destination database
  Development of apply procedures
    DML handlers
    DDL handlers
  Functional documentation
  Functional tests
Administration - Crash tests simulation
  Short crash simulation - failed Streams applies may still be dequed
    Reapply failed data
    Estimate downtime for source and/or destination based on the number of failed apply to recover
    Considerations and plan preparation for a production short crash(stop applications, courtesy web page, ...)
  Complete crash simulation
    Recreation of Streams administration schema
    Data population and alignment of destination database(truncation of existing data)
    Considerations and plan preparation for a production complete crash(stop applications, courtesy web page, ...)
Administration - Monitoring, Alarming and Operation
  Get a consistent view of actual procedures of Monitoring, Alarming, Operation and Production Support Team
  Creation of simplified monitoring views for Production Support Team for quick and detailed diagnostic
  Integrate with the current alarming process based on unapplied Stream data threshold
  Supply the Production Support Team a clear step-by-step operational documentation for monitoring, short crash and complete recovery