Implementing a History Database

(This published version is older than the one actually running in my production environment. Several changes have been made since, but the core concepts remain the same.)

See Oracle Streams if you plan to use Streams to populate the history.
Any indication given on this page about the use of Streams is intended only as an overview.
Important Oracle concepts to understand before proceeding with this solution
--Example 1: two updates of the same column of the same row inside one transaction.
begin
  --A single LCR is generated
  update pippo set name = '45' where id = 1;
  update pippo set name = '54' where id = 1;
  commit;
end;

--Example 2: an update that assigns a column to itself.
begin
  --An LCR is generated even if there is no data change!!
  --This is filtered out by the sameValue function
  update pippo set name = name where id = 1;
  commit;
end;

--Example 3: two updates of different columns of the same row inside one transaction.
begin
  --Two LCRs are generated!!
  --If the history is implemented at transaction level,
  --then the last LCR is automatically picked up by the combined view
  update pippo set name = '45' where id = 1;
  update pippo set surname = 'AA' where id = 1;
  commit;
end;
 
Premises
Throughout the example on this page I work on two tables, a parent and a child,
but this is only to demonstrate some joins. The pattern is applicable to every table that has a primary key.
If your primary key is something other than a typical ID column, simply substitute it with the one or more columns acting as
your primary key.

Why not use Oracle CDC (Change Data Capture) to implement a history database?
CDC is an excellent option supported by Oracle, but be aware of these limitations at its core:
1 a) CDC uses Streams behind the curtain, Streams requires an expert and costly maintenance
  b) With my solution you may use Streams, triggers or any other way (nightly batch and so on) to populate the history(stage) database
2 a) Since CDC is Streams based it uses the SCN to track changes on source database, 
     portability(exp/imp, duplication, restore...) is difficult since the SCN is different on different databases,
     you must always take care of the Instantiation Number. Expert and costly maintenance is required
  b) My solution uses a sequence instead of the SCN, everybody knows what a sequence is...
3 a) To view historic data in CDC you have to define windows using the supplied stored procedures
  b) With my solution you simply issue a query. On this page I provide a general example using one view for each table.
     You may of course discard my view example and create one or more customized view schemas, one for every different logical view
4 a) What happens when you change the source table structures while using CDC?
     Do not be surprised if you pioneer new Oracle bugs that leave CDC stopped or even the history lost;
     any experienced DBA knows that Oracle bugs are always just around the corner
  b) With my solution you may manually alter the history structure and manually review the capture, propagation and apply.
     Since everything is directly under your control, you do not depend on Oracle-side behaviour that you cannot inspect
... In my opinion there are several other reasons not to use CDC, but I'm bored with this now, so let's code...

Source table creation, normally already present
--Source tables used throughout the example; normally they already exist.
--don't drop in production...
drop table test_parent cascade constraints;

--Parent table: primary key plus one payload column
create table test_parent(
  id number primary key,
  some_parent_data varchar2(50)
);

--don't drop in production...
drop table test_child cascade constraints;

--Child table: every row must reference an existing parent
create table test_child (
  id number primary key,
  id_parent number not null references test_parent(id),
  some_child_data varchar2(50)
);

 

History user creation
--FILENAME                 : MTA_HDATA_registry.sql
--Author                   :
--Date                     :
--Application              :
--Run this script with user:
--Notes                    :

--Abort the script (rolling back) as soon as an operating system error occurs
whenever oserror exit SQL.OSCODE rollback
--whenever sqlerror exit SQL.SQLCODE rollback
--Log everything this script does
spool MTA_HDATA_registry.log
Set autocommit off timing on echo on feedback on

--Sequence that numbers every history registry row
CREATE SEQUENCE seq_hreg START WITH 0 INCREMENT BY 1 MINVALUE 0;

--List of column names (at most 100 entries of up to 50 characters each)
--SQL*Plus buffers CREATE TYPE like a PL/SQL block, so it must be terminated by a slash
CREATE OR REPLACE TYPE T_COL_NAMES AS VARRAY(100) OF varchar2(50);
/

grant execute on T_COL_NAMES to public;

-- History registry: one row per recorded operation.
-- It maps a point in time (d) to a sequence number (id) and is also used to build
-- every combination of table and snapshot time.
CREATE TABLE hreg (
  id               NUMBER PRIMARY KEY,
  d                TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
  INSTANCE_NUMBER  NUMBER,
  -- needed to read the history per transaction rather than per Logical Change Record only
  TXID             VARCHAR2(100 CHAR),
  operation        VARCHAR2(1 CHAR),
  -- the remaining columns serve custom user operations only; a standard history,
  -- independent from any application logic, does not need them
  owner            VARCHAR2(100 CHAR),
  table_name       VARCHAR2(100 CHAR),
  update_columns   t_col_names,
  -- primary key values of the recorded row; add further pk_n columns as needed
  pk_1             ANYDATA,
  pk_2             ANYDATA
);

-- Example: expose the ANYDATA primary key columns of the registry as plain
-- NUMBER / DATE / VARCHAR2 columns.
-- A standard history implementation does not need this view.
CREATE OR REPLACE FORCE VIEW mts_hdata.hreg_base AS
  SELECT typed.*,
    CASE typed.pk1_type_name WHEN 'SYS.NUMBER' THEN ANYDATA.accessNumber(typed.pk_1) ELSE NULL END pk1_number,
    CASE typed.pk2_type_name WHEN 'SYS.NUMBER' THEN ANYDATA.accessNumber(typed.pk_2) ELSE NULL END pk2_number,
    CASE typed.pk1_type_name WHEN 'SYS.DATE' THEN ANYDATA.accessDate(typed.pk_1) ELSE NULL END pk1_date,
    CASE typed.pk2_type_name WHEN 'SYS.DATE' THEN ANYDATA.accessDate(typed.pk_2) ELSE NULL END pk2_date,
    CASE typed.pk1_type_name WHEN 'SYS.VARCHAR2' THEN ANYDATA.accessVarchar2(typed.pk_1) ELSE NULL END pk1_varchar2,
    CASE typed.pk2_type_name WHEN 'SYS.VARCHAR2' THEN ANYDATA.accessVarchar2(typed.pk_2) ELSE NULL END pk2_varchar2
  FROM (
    -- every registry row together with the type name stored inside each key value
    SELECT hreg.*,
           ANYDATA.getTypeName(pk_1) pk1_type_name,
           ANYDATA.getTypeName(pk_2) pk2_type_name
    FROM hreg
  ) typed;

--One row per transaction (instance_number, txid) found in the registry:
--  first_seq / last_seq : lowest and highest registry id of the transaction
--  begin_d   / end_d    : timestamp of the registry row with the lowest / highest id
--A single GROUP BY replaces the former DISTINCT + two correlated scalar subqueries + two self-joins.
--Rows with a NULL instance_number or txid are excluded, exactly as the former equality joins did.
create or replace force view hreg_tx as
select
  instance_number,
  txid,
  min(id) as first_seq,
  max(id) as last_seq,
  --id is the primary key, so FIRST/LAST select exactly one row each
  min(d) keep (dense_rank first order by id) as begin_d,
  max(d) keep (dense_rank last order by id) as end_d
from hreg
where instance_number is not null
  and txid is not null
group by instance_number, txid;

--Takes the next registry sequence number, inserts the matching row into hreg and returns the number.
--Parameters:
--  operation   : one-character operation code stored in hreg.operation (callers use 'I', 'U', 'D')
--  mowner      : owner of the table the operation refers to
--  mtable_name : name of the table the operation refers to
--  upd_cols    : names of the updated columns, may be null
--  pk1, pk2    : primary key values of the affected row (pk2 is optional)
--Returns: the sequence number that was written to hreg.id
--remember to use :old values when in deletion
create or replace function getSeq(operation varchar2, mowner varchar2, mtable_name varchar2, upd_cols T_COL_NAMES,
  pk1 anydata, pk2 anydata default null) return number is
--absolutely do not mark with autonomous transaction pragma, is wrong and not required
  n number;
  instanceNumber number;
begin
  select seq_hreg.nextval into n from dual;
  --same value as v$instance.instance_number, without querying the view on every call
  instanceNumber := to_number(sys_context('USERENV', 'INSTANCE'));
  --the parameter "operation" has the same name as a column of hreg: qualify it with the
  --function name so that it can never be resolved as the column
  insert into hreg(id, instance_number, txid, operation, owner, table_name, update_columns, pk_1, pk_2)
    values(n, instanceNumber, DBMS_TRANSACTION.LOCAL_TRANSACTION_ID, getSeq.operation, mowner, mtable_name, upd_cols, pk1, pk2);
  return n;
end;
/

--Close the log file opened by the spool command of this script and leave SQL*Plus
spool off
exit

 

History table creation

--History tables: same columns as the source tables plus seq, the registry sequence number
--of the operation that produced the row. Deleting a registry row removes its history row too.
create table test_parent_H(
  id number,
  some_parent_data varchar2(50),
  seq number not null unique references hreg(id) on delete cascade --populated from the registry sequence
);

create table test_child_H (
  id number,
  id_parent number,
  some_child_data varchar2(50),
  seq number not null unique references hreg(id) on delete cascade --populated from the registry sequence
);

History population using triggers in source schema

--Populates the history of TEST_PARENT with a row trigger, to keep this example simple
--(Streams may be used instead).
--For every inserted, updated or deleted row a registry entry is created through getSeq and the
--row image (:new for insert/update, :old for delete) is copied to test_parent_h.
CREATE OR REPLACE TRIGGER trg_parent
after DELETE OR INSERT OR UPDATE
ON TEST_PARENT
REFERENCING NEW AS New OLD AS Old
FOR EACH ROW
declare
  operation varchar2(1);
  hist_id   test_parent.id%type;
  hist_data test_parent.some_parent_data%type;
  hist_seq  number;
BEGIN
   if inserting then
     operation := 'I';
   elsif updating then
     operation := 'U';
   elsif deleting then
     operation := 'D';
   end if;
   --a deleted row only has :old values
   if deleting then
     hist_id   := :old.id;
     hist_data := :old.some_parent_data;
   else
     hist_id   := :new.id;
     hist_data := :new.some_parent_data;
   end if;
   --getSeq expects the list of updated columns as 4th argument and the key as 5th;
   --the trigger does not track updated columns, so null is passed for the list
   hist_seq := getSeq(operation, 'ALDO', 'TEST_PARENT', null, ANYDATA.convertNumber(hist_id));
   insert into test_parent_h(id, some_parent_data, seq) values(hist_id, hist_data, hist_seq);
END trg_parent;
/

--Populates the history of TEST_CHILD with a row trigger, to keep this example simple
--(Streams may be used instead).
--For every inserted, updated or deleted row a registry entry is created through getSeq and the
--row image (:new for insert/update, :old for delete) is copied to test_child_h.
CREATE OR REPLACE TRIGGER trg_child
after DELETE OR INSERT OR UPDATE
ON TEST_CHILD
REFERENCING NEW AS New OLD AS Old
FOR EACH ROW
declare
  operation   varchar2(1);
  hist_id     test_child.id%type;
  hist_parent test_child.id_parent%type;
  hist_data   test_child.some_child_data%type;
  hist_seq    number;
BEGIN
   if inserting then
     operation := 'I';
   elsif updating then
     operation := 'U';
   elsif deleting then
     operation := 'D';
   end if;
   --a deleted row only has :old values
   if deleting then
     hist_id     := :old.id;
     hist_parent := :old.id_parent;
     hist_data   := :old.some_child_data;
   else
     hist_id     := :new.id;
     hist_parent := :new.id_parent;
     hist_data   := :new.some_child_data;
   end if;
   --getSeq expects the list of updated columns as 4th argument and the key as 5th;
   --the trigger does not track updated columns, so null is passed for the list
   hist_seq := getSeq(operation, 'ALDO', 'TEST_CHILD', null, Anydata.convertNumber(hist_id));
   insert into test_child_h(id, id_parent, some_child_data, seq)
     values(hist_id, hist_parent, hist_data, hist_seq);
END trg_child;
/

History population using Streams

Compare ANYDATA values

--Compares two ANYDATA values.
--Do not rely on DUMP or RAWTOHEX functions; especially RAWTOHEX returns different values (cycling) for same input!
--Parameters: a, b - the values to compare; either may be null
--Returns: 1 when both are null, or both hold the same type and the same value
--         (two nulls wrapped in the same type count as the same value); 0 otherwise
--Raises: -20001 when the common type is not NUMBER, DATE, VARCHAR2 or TIMESTAMP
CREATE OR REPLACE function sameValue(a anydata, b anydata) return number is
  --wide enough for a schema-qualified user type name, so that an unsupported type
  --reaches the "not implemented" error below instead of a value error
  atype varchar2(200);
  btype varchar2(200);
  equal boolean;
  n1 number;          n2 number;
  d1 date;            d2 date;
  v1 varchar2(32767); v2 varchar2(32767);
  t1 timestamp;       t2 timestamp;
begin
  if a is null and b is null then
    return 1;
  elsif a is null or b is null then
    return 0;
  end if;

  atype := Anydata.gettypename(a);
  btype := Anydata.gettypename(b);
  if atype != btype then
    return 0;
  end if;

  --"x = y" is NULL when either side is null, so the case "both null" is tested explicitly
  if atype = 'SYS.NUMBER' then
    n1 := Anydata.accessNumber(a);
    n2 := Anydata.accessNumber(b);
    equal := (n1 = n2) or (n1 is null and n2 is null);
  elsif atype = 'SYS.DATE' then
    d1 := Anydata.accessDate(a);
    d2 := Anydata.accessDate(b);
    equal := (d1 = d2) or (d1 is null and d2 is null);
  elsif atype = 'SYS.VARCHAR2' then
    v1 := Anydata.accessVarchar2(a);
    v2 := Anydata.accessVarchar2(b);
    equal := (v1 = v2) or (v1 is null and v2 is null);
  elsif atype = 'SYS.TIMESTAMP' then
    t1 := Anydata.accessTimestamp(a);
    t2 := Anydata.accessTimestamp(b);
    equal := (t1 = t2) or (t1 is null and t2 is null);
  else
    raise_application_error(-20001, 'Data type ' || atype || ' not implemented');
  end if;

  --equal is NULL when exactly one side is null: that counts as different
  if equal then
    return 1;
  end if;
  return 0;
end;
/

 

--Streams, as you know, is a complex feature, obviously you'll need to customize this code to fit your env.
--Every anonymous block is terminated by a slash so that the steps can be run as a SQL*Plus script.

--Log all columns of the source tables
ALTER TABLE test_parent ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

ALTER TABLE test_child ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

--Queue (and queue table) used by the capture and apply processes of this example
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'strmadmin.q_sourcetable',
    queue_name   => 'strmadmin.q_sourcetable',
    queue_user => 'STRMADMIN');
END;
/

begin
  --Must run at least once on SOURCE db
  DBMS_CAPTURE_ADM.BUILD;
end;
/

--Capture rule: DML (no DDL) on the parent table
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'aldo.test_parent',
    streams_type   => 'capture',
    streams_name   => 'capture_test',
    queue_name     => 'strmadmin.q_sourcetable',
    include_dml    =>  true,
    include_ddl    =>  false,
    inclusion_rule =>  true,
    --global_name
    source_database => null);
END;
/

--Capture rule: DML (no DDL) on the child table
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'aldo.test_child',
    streams_type   => 'capture',
    streams_name   => 'capture_test',
    queue_name     => 'strmadmin.q_sourcetable',
    include_dml    =>  true,
    include_ddl    =>  false,
    inclusion_rule =>  true,
    --global_name
    source_database => null);
END;
/

begin
  dbms_capture_adm.start_capture(capture_name => 'capture_test');
end;
/

-- Records the current SCN as instantiation SCN of the given table and prepares the
-- table for instantiation.
--   source_table_name : name of the source table, as passed by the caller
-- The SCN that was used is written to DBMS_OUTPUT.
CREATE OR REPLACE PROCEDURE prepareInstantiation(source_table_name VARCHAR2) IS
  db_global_name VARCHAR2(100);
  current_scn    NUMBER;
BEGIN
  SELECT gn.global_name
    INTO db_global_name
    FROM global_name gn;

  current_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();

  -- use the dblink if you are propagating to a remote queue
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name   => source_table_name,
    source_database_name => db_global_name,
    instantiation_scn    => current_scn);

  DBMS_OUTPUT.PUT_LINE(current_scn);

  -- on source database
  DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION(source_table_name);
END;

--Run prepareInstantiation for both source tables of the example
begin
  prepareInstantiation('ALDO.TEST_PARENT');
  prepareInstantiation('ALDO.TEST_CHILD');
end;

--here we create the apply process on destination
--Adds DML and DDL apply rules for one table to the apply process APPLY_TEST.
--  tableName : table the rules are created for
--  sourceDb  : source database passed to the rules; null is accepted
--              (the parameter was previously ignored and null was always passed)
create or replace procedure prepareApply(tableName varchar2, sourceDb varchar2) is
  applyName varchar2(50) := 'APPLY_TEST';
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => tableName,
    streams_type   => 'apply',
    streams_name   => applyName ,
    queue_name     => 'strmadmin.Q_SOURCETABLE',
    include_dml    =>  true,
    include_ddl    =>  true,
    inclusion_rule =>  true,
    source_database => sourceDb);
END;
/

--Create the apply rules for both tables, configure the apply process and start it.
--Every anonymous block is terminated by a slash so that the steps can be run as a SQL*Plus script.
begin
  prepareApply('ALDO.TEST_PARENT', null);
  prepareApply('ALDO.TEST_CHILD', null);
end;
/

begin
  --make your reflections on this value
  DBMS_APPLY_ADM.SET_PARAMETER(apply_name => 'APPLY_TEST',parameter  => 'disable_on_error', value => 'N');
end;
/

BEGIN
  DBMS_APPLY_ADM.START_APPLY(apply_name => 'apply_test');
END;
/

-- Registers one user procedure as DML handler of a table for INSERT, UPDATE and DELETE
-- on the apply process APPLY_TEST.
--   mOwner      : owner of the table
--   mTableName  : name of the table
--   ownProcName : procedure that handles the three operations
CREATE OR REPLACE PROCEDURE STRMADMIN.prepareDmlHandler(mOwner VARCHAR2, mTableName VARCHAR2, ownProcName VARCHAR2) IS
  c_apply_name CONSTANT VARCHAR2(50) := 'APPLY_TEST';

  -- registers the handler for a single operation
  PROCEDURE registerFor(p_operation VARCHAR2) IS
  BEGIN
    DBMS_APPLY_ADM.SET_DML_HANDLER(
      object_name         => mOwner||'.'||mTableName,
      object_type         => 'TABLE',
      operation_name      => p_operation,
      error_handler       => FALSE,
      user_procedure      => ownProcName,
      apply_database_link => NULL,
      apply_name          => c_apply_name);
  END registerFor;
BEGIN
  registerFor('INSERT');
  registerFor('UPDATE');
  registerFor('DELETE');
END;

--Give the Streams administrator access to the handler package
grant all on table_handlers to strmadmin;

--Register the package procedures as DML handlers of the two source tables
begin
  prepareDmlHandler('ALDO', 'TEST_PARENT', 'ALDO.TABLE_HANDLERS.TEST_PARENT');
  prepareDmlHandler('ALDO', 'TEST_CHILD', 'ALDO.TABLE_HANDLERS.TEST_CHILD');
end;

 

DML handler package head
-- DML handlers, one procedure per source table.
-- Each procedure receives the change as an ANYDATA value.
CREATE OR REPLACE PACKAGE ALDO.TABLE_HANDLERS AS
  PROCEDURE TEST_PARENT(in_any IN ANYDATA);

  PROCEDURE TEST_CHILD(in_any IN ANYDATA);
END TABLE_HANDLERS;

 

DML handler package body
    
CREATE OR REPLACE package body TABLE_HANDLERS is

--Common DML handler: turns a row LCR of a source table into an INSERT on the matching
--history table (<source table>_H) and registers the operation through getSeq.
--  in_any    : the row LCR, wrapped in ANYDATA
--  pk_1_name : name of the first primary key column
--  pk_2_name : name of the second primary key column, null when the key has one column
--An UPDATE whose new values all equal the old ones (see sameValue) is ignored.
procedure globalHandler(in_any IN ANYDATA, pk_1_name varchar2, pk_2_name varchar2 default null) is
  lcr          SYS.LCR$_ROW_RECORD;
  old_values   SYS.LCR$_ROW_LIST;
  new_values   SYS.LCR$_ROW_LIST;
  rc           PLS_INTEGER;
  command      VARCHAR2(30);
  operation    varchar2(1);
  nextSequence number;
  pk1          anydata;
  pk2          anydata := null;
  --must be initialised: extending an uninitialised varray raises ORA-06531
  upd_cols     T_COL_NAMES := T_COL_NAMES();
  same         boolean := false;
  currOldValue ANYDATA;
BEGIN
  rc := in_any.GETOBJECT(lcr);
  command := lcr.GET_COMMAND_TYPE();
  --whatever the original command was, the history receives an insert
  lcr.SET_COMMAND_TYPE('INSERT');
  --lcr.set_object_owner('MTS_HDATA');
  --Sure you'll need to change the destination table in your production
  lcr.SET_OBJECT_NAME(lcr.get_object_name || '_H');
  new_values := lcr.get_values('new');
  IF command IN ('UPDATE') THEN
    operation := 'U';
    same := true;
    FOR i IN 1.. new_values.COUNT LOOP
      --remember the updated column; T_COL_NAMES is a VARRAY(100), more columns than that raise an error
      upd_cols.extend;
      upd_cols(upd_cols.last) := new_values(i).column_name;
      currOldValue := lcr.get_value('old', new_values(i).column_name);
      if same then
        --sameValue returns 1/0, not a boolean
        same := sameValue(new_values(i).data, currOldValue) = 1;
      end if;
      --overlay the new value on the old image: the merged image becomes the inserted row
      lcr.set_value('old', new_values(i).column_name, new_values(i).data);
    END LOOP;
    lcr.set_values('new', lcr.GET_VALUES('old'));
    lcr.set_values('old', null);
  elsif command in ('INSERT') then
    operation := 'I';
  elsif command in ('DELETE') then
    operation := 'D';
    --a delete carries old values only: they become the inserted row
    old_values := lcr.GET_VALUES('old');
    lcr.SET_VALUES('new', old_values);
    lcr.SET_VALUES('old', NULL);
  END IF;
  -- PK, I always take the new since the deletion has been converted to an insert
  pk1 := lcr.get_value('new', pk_1_name);
  --the second key value is read only when a second key column was given
  if pk_2_name is not null then
    pk2 := lcr.get_value('new', pk_2_name);
  end if;
  if not same then
    --NOTE(review): the object name passed here is the already renamed history table (..._H),
    --while the triggers of this example register the source table name - confirm which is intended
    nextSequence := getSeq(operation, lcr.get_object_owner, lcr.get_object_name, upd_cols, pk1, pk2);
    lcr.add_column('new', 'SEQ',  ANYDATA.ConvertNumber(nextSequence));
    lcr.EXECUTE(true);
  end if;
end;

--Handler of TEST_PARENT: the primary key is the ID column
PROCEDURE TEST_PARENT(in_any IN ANYDATA) IS BEGIN globalHandler(in_any, 'ID'); end;
--Handler of TEST_CHILD: the primary key is the ID column
PROCEDURE TEST_CHILD(in_any IN ANYDATA) IS BEGIN globalHandler(in_any, 'ID'); end;
end TABLE_HANDLERS;
/

Testing the example, dml data and basic history view

Example cleanup
--Empty the source tables, the history tables and the registry before running the example
begin
  delete from test_child;
  delete from test_parent;
  delete from test_child_h;
  delete from test_parent_h;
  delete from hreg;
  commit;
end;

DML 1
--First transaction of the example: two parents and six children are inserted
begin
  insert into test_parent(id, some_parent_data) values (1, 'pippo');
  insert into test_child(id, id_parent, some_child_data) values (1, 1, 'pippo_child');
  insert into test_parent(id, some_parent_data) values (2, 'pippo2');
  insert into test_child(id, id_parent, some_child_data) values (2, 2, 'pippo_child2');
  insert into test_child(id, id_parent, some_child_data) values (3, 1, 'pippo_child2');
  insert into test_child(id, id_parent, some_child_data) values (4, 2, 'pippo_child2');
  insert into test_child(id, id_parent, some_child_data) values (5, 2, 'pippo_child2');
  insert into test_child(id, id_parent, some_child_data) values (6, 1, 'pippo_child2');
  commit;
end;

Lets view the data
--Note the current date and time after DML 1; save it for later checks
select to_char(sysdate, 'dd/mm/yyyy hh24:mi:ss') from dual

12/09/2008 15:44:13

--Content of the parent source table after DML 1
select * from test_parent order by id

id	some_parent_data
1	pippo
2	pippo2

--Content of the child source table after DML 1
select * from test_child order by id

id  id_parent   some_child_data
1   1	        pippo_child
2   2	        pippo_child2
3   1	        pippo_child2
4   2	        pippo_child2
5   2	        pippo_child2
6   1	        pippo_child2

 

DML 2
--Second transaction of the example: parent 1 and children 3 to 5 are updated
begin
  update test_parent set some_parent_data = 'ioooo' where id = 1;
  update test_child set some_child_data = 'weriwiee' where id between 3 and 5;
  commit;
end;

Lets view the data
--Note the current date and time after DML 2; save it for later checks
select to_char(sysdate, 'dd/mm/yyyy hh24:mi:ss') from dual

12/09/2008 15:47:28

--Content of the parent source table after DML 2
select * from test_parent order by id

id	some_parent_data
1	ioooo
2	pippo2

--Content of the child source table after DML 2
select * from test_child order by id

id  id_parent	some_child_data
1	  1	        pippo_child
2	  2	        pippo_child2
3	  1	        weriwiee
4	  2	        weriwiee
5	  2	        weriwiee
6	  1	        pippo_child2

 

DML 3
--Third transaction of the example: child 2 is deleted
begin
  delete from test_child where id = 2;
  commit;
end;

Lets view the data
--Note the current date and time after DML 3; save it for later checks
select to_char(sysdate, 'dd/mm/yyyy hh24:mi:ss') from dual

12/09/2008 15:50:28

--Content of the parent source table after DML 3
select * from test_parent order by id

id	some_parent_data
1	ioooo
2	pippo2

--Content of the child source table after DML 3
select * from test_child order by id

id  id_parent	some_child_data
1	  1	        pippo_child
3	  1	        weriwiee
4	  2	        weriwiee
5	  2	        weriwiee
6	  1	        pippo_child2

Query the history at Logical Change Record level

Historical views
#The id column used here must be replaced with the primary key of your source table

--History of test_parent at Logical Change Record level.
--For every registry sequence (global_operation_seq) the view returns, for each primary key,
--the history row with the highest seq that is not greater than that sequence,
--together with the operation registered for that row.
--A row whose last operation is a deletion is returned only for the sequence of the deletion itself.
create or replace view test_parent_h_global as  
select t.*, hreg.operation, a.global_operation_seq from test_parent_h t, hreg, (
  --distinct (key, last sequence of the key, global sequence) combinations
  select id, last_id_seq, global_operation_seq from (
    /* A virtual table is built up to the global sequence time */
    select a.*, hreg.id global_operation_seq
    --highest seq recorded for the same key up to the global sequence
    ,(select max(seq) from test_parent_h 
      where /* source PK here, may use also more columns for PK */test_parent_h.id=a.id and seq<=hreg.id) last_id_seq
    from test_parent_h a, hreg
    --pair every history row with every registry entry that is not older than the row
    where a.seq <= hreg.id
  ) group by id, last_id_seq, global_operation_seq
) a
--keep, for each key and global sequence, only the history row with that last sequence
where t.id = a.id and t.seq = a.last_id_seq
--registry entry of that history row, read for its operation code
and hreg.id=t.seq
--and not operation = 'D'
--inserts and updates always qualify; the decode adds the row's own operation to the list
--only when the row's seq is the global sequence itself
and operation in ('I', 'U',
  --I want to see deletions only if it is the last operation 
  decode(seq, global_operation_seq, hreg.operation, ''))
  
--do the same for the child

 

Querying historical data

--determine the sequence number, for example using
--(the first row returned is the latest registry entry not newer than the given time)
select id from hreg where d <= to_date('18/09/2008 00:18:37', 'dd/mm/yyyy hh24:mi:ss')
order by d desc;

--state of the child table as of registry sequence 7
--(hreg is keyed by id; it has no seq column)
select c.*
from test_child_h_global c
where c.global_operation_seq = (select max(id) from hreg where id <= 7)
order by c.id;

id  id_parent   some_child_data
1   1	        pippo_child
2   2	        pippo_child2
3   1	        pippo_child2
4   2	        pippo_child2
5   2	        pippo_child2
6   1	        pippo_child2

 

Avoid specifying timestamp in queries
    
-- Session-level holder of the snapshot (registry sequence) the history views work on.
CREATE OR REPLACE PACKAGE history AS
  -- working snapshot of the session; null until the session sets it
  seq NUMBER;

  -- returns seq, raising an error while it is still null
  FUNCTION getSeqNotNull RETURN NUMBER;
END history;
/

CREATE OR REPLACE PACKAGE BODY history AS

  -- Returns the working snapshot of the session.
  -- Raises -20001 when the session has not set history.seq yet.
  FUNCTION getSeqNotNull RETURN NUMBER IS
  BEGIN
    IF seq IS NOT NULL THEN
      RETURN seq;
    END IF;
    raise_application_error(-20001, 'Please set the working snapshot for this session using:'||chr(13)||
      'execute history.seq := ');
  END;
END history;
/

--View with the name of the source table that shows the table as of the snapshot
--stored in history.seq for the current session
create view test_parent as
select c.*
from aldo.test_parent_h_global c
where c.global_operation_seq = history.getSeqNotNull();

--choose the snapshot for this session (SQL*Plus command)
execute history.seq := 12

select * from test_child;

Query the history at transaction level

--Simply create a view for the table containing more information:
--every history row with its registry entry (instance, transaction id, time, operation)
--and the boundaries (first/last sequence, begin/end time) of the transaction it belongs to
create or replace view test_parent_hv as
select
  a.*,
  hreg.instance_number,
  hreg.txid,
  hreg.d,
  hreg.operation,
  hreg_tx.first_seq,
  hreg_tx.last_seq,
  hreg_tx.begin_d,
  hreg_tx.end_d
from test_parent_h a
inner join hreg
  on hreg.id = a.seq
inner join hreg_tx
  on hreg_tx.instance_number = hreg.instance_number
  and hreg_tx.txid = hreg.txid;

--History of test_parent at transaction level.
--For every transaction of hreg_tx (global_instance_number, global_txid) the view returns, for each
--primary key, the history row with the highest seq not greater than the last sequence of that transaction.
--A row whose last operation is a deletion is returned only for the transaction that deleted it.
create or replace view test_parent_hva as
select t.*, a.global_instance_number, a.global_txid, a.global_end_d,
--if the last operation was done in the transaction then the operation type is shown, else S='same as previous'
decode(t.instance_number, a.global_instance_number, decode(t.txid, a.global_txid, t.operation, 'S'), 'S') operation_in_tx
from test_parent_hv t, (
  --distinct (key, last sequence of the key, transaction) combinations
  select /* source PK here, may use also more columns for PK */ id, last_id_seq, global_instance_number, global_txid, global_end_d from (
    /* A virtual table is built up to the global sequence time */
    select a.*, hreg_tx.instance_number global_instance_number, hreg_tx.txid global_txid, hreg_tx.end_d global_end_d
    --this extra column is the last operation for a record in a transaction, 
    --from now this is the sequence that will always be considered
    ,(select max(seq) from test_parent_h 
      where /* source PK here, may use also more columns for PK */test_parent_h.id=a.id and seq<=hreg_tx.last_seq) last_id_seq
    from test_parent_hv a, hreg_tx
    --pair every history row with every transaction that does not end before the row
    where a.seq <= hreg_tx.last_seq
  ) group by /* source PK here, may use also more columns for PK */ id, last_id_seq, global_instance_number, global_txid, global_end_d
) a
/* source PK here, may use also more columns for PK */
where t.id = a.id and t.seq = a.last_id_seq
--and not operation = 'D'
--inserts and updates always qualify; the decode adds the row's own operation to the list
--only when the row was written by the global transaction itself
and t.operation in ('I', 'U',
  --I want to see deletions only if it is the last operation 
  decode(t.instance_number, global_instance_number, decode(t.txid, global_txid, t.operation, ''), ''))
  
--select the last/actual situation for a single table:
--the snapshot of the transaction with the latest end time
select *
from test_parent_hva
inner join hreg_tx
  on test_parent_hva.global_instance_number = hreg_tx.instance_number
  and test_parent_hva.global_txid = hreg_tx.txid
where hreg_tx.end_d = (select max(end_d) from hreg_tx);

--history join example
--parent and child must be taken from the same snapshot, which is identified by
--(global_instance_number, global_txid); instance_number is the instance of the row's own last change
select child.*, parent.some_parent_data
from test_child_hva child
inner join test_parent_hva parent
  on parent.global_instance_number = child.global_instance_number
  and parent.global_txid = child.global_txid
  and parent.id = child.id_parent;

Tuning

-- Recommended when the snapshot number is often looked up by time (column d)
CREATE INDEX tHREG ON hreg (d);

Using the history at transaction level
-- Both indexes are mandatory when the history is used at transaction level
CREATE UNIQUE INDEX mts_hdata.ttx ON hreg (id, instance_number, txid);
CREATE INDEX mts_hdata.ttx2 ON hreg (instance_number, txid);

--Run this procedure from the source database, the indexes must be created on the history tables
--Implementation at job required also to include the 'R', make your consideration about this
--Prints (DBMS_OUTPUT) one CREATE INDEX statement per primary key, unique and foreign key
--constraint of the listed source tables, to be run against the history schema MTS_HDATA.
--  includeSeq = true  : unique indexes named SEQ<n> on the constraint columns plus SEQ
--  includeSeq = false : non-unique indexes named <constraint type><n> on the constraint columns
create or replace procedure printIndex(includeSeq boolean) is
  --replace (...) with the list of source table names before compiling
  --ordered, so that the generated index names are the same on every run
  cursor c1 is
    select constraint_name, constraint_type, table_name
    from user_constraints
    where constraint_type in ('P', 'U', 'R')
    and table_name in (...)
    order by table_name, constraint_name;
  --columns in the order declared by the constraint: without the ORDER BY the column
  --order of the generated index is undefined
  cursor c2(mconstraint varchar2) is
    select column_name
    from user_cons_columns
    where constraint_name = mconstraint
    order by position;
  s varchar2(4000);
  i number := 0;
  history_owner varchar2(50) := 'MTS_HDATA';
  cols varchar2(4000);
begin
  for rc1 in c1 loop
    s := 'create ';
    if includeSeq then s := s || 'unique '; end if;
    s := s || 'index ' || history_owner || '.';
    if includeSeq then s := s || 'SEQ'; else s := s || rc1.constraint_type; end if;
    s := s || i || ' on ' || history_owner || '.' || rc1.table_name || '(';
    cols := '';
    for rc2 in c2(rc1.constraint_name) loop
      if cols is not null then cols := cols || ', '; end if;
      cols := cols || rc2.column_name;
    end loop;
    if includeSeq then cols := cols || ', SEQ'; end if;
    dbms_output.put_line(s || cols || ');');
    i := i+1;
  end loop;
end;
/

--printIndex writes through DBMS_OUTPUT: without this setting SQL*Plus shows nothing
set serveroutput on

--unique indexes: constraint columns plus SEQ
begin
  printIndex(true);
end;
/

--non-unique indexes on the constraint columns only
begin
  printIndex(false);
end;
/