Many users face the challenge of reducing the number of records to be processed in their Data & Analytics platform when the source object is not delta capable. In that case they have to pull the data from the source system with a full load and then process all records in full inside their Data & Analytics platform. In this article I would like to show you how to reduce the number of records processed inside SAP Datasphere, so that only new, changed, and deleted records are processed and Change Data Capture can be used for the subsequent data processing via Transformation Flow.
These are the steps that need to be implemented in SAP Datasphere:
In our case the remote table and the data integration into the first Local Table "rw_cskt" in SAP Datasphere are already in place. These are the properties of the Local Table "rw_cskt":
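Purely as an illustration (the real tables are modeled in the Data Builder, not created via DDL), here is a minimal sketch of how the structure of the two local tables could look. The column names are taken from the view below, the data types are assumptions, and "rw_cskt_Delta" is assumed to be the target table created with Delta Capture enabled:
/* Sketch only: illustrative structure, the real tables are modeled in the SAP Datasphere Data Builder */
create table "rw_cskt" (
"mandt" nvarchar(3),
"spras" nvarchar(1),
"kokrs" nvarchar(4),
"kostl" nvarchar(10),
"datbi" nvarchar(8),
"ktext" nvarchar(20),
"ltext" nvarchar(40),
"mctxt" nvarchar(20),
"operation_flag" nvarchar(1),
"is_deleted" nvarchar(1),
"datvon" nvarchar(8),
"recordstamp" timestamp,
primary key ("mandt", "spras", "kokrs", "kostl", "datbi")
);
/* "rw_cskt_Delta" carries the same business columns; with Delta Capture enabled, SAP Datasphere manages the additional columns "Change_Type" and "Change_Date" for it */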
Source code:
/*Modified records*/
select
source."row" as "source_row",
target."row" as "target_row",
source."mandt",source."spras",source."kokrs",source."kostl",source."datbi",source."ktext",source."ltext",source."mctxt",source."operation_flag",source."is_deleted",source."datvon", source."recordstamp", case when source."row" = target."row" then '' else 'U' end as "Change_Type", now() as "Change_Date"
from
(
select
/* hash over all columns: used to detect changed values */
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", '')), to_binary(coalesce("ktext", '')), to_binary(coalesce("ltext", '')), to_binary(coalesce("mctxt", '')), to_binary(coalesce("operation_flag", '')), to_binary(coalesce("is_deleted", '')), to_binary(coalesce("datvon", ''))) as "row",
/* hash over the key columns only: used to match records */
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "key",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","recordstamp","datvon"
from "rw_cskt"
) as source
inner join
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", '')), to_binary(coalesce("ktext", '')), to_binary(coalesce("ltext", '')), to_binary(coalesce("mctxt", '')), to_binary(coalesce("operation_flag", '')), to_binary(coalesce("is_deleted", '')), to_binary(coalesce("datvon", ''))) as "row",
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "key"
from "rw_cskt_Delta"
) as target on
source."key" = target."key"
where
source."row" != target."row"
union all
/*New records*/
select
source."row" as "source_row",
target."row" as "target_row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","datvon","recordstamp", 'I' as "Change_Type", now() as "Change_Date"
from
(
select
/* hash over the key columns only: existence check */
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","recordstamp","datvon"
from
"rw_cskt"
) as source
left outer join
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row"
from
"rw_cskt_Delta"
) as target on
source."row" = target."row"
where
target."row" is null
union all
/*Deleted records*/
select
source."row" as "source_row",
target."row" as "target_row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","datvon","recordstamp", 'D' as "Change_Type", now() as "Change_Date"
from
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","recordstamp","datvon"
from
"rw_cskt_Delta"
) as target
left outer join
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row"
from
"rw_cskt"
) as source on
source."row" = target."row"
where
source."row" is null
In this source code I convert the key columns and all columns of each row (in the source table and in the target table) to binary and then into an MD5 hash. I then check whether existing records differ between the source and the target table. In that case the Change Type is set to 'U'.
New records are identified by comparing the key columns of the source and the target table. If a record is not available in the target table, it will be inserted (Change Type = 'I').
To identify deleted records I convert only the key columns of each row (source table and target table) to binary and then into an MD5 hash. If a record in the target table is not available in the source table, that record has to be deleted (Change Type = 'D').
The benefit of using the functions to_binary and hash_md5 is that the conversion and comparison are very fast and the coding is kept simple.
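As a minimal, standalone illustration of this pattern (table and column names are placeholders, not the objects from the scenario above), the comparison of two matching rows boils down to comparing one hash value per row:
/* Sketch only: one MD5 hash per row; coalesce avoids NULLs, to_binary feeds the values into hash_md5 */
select
case when HASH_MD5(to_binary(coalesce(s."col1", '')), to_binary(coalesce(s."col2", '')))
        = HASH_MD5(to_binary(coalesce(t."col1", '')), to_binary(coalesce(t."col2", '')))
     then '' else 'U' end as "Change_Type"
from "source_table" as s
inner join "target_table" as t on s."key_col" = t."key_col"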
If I execute this SQL view, it delivers two records in this case (a modified and a deleted record).
Map the "Change Type" and the field "Change Date" can not be mapped. It will be filled automatically by SAP Datasphere.
Since the view is not delta capture capable, I can only choose the load type "Initial Only".
After deploying the Transformation Flow, I execute it and receive this result:
Only two records are processed by the Transformation Flow.
I hope this approach helps minimize the volume of records to be processed in SAP Datasphere, especially when dealing with hundreds of millions of records. After processing the data, you can use the target local table as a data source for further delta-capable transformation flows, business logic processing, or other purposes.
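If a downstream consumer needs to pick up only the captured changes itself, a minimal sketch of such a query could look as follows. It assumes the target local table is "rw_cskt_Delta" with the delta capture columns "Change_Type" and "Change_Date" described above; the timestamp literal is only a placeholder for the last successful run:
/* Sketch only: read the changes captured since the last run; the timestamp is a placeholder watermark */
select *
from "rw_cskt_Delta"
where "Change_Date" > '2024-01-01 00:00:00'
and "Change_Type" in ('I', 'U', 'D')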
Thanks
Wanda Soetomo