Friday 25 November 2016

ETL Process - Load Production data into BI schema

We are currently implementing the open-source BI product SpagoBI [SPAGO]. I was involved in extracting data from our production environment into the BI environment. During this work I developed the following stored procedures, which load data from the production schema into the BI schema. The procedures are fully dynamic: the SQL they run is generated from a parameter table.

Process flow
Data is transferred from the JD Edwards production environment to the SpagoBI environment sequentially, table by table, using an Oracle stored procedure [SP_ETL_BI_DATA, which takes two parameters: the table name and the parallel operation number].


Steps followed:

1.  Identified the relevant base tables in the production environment. The following table [F99BI001] was created, and the list of all required tables was entered into it.
Column Name
Data Type
Description
TCOBNM
VARCHAR2(12 BYTE)
Object Name
TCDAFT
NUMBER
Number of Days from Today
TCENVSRC
VARCHAR2(10 BYTE)
Source Environment
TCGDCA
VARCHAR2(20 BYTE)
Date Column like UPMJ
TCCECB
VARCHAR2(10 BYTE)
Current Environment
TCRSTDN
VARCHAR2(20 BYTE)
Target Database Name
TCRSDN
VARCHAR2(20 BYTE)
Source Database Name
TCIDXN
VARCHAR2(200 BYTE)
Index expression for merge
TCINSERT
CLOB
Insert Statement as per Merge Syntax
TCPOPR
NUMBER
Parallel Operations
TCUPMJ
DATE
Updated Date
TCWDUR
NUMBER
Time Taken in Seconds
TCPWPRCD
NUMBER
Rows
TCTDA017
VARCHAR2(200 BYTE)
Primary Key
TCRDA082
VARCHAR2(4000 BYTE)
All Column
TCUPDATE
CLOB
Update Statement as per MERGE Syntax

3.     Analyzed the table structure – at present, all columns from each source table are taken.

4.    The Oracle [impdp] utility was used to create the tables and load the initial data.

5.  The following stored procedure [SP_ETL_COLUMN] was created. It fetches the column list and the primary key of each table entered in F99BI001, using the Oracle data dictionary views.

CREATE OR REPLACE PROCEDURE SP_ETL_COLUMN(sTableName IN VARCHAR2)
IS
--------------------------------------------------------------------------------------------------------------------------------
-- SP_ETL_COLUMN
--
-- For every row of F99BI001 whose TCOBNM matches sTableName (LIKE pattern), reads the
-- column list and primary-key columns of that table from the Oracle data dictionary
-- and stores the generated MERGE fragments back into F99BI001:
--
--   TCIDXN   - join condition on the primary key:  B.K1=E.K1 AND B.K2=E.K2
--   TCTDA017 - comma-separated primary-key columns
--   TCRDA082 - comma-separated list of all columns
--   TCUPDATE - UPDATE SET B.C1=E.C1, ...   (non-key columns only; NULL when every column is a key)
--   TCINSERT - INSERT ( B.C1, ... ) VALUES ( E.C1, ... )
--
-- Parameters:
--   sTableName - LIKE pattern applied to F99BI001.TCOBNM (e.g. 'F4211' or 'F42%').
--
-- Notes:
--   * The procedure commits after clearing and after writing each table's metadata.
--   * The SQL fragments are built in CLOBs, so they are not limited to 4000 characters.
--     TCRDA082 and the key strings are still held in VARCHAR2(4000); a column list longer
--     than that raises ORA-06502.
--------------------------------------------------------------------------------------------------------------------------------

    -- Owner of the target (BI) tables as stored in the data dictionary.
    -- NOTE(review): placeholder value - replace with the real schema name
    -- (dictionary owner names are normally upper case).
    cTargetOwner    CONSTANT VARCHAR2(128) := 'target Schema';

    sPrimaryONSQL   VARCHAR2(4000);   -- B.key=E.key AND ...
    sPrimaryColumn  VARCHAR2(4000);   -- key1, key2, ...
    sTotalColumn    VARCHAR2(4000);   -- col1, col2, ...

    lUpdateSet      CLOB;             -- B.col=E.col, ...   (non-key columns)
    lInsertColumns  CLOB;             -- B.col, ...
    lInsertValues   CLOB;             -- E.col, ...
    lUpdateSQL      CLOB;
    lInsertSQL      CLOB;

    sColumn         VARCHAR2(128);
    nColumns        PLS_INTEGER;      -- columns found for the current table
    nKeyColumns     PLS_INTEGER;      -- of which primary-key columns

    CURSOR ListOfTables
    IS
        SELECT TCOBNM
        FROM F99BI001
        WHERE TCOBNM LIKE sTableName
        ORDER BY TCOBNM;

--------------------------------------------------------------------------------------------------------------------------------
-- Main Processing
--------------------------------------------------------------------------------------------------------------------------------
BEGIN

    FOR cTableList IN ListOfTables
    LOOP

        sPrimaryONSQL  := NULL;
        sPrimaryColumn := NULL;
        sTotalColumn   := NULL;
        lUpdateSet     := NULL;
        lInsertColumns := NULL;
        lInsertValues  := NULL;
        nColumns       := 0;
        nKeyColumns    := 0;

        -- Clear any previously generated metadata first, so stale SQL is never left behind
        -- if the dictionary lookup below fails or finds nothing.
        UPDATE F99BI001
        SET TCIDXN   = NULL,
            TCUPDATE = NULL,
            TCINSERT = NULL,
            TCTDA017 = NULL,
            TCRDA082 = NULL
        WHERE TCOBNM = cTableList.TCOBNM;
        COMMIT;

        -- One pass over the table's columns in COLUMN_ID order; IS_KEY flags the columns
        -- that belong to the table's primary-key constraint (a table has at most one).
        FOR cColumnList IN (
            SELECT tc.column_name,
                   CASE WHEN pk.column_name IS NULL THEN 'N' ELSE 'Y' END AS is_key
            FROM all_tab_columns tc
            LEFT JOIN (
                SELECT cols.column_name
                FROM all_constraints cons
                INNER JOIN all_cons_columns cols
                    ON cols.owner = cons.owner
                   AND cols.constraint_name = cons.constraint_name
                WHERE cons.constraint_type = 'P'
                  AND cons.owner = cTargetOwner
                  AND TRIM(cols.table_name) = TRIM(cTableList.TCOBNM)
            ) pk
                ON TRIM(pk.column_name) = TRIM(tc.column_name)
            WHERE tc.table_name = cTableList.TCOBNM
              AND tc.owner = cTargetOwner
            ORDER BY tc.column_id
        )
        LOOP

            sColumn := TRIM(cColumnList.column_name);

            IF cColumnList.is_key = 'Y' THEN
                -- Key columns go into the ON clause and are never updated.
                IF nKeyColumns > 0 THEN
                    sPrimaryONSQL  := sPrimaryONSQL || ' AND ';
                    sPrimaryColumn := sPrimaryColumn || ', ';
                END IF;
                sPrimaryONSQL  := sPrimaryONSQL || 'B.' || sColumn || '=' || 'E.' || sColumn;
                sPrimaryColumn := sPrimaryColumn || sColumn;
                nKeyColumns    := nKeyColumns + 1;
            ELSE
                IF nColumns > nKeyColumns THEN
                    lUpdateSet := lUpdateSet || ', ';
                END IF;
                lUpdateSet := lUpdateSet || 'B.' || sColumn || '=' || 'E.' || sColumn;
            END IF;

            -- Separators are added before every element except the first, so nothing
            -- has to be trimmed off afterwards.
            IF nColumns > 0 THEN
                lInsertColumns := lInsertColumns || ', ';
                lInsertValues  := lInsertValues || ', ';
                sTotalColumn   := sTotalColumn || ', ';
            END IF;
            lInsertColumns := lInsertColumns || 'B.' || sColumn;
            lInsertValues  := lInsertValues || 'E.' || sColumn;
            sTotalColumn   := sTotalColumn || sColumn;

            nColumns := nColumns + 1;

        END LOOP;

        -- Nothing is written when the table is not visible under cTargetOwner:
        -- the metadata simply stays cleared.
        IF nColumns > 0 THEN

            IF nColumns > nKeyColumns THEN
                lUpdateSQL := 'UPDATE SET ' || lUpdateSet;
            ELSE
                -- Every column is part of the key: there is nothing to update.
                lUpdateSQL := NULL;
            END IF;

            lInsertSQL := 'INSERT /*+ APPEND */ ( ' || lInsertColumns || ')'
                       || ' VALUES ( ' || lInsertValues || ')';

            UPDATE F99BI001
            SET TCIDXN   = sPrimaryONSQL,
                TCRDA082 = sTotalColumn,
                TCTDA017 = sPrimaryColumn,
                TCUPDATE = lUpdateSQL,
                TCINSERT = lInsertSQL
            WHERE TCOBNM = cTableList.TCOBNM;
            COMMIT;

        END IF;

    END LOOP;

END SP_ETL_COLUMN;
--*************************************************************************************************************
-- End PROCEDURE
--*************************************************************************************************************
/

6.  Created the stored procedure [SP_ETL_BI_DATA], which transfers production data to the SpagoBI environment. The transfer uses a ‘MERGE INTO’ statement: only missing or changed rows are selected from the source [production] environment and are then inserted or updated in the target [SpagoBI] environment. Rows that no longer exist in the source are removed from the target by a separate DELETE statement.

CREATE OR REPLACE PROCEDURE SP_ETL_BI_DATA(sTableName IN VARCHAR2, nParallel IN INTEGER)
IS
--------------------------------------------------------------------------------------------------------------------------------
-- SP_ETL_BI_DATA
--
-- Copies data from the source environment to the target environment for every row of
-- F99BI001 whose TCOBNM matches sTableName (LIKE pattern) and whose TCPOPR = nParallel.
-- For each table it
--   1. runs a MERGE that inserts/updates the rows returned by
--      (source rows MINUS target rows), optionally restricted by a date window and a filter;
--   2. runs a DELETE that removes target rows whose primary key no longer exists in the source;
--   3. records run date, duration (seconds) and rows processed in F99BI001.
--
-- Parameters:
--   sTableName - LIKE pattern applied to F99BI001.TCOBNM.
--   nParallel  - parallel operation number; only rows with this TCPOPR are processed.
--
-- Raises:
--   ORA-20001 when the merge metadata (TCIDXN / TCTDA017 / TCRDA082 / TCINSERT) for a table is
--   empty. Any other error is re-raised after the uncommitted work is rolled back.
--   Work committed for previously processed tables is kept.
--
-- F99BI001 columns used (F99BI001 is the parameter table driving the copy):
--   TCOBNM   - Object name [name of the table to be copied]
--   TCDAFT   - Number of days back from today to be copied
--   TCENVSRC - Source environment / schema
--   TCGDCA   - Date column (like UPMJ) used for the date-window filter
--   TCCECB   - Current (target) environment / schema
--   TCRSTDN  - Target database link name (optional)
--   TCRSDN   - Source database link name (optional)
--   TCIDXN   - ON expression for the MERGE
--   TCUPDATE - UPDATE clause in MERGE syntax (may be empty when all columns are keys)
--   TCINSERT - INSERT clause in MERGE syntax
--   TCPOPR   - Parallel operation number
--   TCUPMJ   - Updated date, set after the copy completes
--   TCWDUR   - Time taken in seconds, set after the copy completes
--   TCPWPRCD - Rows processed (merged + deleted), set after the copy completes
--   TCTDA017 - Primary-key column list, used to delete rows removed from the source
--   TCRDA082 - All-column list, used to build the SELECT ... MINUS SELECT ... query
--   TCHSIFT  - Optional extra filter condition for retrieving data
--
-- NOTE(review): object names, link names and the filter come straight from F99BI001 and are
-- concatenated into dynamic SQL; write access to that table must be restricted accordingly.
--------------------------------------------------------------------------------------------------------------------------------

    sSQL          CLOB;               -- CLOB: TCUPDATE/TCINSERT are CLOBs and may exceed 32K
    sWhere        VARCHAR2(32767);    -- shared WHERE clause for both sides of the MINUS
    sTarget       VARCHAR2(4000);     -- schema.table[@link] of the target
    sSource       VARCHAR2(4000);     -- schema.table[@link] of the source
    iFromDate     NUMBER;
    iThruDate     NUMBER;
    NumberOfRows  NUMBER;
    dStarted      DATE;

    -- List of tables to process, taken from F99BI001
    CURSOR ListOfTables
    IS
        SELECT bi.ROWID AS BI_ROWID,
               bi.TCOBNM,
               bi.TCDAFT,
               bi.TCENVSRC,
               bi.TCGDCA,
               bi.TCCECB,
               bi.TCRSTDN,
               bi.TCRSDN,
               bi.TCIDXN,
               bi.TCUPDATE,
               bi.TCINSERT,
               bi.TCTDA017,
               bi.TCRDA082,
               bi.TCHSIFT
        FROM F99BI001 bi
        WHERE bi.TCOBNM LIKE sTableName
          AND bi.TCPOPR = nParallel;

--------------------------------------------------------------------------------------------------------------------------------
-- Main Processing
--------------------------------------------------------------------------------------------------------------------------------
BEGIN

    FOR cTableList IN ListOfTables
    LOOP

        dStarted := SYSDATE;

        -- Fail with a clear message instead of an obscure parse error from half-built SQL.
        IF TRIM(cTableList.TCIDXN) IS NULL
           OR TRIM(cTableList.TCTDA017) IS NULL
           OR TRIM(cTableList.TCRDA082) IS NULL
           OR NVL(LENGTH(TRIM(cTableList.TCINSERT)), 0) = 0
        THEN
            RAISE_APPLICATION_ERROR(
                -20001,
                'SP_ETL_BI_DATA: merge metadata missing for ' || cTableList.TCOBNM
                || ' (run SP_ETL_COLUMN and check that the table has a primary key)');
        END IF;

        -- Fully qualified target and source names, with optional database links
        sTarget := TRIM(cTableList.TCCECB) || '.' || cTableList.TCOBNM;
        IF TRIM(cTableList.TCRSTDN) IS NOT NULL THEN
            sTarget := sTarget || '@' || TRIM(cTableList.TCRSTDN);
        END IF;

        sSource := TRIM(cTableList.TCENVSRC) || '.' || cTableList.TCOBNM;
        IF TRIM(cTableList.TCRSDN) IS NOT NULL THEN
            sSource := sSource || '@' || TRIM(cTableList.TCRSDN);
        END IF;

        -- WHERE clause applied identically to the source and the target side of the MINUS
        sWhere := NULL;

        IF cTableList.TCDAFT > 0 AND cTableList.TCGDCA IS NOT NULL THEN
            -- Subtract the days from the DATE before converting, so the window stays correct
            -- across a year boundary.
            -- NOTE(review): assumes FDATETOJULIAN accepts a DATE and returns the number format
            -- stored in TCGDCA (presumably JDE CYYDDD) - confirm.
            iFromDate := FDATETOJULIAN(SYSDATE - cTableList.TCDAFT);
            iThruDate := FDATETOJULIAN(SYSDATE);
            sWhere := ' WHERE ' || cTableList.TCGDCA || ' BETWEEN ' || iFromDate || ' AND ' || iThruDate;
        END IF;

        IF cTableList.TCHSIFT IS NOT NULL THEN
            IF sWhere IS NULL THEN
                sWhere := ' WHERE ';
            ELSE
                sWhere := sWhere || ' AND ';
            END IF;
            -- Parenthesised so an OR inside the filter cannot escape the date window.
            sWhere := sWhere || '(' || cTableList.TCHSIFT || ')';
        END IF;

        -- Building of MERGE statement: rows that are new or different in the source
        sSQL := 'MERGE INTO ' || sTarget
             || ' b USING (SELECT ' || TRIM(cTableList.TCRDA082) || ' FROM ' || sSource || sWhere
             || ' MINUS SELECT ' || TRIM(cTableList.TCRDA082) || ' FROM ' || sTarget || sWhere
             || ') e ON (' || TRIM(cTableList.TCIDXN) || ')';

        -- The UPDATE branch is omitted when there are no non-key columns to update.
        IF LENGTH(TRIM(cTableList.TCUPDATE)) > 0 THEN
            sSQL := sSQL || ' WHEN MATCHED THEN ' || TRIM(cTableList.TCUPDATE);
        END IF;

        sSQL := sSQL || ' WHEN NOT MATCHED THEN ' || TRIM(cTableList.TCINSERT);

        -- MERGE statement will be executed here
        EXECUTE IMMEDIATE sSQL;
        NumberOfRows := SQL%ROWCOUNT;   -- read before COMMIT
        COMMIT;

        -- Building of DELETE MINUS statement: keys present in the target but not in the source
        sSQL := 'DELETE FROM ' || sTarget
             || ' WHERE (' || TRIM(cTableList.TCTDA017) || ') IN ( SELECT ' || TRIM(cTableList.TCTDA017)
             || ' FROM ' || sTarget
             || ' MINUS SELECT ' || TRIM(cTableList.TCTDA017)
             || ' FROM ' || sSource
             || ')';

        -- DELETE MINUS statement will execute here (once)
        EXECUTE IMMEDIATE sSQL;
        NumberOfRows := NumberOfRows + SQL%ROWCOUNT;
        COMMIT;

        -- Record the outcome of this copy in the parameter table
        UPDATE F99BI001
        SET TCUPMJ   = SYSDATE,
            TCWDUR   = ROUND((SYSDATE - dStarted) * 86400),
            TCPWPRCD = NumberOfRows
        WHERE ROWID = cTableList.BI_ROWID;
        COMMIT;

    END LOOP;

--------------------------------------------------------------------------------------------------------------------------------
-- Error Handling
--------------------------------------------------------------------------------------------------------------------------------
EXCEPTION
    WHEN OTHERS THEN
        -- Undo the statement in flight, then let the caller / scheduler see the failure.
        ROLLBACK;
        RAISE;

END SP_ETL_BI_DATA;
--*************************************************************************************************************
-- End PROCEDURE
--*************************************************************************************************************
/