We are currently implementing the open source BI product [SpagoBI]. I was involved in extracting data from our production environment into the BI environment. As part of this work I developed the following stored procedures, which copy data from the production schema to the BI schema. The procedures are fully dynamic.
Process flow
Data from the JD Edwards production environment is transferred
sequentially to the SpagoBI environment by an Oracle stored procedure
[Name: SP_ETL_BI_DATA, with two parameters: the table name and the
parallel operation number].
Steps followed:
1. Identified the relevant base tables in the production
environment. The following table [F99BI001] was created, and the list of all
required tables was entered into it.
Column Name
|
Data Type
|
Description
|
TCOBNM
|
VARCHAR2(12 BYTE)
|
Object Name
|
TCDAFT
|
NUMBER
|
Number of Days from
Today
|
TCENVSRC
|
VARCHAR2(10 BYTE)
|
Source Environment
|
TCGDCA
|
VARCHAR2(20 BYTE)
|
Date Column like UPMJ
|
TCCECB
|
VARCHAR2(10 BYTE)
|
Current Environment
|
TCRSTDN
|
VARCHAR2(20 BYTE)
|
Target Database Name
|
TCRSDN
|
VARCHAR2(20 BYTE)
|
Source Database Name
|
TCIDXN
|
VARCHAR2(200 BYTE)
|
Index expression for
merge
|
TCINSERT
|
CLOB
|
Insert Statement as per
Merge Syntax
|
TCPOPR
|
NUMBER
|
Parallel Operations
|
TCUPMJ
|
DATE
|
Updated Date
|
TCWDUR
|
NUMBER
|
Time Taken in Seconds
|
TCPWPRCD
|
NUMBER
|
Rows
|
TCTDA017
|
VARCHAR2(200 BYTE)
|
Primary Key
|
TCRDA082
|
VARCHAR2(4000 BYTE)
|
All Column
|
TCUPDATE
|
CLOB
|
Update Statement as per
MERGE Syntax
|
3. Analyzed the table structure – at present, all
columns from the source table are taken.
4. The Oracle [impdp] utility was used to
create the tables and load the initial data.
5. The following stored procedure [SP_ETL_COLUMN] was created; it fetches the required columns and the primary key for each table entered in F99BI001.
The information is fetched from the Oracle data dictionary views.
CREATE OR REPLACE Procedure SP_ETL_COLUMN(sTableName IN VARCHAR2)
--------------------------------------------------------------------------------------------------------------------------------
-- Purpose   : For every row of F99BI001 whose TCOBNM matches sTableName, read the table's columns and
--             primary key from the Oracle data dictionary and store the generated MERGE fragments back
--             into that F99BI001 row:
--               TCIDXN   - join condition on the primary key columns (B.K1=E.K1 AND B.K2=E.K2 ...)
--               TCTDA017 - comma separated list of the primary key columns
--               TCRDA082 - comma separated list of all columns
--               TCUPDATE - "UPDATE SET ..." clause covering the non-key columns
--               TCINSERT - "INSERT ( ... ) VALUES ( ... )" clause covering all columns
--             In the generated fragments alias B is the merge target and alias E is the source row set.
-- Parameter : sTableName - table name, or LIKE pattern, selecting the F99BI001 rows to process
-- Commits   : once after clearing the old metadata and once after storing the new metadata, per table
--------------------------------------------------------------------------------------------------------------------------------
IS
    -- Owner of the target tables in the data dictionary.
    -- NOTE(review): placeholder value carried over from the original listing - replace with the real schema name.
    cTargetOwner   CONSTANT VARCHAR2(128) := 'target Schema';
    sPrimaryONSQL  VARCHAR2(4000);
    sPrimaryColumn VARCHAR2(4000);
    sTotalColumn   VARCHAR2(4000);
    sColumn        VARCHAR2(128);
    -- CLOB buffers: the clauses of a wide table do not fit into VARCHAR2(4000)
    sUpdateSet     CLOB;
    sInsertCols    CLOB;
    sValueCols     CLOB;
    sUpdateSQL     CLOB;
    sInsertSQL     CLOB;
    -- Counters decide whether a separator is needed before the next list element
    nColumns       PLS_INTEGER;
    nKeyColumns    PLS_INTEGER;
    nUpdateColumns PLS_INTEGER;
    CURSOR ListOfTables
    IS
        SELECT TCOBNM FROM F99BI001 WHERE TCOBNM LIKE sTableName ORDER BY TCOBNM;
--------------------------------------------------------------------------------------------------------------------------------
-- Main Processing
--------------------------------------------------------------------------------------------------------------------------------
BEGIN
    FOR cTableList IN ListOfTables
    LOOP
        sPrimaryONSQL  := NULL;
        sPrimaryColumn := NULL;
        sTotalColumn   := NULL;
        sUpdateSet     := NULL;
        sInsertCols    := NULL;
        sValueCols     := NULL;
        sUpdateSQL     := NULL;
        sInsertSQL     := NULL;
        nColumns       := 0;
        nKeyColumns    := 0;
        nUpdateColumns := 0;

        -- Clear the previously generated metadata first, so that a failure further down
        -- never leaves stale fragments behind for this table.
        UPDATE F99BI001
           SET TCIDXN = NULL, TCUPDATE = NULL, TCINSERT = NULL, TCTDA017 = NULL, TCRDA082 = NULL
         WHERE TCOBNM = cTableList.TCOBNM;
        COMMIT;

        -- One pass over the data dictionary: every column of the table in COLUMN_ID order,
        -- flagged with whether it belongs to the table's primary key constraint.
        FOR cColumnList IN (
            SELECT tc.column_name,
                   CASE
                       WHEN EXISTS (
                           SELECT 1
                             FROM all_constraints cons
                            INNER JOIN all_cons_columns cols
                               ON cols.owner = cons.owner
                              AND cols.constraint_name = cons.constraint_name
                            WHERE cons.constraint_type = 'P'
                              AND cons.owner = tc.owner
                              AND cols.table_name = tc.table_name
                              AND cols.column_name = tc.column_name
                       ) THEN 'Y'
                       ELSE 'N'
                   END AS is_key
              FROM all_tab_columns tc
             WHERE tc.table_name = cTableList.TCOBNM
               AND tc.owner = cTargetOwner
             ORDER BY tc.column_id
        )
        LOOP
            sColumn := TRIM(cColumnList.column_name);

            IF cColumnList.is_key = 'Y' THEN
                -- Key columns form the ON condition and the key list
                sPrimaryONSQL  := sPrimaryONSQL || CASE WHEN nKeyColumns > 0 THEN ' AND ' END
                                  || 'B.' || sColumn || '=' || 'E.' || sColumn;
                sPrimaryColumn := sPrimaryColumn || CASE WHEN nKeyColumns > 0 THEN ', ' END || sColumn;
                nKeyColumns    := nKeyColumns + 1;
            ELSE
                -- Only non-key columns are updated: MERGE cannot update columns referenced in its ON clause
                sUpdateSet     := sUpdateSet || CASE WHEN nUpdateColumns > 0 THEN ', ' END
                                  || 'B.' || sColumn || '=' || 'E.' || sColumn;
                nUpdateColumns := nUpdateColumns + 1;
            END IF;

            -- Every column takes part in the INSERT/VALUES lists and in the full column list
            sInsertCols  := sInsertCols || CASE WHEN nColumns > 0 THEN ', ' END || 'B.' || sColumn;
            sValueCols   := sValueCols || CASE WHEN nColumns > 0 THEN ', ' END || 'E.' || sColumn;
            sTotalColumn := sTotalColumn || CASE WHEN nColumns > 0 THEN ', ' END || sColumn;
            nColumns     := nColumns + 1;
        END LOOP;

        -- Assemble the clauses only when there is something to put into them; a table that is
        -- not found for cTargetOwner keeps NULL metadata instead of a malformed fragment.
        IF nUpdateColumns > 0 THEN
            sUpdateSQL := 'UPDATE SET ' || sUpdateSet;
        END IF;
        IF nColumns > 0 THEN
            sInsertSQL := 'INSERT /*+ APPEND */ ( ' || sInsertCols || ')';
            sInsertSQL := sInsertSQL || ' ' || ' VALUES ( ' || sValueCols || ')';
        END IF;

        UPDATE F99BI001
           SET TCIDXN   = sPrimaryONSQL,
               TCRDA082 = sTotalColumn,
               TCTDA017 = sPrimaryColumn,
               TCUPDATE = sUpdateSQL,
               TCINSERT = sInsertSQL
         WHERE TCOBNM = cTableList.TCOBNM;
        COMMIT;
    END LOOP;
--------------------------------------------------------------------------------------------------------------------------------
-- Error Handling (If Any)
--------------------------------------------------------------------------------------------------------------------------------
-- No handler: any error propagates unchanged to the caller
END SP_ETL_COLUMN;
--*************************************************************************************************************
-- End PROCEDURE
--*************************************************************************************************************
/
6. Created the stored procedure [SP_ETL_BI_DATA],
which transfers production data to the SpagoBI environment. The transfer
uses a ‘MERGE INTO’ statement: only missing or changed data is
selected from the source [production] environment, and then, depending on the
criteria, rows are inserted, updated or deleted in the target [SpagoBI]
environment.
CREATE OR REPLACE Procedure SP_ETL_BI_DATA(sTableName IN VARCHAR2, nParallel IN INTEGER)
--------------------------------------------------------------------------------------------------------------------------------
-- Purpose   : For every F99BI001 row matching sTableName and nParallel, synchronise the target table with
--             the source table in two dynamically built statements:
--               1. MERGE  - rows of the source that are missing or different in the target (source MINUS
--                           target) are updated when the key matches, otherwise inserted
--               2. DELETE - rows whose key exists in the target but no longer in the source are removed
-- Parameters: sTableName - table name, or LIKE pattern, matched against F99BI001.TCOBNM
--             nParallel  - only rows with TCPOPR equal to this value are processed
-- Commits   : after each MERGE and after each DELETE
-- Errors    : any error rolls back the uncommitted work and is re-raised to the caller
--
-- F99BI001 is the parameter table driving the copy. Columns used here:
--   TCOBNM   - Object Name [name of the table to be copied]
--   TCDAFT   - Number of days from today; when > 0 (and TCGDCA is set) only that date window is compared
--   TCENVSRC - Source Environment [schema qualifying the source table, e.g. PRODDTA/PRODCTL]
--   TCGDCA   - Date column like UPMJ [column used in the date window filter]
--   TCCECB   - Current Environment [schema qualifying the target table]
--   TCRSTDN  - Target Database Name [appended as @name to the target table when set]
--   TCRSDN   - Source Database Name [appended as @name to the source table when set]
--   TCIDXN   - ON condition of the MERGE
--   TCUPDATE - Update clause as per MERGE syntax
--   TCINSERT - Insert clause as per MERGE syntax
--   TCPOPR   - Parallel operation number this table belongs to
--   TCTDA017 - Primary key columns, used to delete rows that were deleted from the source
--   TCRDA082 - All columns of the table, used to build the SELECT ... MINUS SELECT query
--   TCHSIFT  - Additional filter condition for retrieving data
-- NOTE(review): the original notes also describe TCUPMJ (updated date), TCWDUR (time taken in seconds)
--               and TCPWPRCD (rows processed) as being updated after the copy; no such update is
--               visible in this procedure - confirm whether it is done elsewhere.
--------------------------------------------------------------------------------------------------------------------------------
IS
    sSQL      CLOB;                -- statement buffer; CLOB because TCUPDATE/TCINSERT are CLOBs
    sWhere    VARCHAR2(32767);     -- filter applied to both sides of the MINUS
    sTarget   VARCHAR2(4000);      -- schema.table[@dblink] of the target
    sSource   VARCHAR2(4000);      -- schema.table[@dblink] of the source
    iFromDate NUMBER;
    iThruDate NUMBER;
    -- Cursor for list of tables from F99BI001
    CURSOR ListOfTables
    IS
        SELECT TCOBNM, TCDAFT, TCENVSRC, TCGDCA, TCCECB, TCRSTDN, TCRSDN,
               TCIDXN, TCUPDATE, TCINSERT, TCTDA017, TCRDA082, TCHSIFT
          FROM F99BI001
         WHERE TCOBNM LIKE sTableName AND TCPOPR = nParallel;
--------------------------------------------------------------------------------------------------------------------------------
-- Main Processing
--------------------------------------------------------------------------------------------------------------------------------
BEGIN
    FOR cTableList IN ListOfTables
    LOOP
        -- Fully qualified target and source table names, with the optional database link
        sTarget := TRIM(cTableList.TCCECB) || '.' || cTableList.TCOBNM;
        IF TRIM(cTableList.TCRSTDN) IS NOT NULL THEN
            sTarget := sTarget || '@' || TRIM(cTableList.TCRSTDN);
        END IF;
        sSource := TRIM(cTableList.TCENVSRC) || '.' || cTableList.TCOBNM;
        IF TRIM(cTableList.TCRSDN) IS NOT NULL THEN
            sSource := sSource || '@' || TRIM(cTableList.TCRSDN);
        END IF;

        -- Filter shared by both sides of the MINUS: optional date window plus optional extra condition
        sWhere := NULL;
        IF cTableList.TCDAFT > 0 AND cTableList.TCGDCA IS NOT NULL THEN
            -- FDATETOJULIAN is defined outside this procedure; presumably it converts a DATE to the
            -- numeric Julian date stored in the TCGDCA column - TODO confirm
            iThruDate := FDATETOJULIAN(SYSDATE);
            iFromDate := iThruDate - cTableList.TCDAFT;
            sWhere := ' WHERE ' || cTableList.TCGDCA || ' BETWEEN ' || iFromDate || ' AND ' || iThruDate;
        END IF;
        IF cTableList.TCHSIFT IS NOT NULL THEN
            IF sWhere IS NULL THEN
                sWhere := ' WHERE ' || cTableList.TCHSIFT;
            ELSE
                sWhere := sWhere || ' AND ' || cTableList.TCHSIFT;
            END IF;
        END IF;

        -- Building of MERGE statement: source MINUS target yields the rows that are new or changed
        sSQL := 'MERGE INTO ' || sTarget;
        sSQL := sSQL || ' b USING (SELECT ' || TRIM(cTableList.TCRDA082) || ' FROM ' || sSource || sWhere;
        sSQL := sSQL || ' MINUS SELECT ' || TRIM(cTableList.TCRDA082) || ' FROM ' || sTarget || sWhere;
        sSQL := sSQL || ') e ON (' || TRIM(cTableList.TCIDXN) || ') WHEN MATCHED THEN ';
        sSQL := sSQL || TRIM(cTableList.TCUPDATE);
        sSQL := sSQL || ' WHEN NOT MATCHED THEN ';
        sSQL := sSQL || TRIM(cTableList.TCINSERT);
        -- MERGE statement will be executed here
        EXECUTE IMMEDIATE sSQL;
        COMMIT;

        -- Building of DELETE MINUS statement: keys present in the target but not in the source.
        -- The whole tables are compared here; the date window is deliberately not applied.
        sSQL := 'DELETE FROM ' || sTarget;
        sSQL := sSQL || ' WHERE (' || TRIM(cTableList.TCTDA017) || ') IN ( SELECT ' || TRIM(cTableList.TCTDA017) || ' FROM ';
        sSQL := sSQL || sTarget;
        sSQL := sSQL || ' MINUS SELECT ' || TRIM(cTableList.TCTDA017) || ' FROM ';
        sSQL := sSQL || sSource;
        sSQL := sSQL || ')';
        -- DELETE MINUS statement will execute here
        EXECUTE IMMEDIATE sSQL;
        COMMIT;
    END LOOP;
--------------------------------------------------------------------------------------------------------------------------------
-- Error Handling (If Any)
--------------------------------------------------------------------------------------------------------------------------------
EXCEPTION
    WHEN OTHERS THEN
        -- Undo whatever the failing iteration left uncommitted, then let the caller see the error
        ROLLBACK;
        RAISE;
END SP_ETL_BI_DATA;
--*************************************************************************************************************
-- End PROCEDURE
--*************************************************************************************************************