See Advanced Queue
Troubleshooting http://www.acs.ilstu.edu/docs/Oracle/server.101/b10727/strms_tr.htm#1006156
See note 290605.1 to configure strmmon
My Forum, see note 265933.1 to merge more tables from source in to one dest table(not possible)
, see http://forums.oracle.com/forums/thread.jspa?messageID=2828984�
for Streams on RAC and listener
See Implementing an history database for a real life example and requirement using Streams
Requirements
Officially global_names=true is required but referring to metalink note 344239.1 it should work even if false
Archive log required on source db
I recommend a FORCE LOGGING on db otherwise you lose the NOLOGGING sql DML
Premises for this document and references to untested issues As usual every line of code was personally tested and verified. Streams errors encountered are included in Oracle Managed Errors, simply search it. This is not a document were you just copy and paste to have a Streams environment working, instead for every statement altering the Streams structure I'll provide detailed methods for troubleshooting and analyzing to see what happens behind the scene I just remember the basic Streams structure: Source DB _________ Dest DB | Capture | _______ | | | | | | Queue ------> Propagate -> Queue | |_________| | | | | Apply | |_______| You may also have everything on a single database: DB _________ | Capture | | | | | Queue | | | | | Apply | |_________| Basically for each apply there is a queue. STRMADMIN user is a DBA user and should not contain logic procedures, you may drop it and recreate on demand. Anyway the examples below are based from a simple application retaining historical data changes on the Streams destination, this is a good real life example to start. Source table: CREATE TABLE TEST.SOURCETABLE ( NOME VARCHAR2(50 BYTE), ID NUMBER primary key ) Dest table: CREATE TABLE TEST.SOURCETABLE ( ID NUMBER, NOME VARCHAR2(50 BYTE), NOME_OLD VARCHAR2(50 BYTE), ACTION VARCHAR2(30 CHAR), DATE_LOG DATE DEFAULT sysdate NOT NULL, USERNAME_LOG VARCHAR2(100 CHAR) ) Using different owner/table_name between instances Using a different owner/table_name for the destination is not supported and you get several errors during environment set up, however you can do it since you'll use a custom dml handler to apply changes. Simply create a table with one column on destination with the same owner/table_name of source and then drop it, if you want to avoid errors on alert log on destination. ! Its a very complicated matter applying ddl if the destination owner/table_name is different since on the ddl handler ! you only have the row ddl text to use. #Untested See DBMS_STREAMS_ADM.RENAME_SCHEMA Hub and Spoke on the forum http://forums.oracle.com/forums/thread.jspa?threadID=683801&tstart=30 Different OS platform Is possible, check note UNTESTED 418755.1, 224255.1 Compare and determine schema differences UNTESTED 10g DBMS_RECTIFIER_DIFF.DIFFERENCES Note: 1062732.6 11g DBMS_COMPARISON.COMPARE and DBMS_COMPARISON.CONVERGE Note: 463295.1 My Forum - Accessing others LCR$_ROW_RECORD in same transaction from DML handler http://forums.oracle.com/forums/thread.jspa?messageID=2784542 |
User creation and grants
create user strmadmin identified by strmadmin
default tablespace stream
quota unlimited on stream;
grant connect,resource,dba to strmadmin;
BEGIN
DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => 'strmadmin',
grant_privileges => true);
END;
/
|
Db link setup (so basic... I could omit this...) 10g, strmadmin>--create the database link from source to dest CREATE DATABASE LINK AL12 CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING 'al12'; #test select * from dual@AL12 11g, strmadmin>--create the database link from dest to source CREATE DATABASE LINK DB10 CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING 'DB10'; #test select * from dual@DB10 |
Source data - Setup supplemental logging and grant
#From 10.2 primary/unique key are logged by default, if no key then all columns are logged (see Asynchronous CDC Cookbook)
#always set suppl. logg. at very source db
#not recommended
alter database add supplemental log data (all) columns;
#recommended
ALTER TABLE test.sourceTable ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
if dont use the clause (ALL) you may get this depending on your application, due to missing columns data :
ORA-01400: cannot insert NULL into ("MTS_HDATA"."AGENTS"."PARTY_BIC")
ORA-06512: at "SYS.LCR$_ROW_RECORD", line 419
ORA-06512: at "MTS_APPLY.TABLE_HANDLERS", line 52
ORA-06512: at "MTS_APPLY.TABLE_HANDLERS", line 55
ORA-06512: at line 1
UNTESTED
You may also use DBMS_CAPTURE_ADM.prepare_table_instantiation(supplemental_logging=>'all')
|
Check supplemental logging SELECT always, table_name, log_group_type FROM dba_log_groups order by table_name; |
Queue
#Sharing the queue
#A single queue may be used for many tables
--Create the queue with STRMADMIN user
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.q_sourcetable',
queue_name => 'strmadmin.q_sourcetable',
queue_user => 'STRMADMIN');
END;
#Untested, manual dequeue http://forums.oracle.com/forums/thread.jspa?threadID=611832&tstart=240
|
Queue troubleshooting
SELECT * FROM DBA_QUEUES where owner = 'STRMADMIN'
How to remove it?
begin
DBMS_STREAMS_ADM.REMOVE_QUEUE(queue_name => 'STRMADMIN.STREAMS_QUEUE',
cascade => true, drop_unused_queue_table => true);
end;
Any data sitting in the queue? UNTESTED
select * from streams_QUEUE_TABLE;
SELECT * FROM AQ$streams_queue_table;
|
Need to moodify rules? #A single rule is more performing http://forums.oracle.com/forums/thread.jspa?messageID=1774910� #See below for how to select the rule name. #The rule may relate to any Streams process #I recommend to use a single rule for capture, propagation and apply even for many tables, #to accomplish this I normally create a real dummy rule for each process and then I alter the rule. #Never rely then on DBA_STREAMS_RULES --Valid example condition: --'(:dml.get_object_owner() = ''MY_OWNER'' and (:dml.get_object_name() in (''TABLE1'', ''TABLE2'')))' begin DBMS_RULE_ADM.ALTER_RULE(rule_name => 'MY_RULE', condition => '(((:dml.get_object_owner() = ''ALDO'' and :dml.get_object_name() = ''PROVA'')) and :dml.is_null_tag() = ''Y'' and :dml.get_source_database_name() = ''AL12'')' --evaluation_context IN VARCHAR2 DEFAULT NULL,--no need to change --remove_evaluation_context IN BOOLEAN DEFAULT false, --action_context IN SYS.RE$NV_LIST DEFAULT NULL, --remove_action_context IN BOOLEAN DEFAULT false, --rule_comment IN VARCHAR2 DEFAULT NULL, --remove_rule_comment IN BOOLEAN DEFAULT false ); end; Need extra columns? (Since 10.2) select * from DBA_STREAMS_ADD_COLUMN #Usefull to create columns at Capture time Detect the rule name as above. Column will be added when rule evaluates true. BEGIN DBMS_STREAMS_ADM.ADD_COLUMN( rule_name => 'MY_RULE', table_name => 'ALDO.PROVA', column_name => 'birth_date', column_value => ANYDATA.ConvertDate(NULL), value_type => 'NEW', step_number => 0, operation => 'ADD'); END; #To remove this rule use: BEGIN DBMS_STREAMS_ADM.ADD_COLUMN ( rule_name => 'MY_RULE', table_name => 'ALDO.PROVA', column_name => 'birth_date', column_value => ANYDATA.ConvertDate(NULL), value_type => 'NEW', step_number => 0, operation => 'REMOVE'); END; Need a customized function? See http://docs.huihoo.com/oracle/docs/B19306_01/server.102/b14229/strms_mtransform.htm#i1006552, http://forums.oracle.com/forums/thread.jspa?messageID=1421560 Caution deterministic function needed... Function is called outside originated transaction context Sometime people ignore that a deterministic function cache results and my be called by system from outside any context See for example this thread http://forums.oracle.com/forums/thread.jspa?messageID=45218 Remove a generic rule from DBA_STREAMS_RULES begin DBMS_STREAMS_ADM.REMOVE_RULE(rule_name=>'BANK_CHARGES_VOL_BREAKS290', streams_type=>'APPLY', streams_name=>'APPLY_MTS', drop_unused_rule=>TRUE, inclusion_rule=>TRUE); end; |
Using the DBMS_STREAMS.ADD_TABLE_RULES procedure
--Creates capture, propagations and apply.
--Here is how to use to have a single rule
declare
mdml_rule_name VARCHAR2(200);
mddl_rule_name VARCHAR2(200);
m_condition varchar2(32000) := '(:dml.get_object_owner() = ''ALDO'' and (:dml.get_object_name() in (''PERSONE'', ''INDIRIZZI'')))';
begin
--creates a dummy rule
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'ALDO.PALLO',
--APPLY or CAPTURE
streams_type => 'apply',
streams_name => 'APPLY_MTS' ,
queue_name => 'STRMADMIN.Q_MTS',
include_dml => true,
include_ddl => false,
inclusion_rule => true,
dml_rule_name => mdml_rule_name,
ddl_rule_name => mddl_rule_name,
source_database => '&2');
--alter capture
DBMS_RULE_ADM.ALTER_RULE(rule_name => mdml_rule_name, condition => m_condition);
end;
/
|
Capture #Untested - adding rules to capture process http://forums.oracle.com/forums/thread.jspa?threadID=656049&tstart=75 #Sharing the capture process #You may use a single capture process for more than one table #Use the name of an existing capture to add a new table begin --Must run at least once on SOURCE db DBMS_CAPTURE_ADM.BUILD; end; / --ADD the capture rules using the DBMS_STREAMS.ADD_TABLE_RULES procedure, see above --The user changing source data will be available to the Streams declare capture_name varchar2(200) := 'capture_test'; BEGIN DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'username', include => true); DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'row_id', include => true); DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'serial#', include => true); DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'session#', include => true); DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'thread#', include => true); DBMS_CAPTURE_ADM.INCLUDE_EXTRA_ATTRIBUTE(capture_name => capture_name, attribute_name => 'tx_name', include => true); END; begin dbms_capture_adm.start_capture(capture_name => 'capture_test'); end; Remove capture rules select 'execute DBMS_STREAMS_ADM.REMOVE_RULE(rule_name=>''' || rule_name || ''', streams_type=>''' || streams_type || ''', streams_name=>''' || streams_name || ''', drop_unused_rule=>TRUE, inclusion_rule=>TRUE);' from dba_streams_rules where streams_type = 'CAPTURE' |
Capture troubleshooting
--View capture status
SELECT c.capture_name, SUBSTR (s.program, INSTR (s.program, '(') + 1, 4) process_name, c.SID,
c.serial#, c.state, c.total_messages_captured,
c.total_messages_enqueued, c.enqueue_time last_enqueue, sysdate
FROM v$streams_capture c, v$session s
WHERE c.SID = s.SID AND c.serial# = s.serial#;
Capture rules
--Do not rely on DBA_STREAMS_TABLE_RULES, if you uncleanely dropped
--STRMADMIN user or something, old unused rules will be there
select DBA_RULES.* from dba_capture, DBA_RULE_SET_RULES, DBA_RULES
where DBA_RULE_SET_RULES.rule_set_name (+)= dba_capture.rule_set_name
and DBA_RULES.rule_name (+)= DBA_RULE_SET_RULES.rule_name
#Create this function to have a clear idea of the SCN order
CREATE OR REPLACE function sortScn(startScn number, capturedScn number,
appliedScn number, firstScn number, sourceResetlogsScn number,
maxCheckpointScn number, requiredCheckpointScn number, lastEnqueuedScn number)
return varchar2 is
type tRec is record(name varchar2(50), value number);
r tRec;
TYPE nestTable IS TABLE OF tRec;
n nestTable;
tmpvalue tRec;
i number;
result varchar2(1000) := '';
begin
n := nestTable();
--Add start SCN
r.name := 'Start SCN '; r.value := startScn;
n.extend(); n(n.count()) := r;
--Add captured SCN
r.name := 'Captured SCN '; r.value := capturedScn;
n.extend(); n(n.count()) := r;
--Add applied SCN
r.name := 'Applied SCN '; r.value := appliedScn;
n.extend(); n(n.count()) := r;
--Add first SCN
r.name := 'First SCN '; r.value := firstScn;
n.extend(); n(n.count()) := r;
--Add source resetlogs SCN
r.name := 'Source resetlogs SCN '; r.value := sourceResetlogsScn;
n.extend(); n(n.count()) := r;
--Add start SCN
r.name := 'Max Checkpoint SCN '; r.value := maxCheckpointScn;
n.extend(); n(n.count()) := r;
--Add required checkpoint SCN
r.name := 'Required checkpoint SCN '; r.value := requiredCheckpointScn;
n.extend(); n(n.count()) := r;
--Add last enqueued SCN SCN
r.name := 'Last enqueued SCN '; r.value := lastEnqueuedScn;
n.extend(); n(n.count()) := r;
--Add current SCN
r.name := 'Current SCN '; r.value := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
n.extend(); n(n.count()) := r;
begin
--Sort
i := n.first;
while(i<n.last-1) loop
--DBMS_OUTPUT.PUT_LINE(i);
--for i in n.first..n.last-1 loop
if(n(i).value>n(i+1).value) then
--DBMS_OUTPUT.PUT_LINE('found');
tmpvalue := n(i);
n(i) := n(i+1);
n(i+1) := tmpvalue;
i := n.first;
else i := i+1;
end if;
end loop;
end;
for i in n.first..n.last loop
result := result || (n(i).name || ' ' || n(i).value);
if(i<n.last) then
result := result || chr(13) || chr(10);
end if;
end loop;
return result;
end;
#Query DBA_STREAMS_RULES to lookup RULE_SET_NAME
select c.*, sortScn(start_scn, captured_Scn, applied_Scn, first_Scn,
source_Resetlogs_Scn, max_Checkpoint_Scn, required_Checkpoint_Scn,
last_Enqueued_Scn) scn_order from dba_capture c
#Normally the result of the scn_order column is:
Source resetlogs SCN 33636153271
Start SCN (or First SCN) 33636310564
First SCN (or Start SCN) 33636310564
Required checkpoint SCN 33636697161
Captured SCN 33636770454
Applied SCN 33636770454
Max Checkpoint SCN 33636770454
Last enqueued SCN 33636782303
Current SCN 33636783363
#Is there a capture history?
select * from DBA_HIST_STREAMS_CAPTURE
#Tracing capture - UNTESTED!
http://forums.oracle.com/forums/thread.jspa?threadID=687265&tstart=30 |
Propagation
#Sharing the propagation
#A single propagation process may be used for many tables
#First create a queue on destination(see above)
declare
mdml_rule_name VARCHAR2(200);
mddl_rule_name VARCHAR2(200);
m_condition varchar2(32000) := '(:dml.get_object_owner() = ''ALDO'' and (:dml.get_object_name() in (''PERSONE'', ''INDIRIZZI'')))';
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(table_name => 'MTS_OWNER.BANK_CHARGES',
streams_name => 'PROPAGATION_MTS',
source_queue_name => 'strmadmin.Q_MTS',
destination_queue_name => 'STRMADMIN.Q_MTS@SMTAA.ST',
include_dml => true,
include_ddl => false,
dml_rule_name => mdml_rule_name,
ddl_rule_name => mddl_rule_name,
source_database => 'SMTGA.ST');
DBMS_RULE_ADM.ALTER_RULE(rule_name => mdml_rule_name, condition => m_condition);
END;
/
|
Propagation
select * from dba_propagation where propagation_name = 'PROPAGATION_TEST'
Propagation rules
select DBA_RULES.* from dba_propagation, DBA_RULE_SET_RULES, DBA_RULES
where DBA_RULE_SET_RULES.rule_set_name (+)= dba_propagation.rule_set_name
and DBA_RULES.rule_name (+)= DBA_RULE_SET_RULES.rule_name
BEGIN
DBMS_PROPAGATION_ADM.STOP_PROPAGATION(
propagation_name => 'prop1',
--clean statistics
force => true);
END;
/
BEGIN
DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
propagation_name => 'PROPAGATION_MTS',
drop_unused_rule_sets => TRUE);
END;
/
BEGIN
DBMS_PROPAGATION_ADM.START_PROPAGATION(
propagation_name => 'prop1');
END;
|
Instantiation number
Concept:
The instantiation informations are recorded in the destination database
#Now we are going to start the apply process but the scn must be aligned with source
#the dblink is the destination of the apply process
#you only use the source table name here even if the destination table is different
create or replace procedure prepareInstantiation(source_table_name varchar2) is
source_scn NUMBER;
--This is the global_name of the source
source_db_name varchar2(100) := 'DMTGA.DE';
BEGIN
select global_name into source_db_name from global_name;
source_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
--use the dblink if you are propagating to a remote queue
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@DMTAA(
source_object_name => source_table_name,
source_database_name => source_db_name,
instantiation_scn => source_scn);
dbms_output.put_line(source_scn);
--on source database
dbms_capture_adm.prepare_table_instantiation(source_table_name, supplemental_logging=>'all');
END;
begin
prepareInstantiation('TEST.SOURCETABLE');
end;
#What does this do?
select first_scn from dba_capture where capture_name = 'CAPTURE_TEST'
#Query DBA_STREAMS_RULES to lookup RULE_SET_NAME
select decode(sign(difference_scn), -1, '!!CHANGES NOT BEING APPLIED, LOGS LOOSING!!', 'Normal') status,
b.* from (
select curr_scn - instantiation_scn difference_scn, a.* from (
select DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() curr_scn, i.*
from DBA_APPLY_INSTANTIATED_OBJECTS@al12 i
) a
) b
|
Apply
#Sharing the apply
#For every apply process the apply order will be mantained,
#you'll likely use a single apply process for a schema
--here we create the apply process on destination
--ADD the capture rules using the DBMS_STREAMS.ADD_TABLE_RULES procedure, see above
begin
--make your reflections on this value, see DBA_APPLY_PARAMETERS for existing values
DBMS_APPLY_ADM.SET_PARAMETER(apply_name => 'APPLY_TEST',parameter => 'disable_on_error', value => 'y');
end;
BEGIN
DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_test');
END;
|
Apply troubleshooting
Apply Parameters
select * from DBA_APPLY_PARAMETERS
Apply rules
select DBA_RULES.* from dba_apply, DBA_RULE_SET_RULES, DBA_RULES
where DBA_RULE_SET_RULES.rule_set_name (+)= dba_apply.rule_set_name
and DBA_RULES.rule_name (+)= DBA_RULE_SET_RULES.rule_name
Apply status
SELECT dba_apply.*, sysdate FROM DBA_apply where apply_name = 'APPLY_TEST'
select * from dba_rules where upper(rule_condition) like upper('%TESTONA%')
How to delete it?
execute DBMS_APPLY_ADM.STOP_APPLY(apply_name => 'apply_test');
execute DBMS_APPLY_ADM.DROP_APPLY(apply_name => 'apply_test');
BEGIN
DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_test');
END;
Errors in apply? Is the apply process running? Remember to re-execute pending errors before restarting the apply
--First check sequence
select * from dba_apply
select * from dba_apply_error
execute DBMS_APPLY_ADM.EXECUTE_All_ERRORS;
select * from dba_apply_error
execute DBMS_APPLY_ADM.START_APPLY(apply_name => 'APPLY_MTS');
select * from DBA_APPLY_PROGRESS
#Check the handler exists
SELECT * FROM DBA_APPLY_DML_HANDLERS WHERE APPLY_DATABASE_LINK IS NULL ORDER BY OBJECT_OWNER, OBJECT_NAME;
#What is on alert log if the apply stop due to an exception?
Wed Aug 27 01:01:45 2008
Streams Apply Reader AS05 for APPLY_TEST2 with pid=38 OS id=12691 stopped
#You also find a trace in bdump in the form al12_as02_10503.trc <db>_as##_####.trc
#You may resubmit the failed apply using:
begin
DBMS_APPLY_ADM.EXECUTE_ERROR(local_transaction_id => '24.21.1447');
end;
#or
execute DBMS_APPLY_ADM.EXECUTE_All_ERRORS;
#The apply process stopped for a full tbs, now the tbs is ok how many messages are now to be dequeued?
SELECT s.SUBSCRIBER_NAME, q.QUEUE_SCHEMA, q.QUEUE_NAME, s.LAST_DEQUEUED_SEQ,
s.NUM_MSGS, s.TOTAL_SPILLED_MSG
FROM V$BUFFERED_QUEUES q, V$BUFFERED_SUBSCRIBERS s, DBA_APPLY a
WHERE q.QUEUE_ID = s.QUEUE_ID AND s.SUBSCRIBER_ADDRESS IS NULL AND s.SUBSCRIBER_NAME = a.APPLY_NAME;
|
DML Handler
grant all on sys.dbms_system to test;
SYS.LCR$_ROW_RECORD.COMMAND_TYPE = INSERT(NEW values only)
SYS.LCR$_ROW_RECORD.COMMAND_TYPE = UPDATE(NEW and OLD values)
SYS.LCR$_ROW_RECORD.COMMAND_TYPE = DELETE(OLD values only)
CREATE OR REPLACE PROCEDURE TEST.aldo_prova_handl(in_any IN ANYDATA) IS
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
command VARCHAR2(30);
old_values SYS.LCR$_ROW_LIST;
mID SYS.AnyData;
destOwner varchar2(50) := 'TEST';
destTable varchar2(50) := 'DESTTABLE';
BEGIN
rc := in_any.GETOBJECT(lcr);
command:=lcr.GET_COMMAND_TYPE();
sys.dbms_system.ksdwrt(2,command);
--We always insert since this table keeps track of historical data
lcr.SET_COMMAND_TYPE('INSERT');
lcr.set_object_owner(destOwner);
lcr.SET_OBJECT_NAME(destTable);
IF command IN ('UPDATE') THEN
mID := lcr.get_value('old', 'ID');
lcr.add_column('new', 'ID', mID);
lcr.add_column('new', 'NOME_OLD', lcr.get_value('old', 'nome'));
lcr.SET_VALUES('old', NULL);
elsif command in ('DELETE') then
--Get the old values in the row LCR
old_values := lcr.GET_VALUES('old');
lcr.SET_VALUES('new', old_values);
lcr.SET_VALUES('old', NULL);
END IF;
lcr.ADD_COLUMN('new', 'USERNAME_LOG', lcr.GET_EXTRA_ATTRIBUTE('USERNAME') );
lcr.ADD_COLUMN('new', 'ACTION', ANYDATA.ConvertVarChar2(command));
lcr.EXECUTE(true);
--Do not execute COMMIT or ROLLBACK statements. Doing so may endanger the
--consistency of the transaction that contains the LCR.
exception
when others then
begin
sys.dbms_system.ksdwrt(3, SQLERRM );
sys.dbms_system.ksdwrt(3, DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );
raise;
end;
end;
/
grant all on TEST.aldo_prova_handl to strmadmin;
--Object name is the source table, even if the destination table is different because you use a
--Remember that the relative error handler is removed if exists
CREATE OR REPLACE procedure STRMADMIN.prepareDmlHandler(mOwner varchar2, mTableName varchar2, ownProcName varchar2) is
applyName varchar2(50) := 'APPLY_TEST';
begin
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => mOwner||'.'||mTableName,
object_type => 'TABLE',
operation_name => 'INSERT',
error_handler => false,
user_procedure => ownProcName,
apply_database_link => NULL,
apply_name => applyName);
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => mOwner||'.'||mTableName,
object_type => 'TABLE',
operation_name => 'UPDATE',
error_handler => false,
user_procedure => ownProcName,
apply_database_link => NULL,
apply_name => applyName);
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => mOwner||'.'||mTableName,
object_type => 'TABLE',
operation_name => 'DELETE',
error_handler => false,
user_procedure => ownProcName,
apply_database_link => NULL,
apply_name => applyName);
END;
/
begin
prepareDmlHandler('TEST', 'SOURCETABLE', 'TEST.aldo_prova_handl');
end;
#Remember that you may also have operation_name=>'LOB_UPDATE'
#What does this do?
SELECT * FROM DBA_APPLY_DML_HANDLERS |
Error Handler
#Using an error handler discards the normal dml handler and changes are automatically applied
--grant select, references on DBA_APPLY_ERROR to test;
create or replace PROCEDURE TEST.aldo_prova_handl_err(in_any IN ANYDATA, error_record IN DBA_APPLY_ERROR%ROWTYPE,
error_message_number IN NUMBER, messaging_default_processing IN OUT BOOLEAN, out_any OUT ANYDATA) is
LCR sys.lcr$_row_record;
RC pls_integer;
begin
sys.dbms_system.ksdwrt(3, 'Processing error');
messaging_default_processing:=TRUE;
out_any:=in_any;
sys.dbms_system.ksdwrt(3, 'Streams error due to ORA-' || error_record.ERROR_NUMBER);
/*if error_record.ERROR_NUMBER=1403 then
rc:=in_any.getobject(lcr);
lcr.set_values('NEW',lcr.get_values('OLD'));
lcr.set_values('OLD',NULL);
lcr.set_command_type('INSERT');
out_any := ANYDATA.ConvertObject(lcr);
else;*/
exception
when others then
begin
sys.dbms_system.ksdwrt(3, 'Streams error due to ORA-' || error_record.ERROR_NUMBER);
sys.dbms_system.ksdwrt(3, SQLERRM );
sys.dbms_system.ksdwrt(3, DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );
raise;
end;
end;
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'TEST.SOURCETABLE',
object_type => 'TABLE',
operation_name => 'INSERT',
error_handler => true,
user_procedure => 'test.aldo_prova_handl_err',
--user_procedure => null,
apply_database_link => NULL,
apply_name => 'APPLY_TEST');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'TEST.SOURCETABLE',
object_type => 'TABLE',
operation_name => 'UPDATE',
error_handler => true,
user_procedure => 'test.aldo_prova_handl_err',
--user_procedure => null,
apply_database_link => NULL,
apply_name => 'APPLY_TEST');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_DML_HANDLER(
object_name => 'TEST.SOURCETABLE',
object_type => 'TABLE',
operation_name => 'DELETE',
error_handler => true,
user_procedure => 'test.aldo_prova_handl_err',
--user_procedure => null,
apply_database_link => NULL,
apply_name => 'APPLY_TEST');
END;
/
#Remember that you may also have operation_name=>'LOB_UPDATE'
|
DDL Handler
CREATE OR REPLACE PROCEDURE TEST.apply_ddl_handler(in_any IN ANYDATA) IS
lcr SYS.LCR$_DDL_RECORD;
rc PLS_INTEGER;
ddl_text CLOB;
BEGIN
rc := in_any.GETOBJECT(lcr);
DBMS_LOB.CREATETEMPORARY(ddl_text, true);
lcr.GET_DDL_TEXT(ddl_text);
sys.dbms_system.ksdwrt(2, 'DDL text:' || ddl_text);
DBMS_LOB.FREETEMPORARY(ddl_text);
lcr.EXECUTE();
exception
when others then
begin
sys.dbms_system.ksdwrt(2, SQLERRM );
sys.dbms_system.ksdwrt(2,DBMS_UTILITY.FORMAT_ERROR_STACK );
sys.dbms_system.ksdwrt(2, DBMS_UTILITY.FORMAT_ERROR_BACKTRACE );
raise;
end;
END;
/
#You need this specific grant
grant all on test.apply_ddl_handler to strmadmin;
BEGIN
DBMS_APPLY_ADM.ALTER_APPLY(apply_name => 'apply_test', ddl_handler => 'test.apply_ddl_handler');
END;
|
My custom function Anydata to Varchar2
CREATE OR REPLACE function anydataToVarchar2(data IN SYS.AnyData) return varchar2 IS
tn VARCHAR2(61);
str VARCHAR2(4000);
chr VARCHAR2(1000);
num NUMBER;
dat DATE;
rw RAW(4000);
res NUMBER;
result varchar2(4000);
BEGIN
IF data IS NULL THEN
result := 'null';
END IF;
tn := data.GETTYPENAME();
IF tn = 'SYS.VARCHAR2' THEN
res := data.GETVARCHAR2(str);
result := (SUBSTR(str,0,253));
ELSIF tn = 'SYS.CHAR' then
res := data.GETCHAR(chr);
result := (SUBSTR(chr,0,253));
ELSIF tn = 'SYS.VARCHAR' THEN
res := data.GETVARCHAR(chr);
result := chr;
ELSIF tn = 'SYS.NUMBER' THEN
res := data.GETNUMBER(num);
result := num;
ELSIF tn = 'SYS.DATE' THEN
res := data.GETDATE(dat);
result := dat;
ELSIF tn = 'SYS.RAW' THEN
-- res := data.GETRAW(rw);
-- DBMS_OUTPUT.PUT_LINE(SUBSTR(DBMS_LOB.SUBSTR(rw),0,253));
result := 'BLOB Value';
ELSIF tn = 'SYS.BLOB' THEN
result := 'BLOB Found';
ELSE
result := 'typename is ' || tn;
END IF;
return result;
END; |
Print LCR utilities
grant all on sys.dbms_lob to strmadmin;
-- procedure that prints the value in a SYS.AnyData Object
CREATE OR REPLACE PROCEDURE STRMADMIN.print_any(data IN SYS.AnyData) IS
tn VARCHAR2(61);
str VARCHAR2(4000);
chr VARCHAR2(1000);
num NUMBER;
dat DATE;
rw RAW(4000);
res NUMBER;
BEGIN
IF data IS NULL THEN
DBMS_OUTPUT.PUT_LINE('NULL value');
RETURN;
END IF;
tn := data.GETTYPENAME();
IF tn = 'SYS.VARCHAR2' THEN
res := data.GETVARCHAR2(str);
DBMS_OUTPUT.PUT_LINE(SUBSTR(str,0,253));
ELSIF tn = 'SYS.CHAR' then
res := data.GETCHAR(chr);
DBMS_OUTPUT.PUT_LINE(SUBSTR(chr,0,253));
ELSIF tn = 'SYS.VARCHAR' THEN
res := data.GETVARCHAR(chr);
DBMS_OUTPUT.PUT_LINE(chr);
ELSIF tn = 'SYS.NUMBER' THEN
res := data.GETNUMBER(num);
DBMS_OUTPUT.PUT_LINE(num);
ELSIF tn = 'SYS.DATE' THEN
res := data.GETDATE(dat);
DBMS_OUTPUT.PUT_LINE(dat);
ELSIF tn = 'SYS.RAW' THEN
-- res := data.GETRAW(rw);
-- DBMS_OUTPUT.PUT_LINE(SUBSTR(DBMS_LOB.SUBSTR(rw),0,253));
DBMS_OUTPUT.PUT_LINE('BLOB Value');
ELSIF tn = 'SYS.BLOB' THEN
DBMS_OUTPUT.PUT_LINE('BLOB Found');
ELSE
DBMS_OUTPUT.PUT_LINE('typename is ' || tn);
END IF;
END print_any;
/
-- procedure that prints a specified LCR
CREATE OR REPLACE PROCEDURE STRMADMIN.print_lcr(lcr IN SYS.ANYDATA) IS
typenm VARCHAR2(61);
ddllcr SYS.LCR$_DDL_RECORD;
proclcr SYS.LCR$_PROCEDURE_RECORD;
rowlcr SYS.LCR$_ROW_RECORD;
res NUMBER;
newlist SYS.LCR$_ROW_LIST;
oldlist SYS.LCR$_ROW_LIST;
ddl_text CLOB;
ext_attr SYS.AnyData;
BEGIN
typenm := lcr.GETTYPENAME();
DBMS_OUTPUT.PUT_LINE('type name: ' || typenm);
IF (typenm = 'SYS.LCR$_DDL_RECORD') THEN
res := lcr.GETOBJECT(ddllcr);
DBMS_OUTPUT.PUT_LINE('source database: ' ||
ddllcr.GET_SOURCE_DATABASE_NAME);
DBMS_OUTPUT.PUT_LINE('owner: ' || ddllcr.GET_OBJECT_OWNER);
DBMS_OUTPUT.PUT_LINE('object name: ' || ddllcr.GET_OBJECT_NAME);
DBMS_OUTPUT.PUT_LINE('object type: ' || ddllcr.GET_OBJECT_TYPE);
DBMS_OUTPUT.PUT_LINE('schema: ' || ddllcr.GET_CURRENT_SCHEMA);
DBMS_OUTPUT.PUT_LINE('transaction id: ' || ddllcr.GET_TRANSACTION_ID);
DBMS_OUTPUT.PUT_LINE('SCN: ' || ddllcr.GET_SCN);
DBMS_LOB.CREATETEMPORARY(ddl_text, true);
ddllcr.GET_DDL_TEXT(ddl_text);
DBMS_OUTPUT.PUT_LINE('ddl: ' || ddl_text);
ELSIF (typenm = 'SYS.LCR$_ROW_RECORD') THEN
res := lcr.GETOBJECT(rowlcr);
DBMS_OUTPUT.PUT_LINE('source database: ' ||
rowlcr.GET_SOURCE_DATABASE_NAME);
DBMS_OUTPUT.PUT_LINE('owner: ' || rowlcr.GET_OBJECT_OWNER);
DBMS_OUTPUT.PUT_LINE('object: ' || rowlcr.GET_OBJECT_NAME);
--DBMS_OUTPUT.PUT_LINE('object type: ' || ddllcr.GET_OBJECT_TYPE);
--DBMS_OUTPUT.PUT_LINE('schema: ' || ddllcr.GET_CURRENT_SCHEMA);
--DBMS_OUTPUT.PUT_LINE('transaction id: ' || ddllcr.GET_TRANSACTION_ID);
--DBMS_OUTPUT.PUT_LINE('SCN: ' || ddllcr.GET_SCN);
DBMS_OUTPUT.PUT_LINE('command_type: ' || rowlcr.GET_COMMAND_TYPE);
oldlist := rowlcr.GET_VALUES('old');
FOR i IN 1..oldlist.COUNT LOOP
IF oldlist(i) IS NOT NULL THEN
DBMS_OUTPUT.PUT_LINE('old(' || i || '): ' || oldlist(i).column_name);
print_any(oldlist(i).data);
END IF;
END LOOP;
newlist := rowlcr.GET_VALUES('new', 'n');
FOR i in 1..newlist.count LOOP
IF newlist(i) IS NOT NULL THEN
DBMS_OUTPUT.PUT_LINE('new(' || i || '): ' || newlist(i).column_name);
print_any(newlist(i).data);
END IF;
END LOOP;
ELSE
DBMS_OUTPUT.PUT_LINE('Non-LCR Message with type ' || typenm);
END IF;
END print_lcr;
/
-- procedure that prints all the LCRs in the Error queue
CREATE OR REPLACE PROCEDURE STRMADMIN.print_errors IS
CURSOR c IS
SELECT LOCAL_TRANSACTION_ID,
SOURCE_DATABASE,
MESSAGE_NUMBER,
MESSAGE_COUNT,
ERROR_NUMBER,
ERROR_MESSAGE
FROM DBA_APPLY_ERROR
ORDER BY SOURCE_DATABASE, SOURCE_COMMIT_SCN;
i NUMBER;
txnid VARCHAR2(30);
source VARCHAR2(128);
msgno NUMBER;
msgcnt NUMBER;
errnum NUMBER := 0;
errno NUMBER;
errmsg VARCHAR2(255);
lcr SYS.AnyData;
r NUMBER;
BEGIN
FOR r IN c LOOP
errnum := errnum + 1;
msgcnt := r.MESSAGE_COUNT;
txnid := r.LOCAL_TRANSACTION_ID;
source := r.SOURCE_DATABASE;
msgno := r.MESSAGE_NUMBER;
errno := r.ERROR_NUMBER;
errmsg := r.ERROR_MESSAGE;
DBMS_OUTPUT.PUT_LINE('*************************************************');
DBMS_OUTPUT.PUT_LINE('----- ERROR #' || errnum);
DBMS_OUTPUT.PUT_LINE('----- Local Transaction ID: ' || txnid);
DBMS_OUTPUT.PUT_LINE('----- Source Database: ' || source);
DBMS_OUTPUT.PUT_LINE('----Error in Message: '|| msgno);
DBMS_OUTPUT.PUT_LINE('----Error Number: '||errno);
DBMS_OUTPUT.PUT_LINE('----Message Text: '||errmsg);
FOR i IN 1..msgcnt LOOP
DBMS_OUTPUT.PUT_LINE('--message: ' || i);
lcr := DBMS_APPLY_ADM.GET_ERROR_MESSAGE(i, txnid);
print_lcr(lcr);
END LOOP;
END LOOP;
END print_errors;
/ |
Remove/Uninstall Streams #10g begin DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION; end; |
Views, monitoring and troubleshooting
select capture_name, queue_name, ERROR_NUMBER, ERROR_MESSAGE
from dba_capture where status != 'ENABLED'
select * FROM DBA_CAPTURE;
select * from dba_propagation;
SELECT r.CONSUMER_NAME,
r.SOURCE_DATABASE,
r.SEQUENCE#,
r.NAME,
r.DICTIONARY_BEGIN,
r.DICTIONARY_END
FROM DBA_REGISTERED_ARCHIVED_LOG r, DBA_CAPTURE c
WHERE r.CONSUMER_NAME = c.CAPTURE_NAME;
#May safely remove these logs
SELECT * FROM DBA_LOGMNR_PURGED_LOG;
SELECT * FROM DBA_CAPTURE_PARAMETERS;
SELECT * FROM DBA_CAPTURE_EXTRA_ATTRIBUTES
SELECT * FROM DBA_QUEUE_TABLES order by owner, queue_table
SELECT * FROM DBA_QUEUES order by owner
SELECT * FROM DBA_APPLY_DML_HANDLERS
select * from DBA_QUEUE_SCHEDULES
select * from DBA_STREAMS_COLUMNS;
select * from DBA_STREAMS_ADMINISTRATOR;
select * from DBA_STREAMS_RULES;
select * from DBA_STREAMS_TABLE_RULES;
select * from SYS.DBA_STREAMS_UNSUPPORTED; |
Untested Remove stream Metalink note 276648.1 Multi version data dictionary refer to Metalink note 212044.1 |
Troubleshooting NO DATA FOUND in apply for update/delete
ORA-01403: no data found
ORA-01403: no data found
ORA-06512: at "SYS.LCR$_ROW_RECORD", line 419
ORA-06512: at "MTS_APPLY.TABLE_HANDLERS", line 32
ORA-06512: at line 1
Remember that primary key columns on destination are always evaluated.
You'll get your job very easy using this query:
select tc.owner, tc.table_name, tc.column_name,
a.COMPARE_OLD_ON_DELETE, a.COMPARE_OLD_ON_UPDATE, decode(k.column_name, null, 'N', 'Y') manual_key_column
from dba_tab_columns tc, DBA_APPLY_TABLE_COLUMNS a, dba_apply_key_columns k
where
a.OBJECT_OWNER (+)= tc.owner and a.OBJECT_NAME (+)= tc.TABLE_NAME and a.COLUMN_NAME (+)= tc.COLUMN_NAME
and k.OBJECT_OWNER (+)= tc.owner and k.OBJECT_NAME (+)= tc.TABLE_NAME and k.COLUMN_NAME (+)= tc.COLUMN_NAME
and owner='MTS_OWNER'
and table_name='AGENTS'
#set pk manually, use null on column_list to reset
execute DBMS_APPLY_ADM.SET_KEY_COLUMNS(object_name => 'MTS_OWNER.AGENTS', column_list => 'ID,NAME');
select * from dba_apply_key_columns
#set old values to be compared, default is true for all columns
execute DBMS_APPLY_ADM.COMPARE_OLD_VALUES(object_name => 'MTS_OWNER.AGENTS', column_list => '*', operation => '*', compare => false);
select * from DBA_APPLY_TABLE_COLUMNS order by 1, 2, 3
|
On the job - Project duration estimation
Functional - Analysis
Determine tables needed for replication
Measure the global volume(rows and size) of source tables to be streamed
Considerations on global volume for source tables for initial alignment(CTAS, RMAN, TTS, Datapump)
Determine apply rules(overview only)
Administration - Environment Setup
Verify Streams requiremements on databases and tablespace creation for STRMADMIN and functional user
Test network connection between nodes and detect lag
Creation of STRMADMIN and setup of empty Streams environment
Dummy Streams test to certify the correct environment
Creation of functional user with limited grants
Administration - Complete Streams setup for all tables in log mode(no physical apply on destination)
Queue creations on source and destination
Capture processes creations
Propagation processes creations
Functional apply procedures(DML and DDL) creation on functional user(log only at the moment)
Apply processes creation on destination
Test the Streams environment in log mode
Functional - Development
Data population and alignment of destination database
Development of apply procedures
DML handlers
DDL handlers
Functional documentation
Functional tests
Administration - Crash tests simulation
Short crash simulation - failed Streams applies may still be dequed
Reapply failed data
Estimate downtime for source and/or destination based on the number of failed apply to recover
Considerations and plan preparation for a production short crash(stop applications, courtesy web page, ...)
Complete crash simulation
Recreation of Streams administration schema
Data population and alignment of destination database(truncation of existing data)
Considerations and plan preparation for a production complete crash(stop applications, courtesy web page, ...)
Administration - Monitoring, Alarming and Operation
Get a consistent view of actual procedures of Monitoring, Alarming, Operation and Production Support Team
Creation of simplified monitoring views for Production Support Team for quick and detailed diagnostic
Integrate with the current alarming process based on unapplied Stream data threshold
Supply the Production Support Team a clear step-by-step operational documentation for monitoring, short crash and complete recovery
|