Technology Blog Posts by Members
Wanda_Soetomo
Explorer

Many users face the challenge of reducing the number of records to be processed in their Data & Analytics platform when the source object is not delta capable. In that case they have to pull the data in full from the source system and then process all records in full inside their Data & Analytics platform. In this article I would like to show you how to reduce the records to be processed inside SAP Datasphere, so that only new, changed, and deleted records are processed and Change Data Capture can be used for the next data processing step via a Transformation Flow.

Wanda_Soetomo_0-1733238360137.png

These are the steps that need to be implemented in SAP Datasphere:

In our case the remote table and the data integration into the first Local Table "rw_cskt" in SAP Datasphere are already done. Here are the properties of the Local Table "rw_cskt":

Wanda_Soetomo_1-1733223542234.png

Wanda_Soetomo_2-1733223623291.png

1. Create a SQL View "vw_rw_cskt_delta_test" to extract only new, modified, and deleted records from the source Local Table "rw_cskt"

Wanda_Soetomo_0-1733237263327.png

Source code:

/*Modified records: same key, but at least one column value differs*/
select
source."row" as "source_row",
target."row" as "target_row",
source."mandt", source."spras", source."kokrs", source."kostl", source."datbi",
source."ktext", source."ltext", source."mctxt", source."operation_flag",
source."is_deleted", source."datvon", source."recordstamp",
case when source."row" = target."row" then '' else 'U' end as "Change_Type",
now() as "Change_Date"
from
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", '')), to_binary(coalesce("ktext", '')), to_binary(coalesce("ltext", '')), to_binary(coalesce("mctxt", '')), to_binary(coalesce("operation_flag", '')), to_binary(coalesce("is_deleted", '')), to_binary(coalesce("datvon", ''))) as "row",
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "key",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","recordstamp","datvon"
from "rw_cskt"
) as source
inner join
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", '')), to_binary(coalesce("ktext", '')), to_binary(coalesce("ltext", '')), to_binary(coalesce("mctxt", '')), to_binary(coalesce("operation_flag", '')), to_binary(coalesce("is_deleted", '')), to_binary(coalesce("datvon", ''))) as "row",
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "key"
from "rw_cskt_Delta"
) as target
on source."key" = target."key"
where
source."row" != target."row"

union all

/*New records: key exists in the source table but not yet in the target table*/
select
source."row" as "source_row",
target."row" as "target_row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","datvon","recordstamp",
'L' as "Change_Type",
now() as "Change_Date"
from
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","recordstamp","datvon"
from "rw_cskt"
) as source
left outer join
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row"
from "rw_cskt_Delta"
) as target
on source."row" = target."row"
where
target."row" is null

union all

/*Deleted records: key exists in the target table but no longer in the source table*/
select
source."row" as "source_row",
target."row" as "target_row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","datvon","recordstamp",
'D' as "Change_Type",
now() as "Change_Date"
from
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row",
"mandt","spras","kokrs","kostl","datbi","ktext","ltext","mctxt","operation_flag","is_deleted","recordstamp","datvon"
from "rw_cskt_Delta"
) as target
left outer join
(
select
HASH_MD5(to_binary(coalesce("mandt", '')), to_binary(coalesce("spras", '')), to_binary(coalesce("kokrs", '')), to_binary(coalesce("kostl", '')), to_binary(coalesce("datbi", ''))) as "row"
from "rw_cskt"
) as source
on source."row" = target."row"
where
source."row" is null

In this source code I convert the key columns and all columns of each row (in the source table and in the target table) to binary and then into an MD5 hash: one hash over the key columns and one over all columns. I then check whether a record that exists in both tables (same key hash) has a different row hash; in that case the Change Type is set to 'U'.

New records are identified by comparing the key columns of the source and target table. If a record is not yet available in the target table, it will be inserted (Change Type = 'L').

To identify deleted records I convert only the key columns of each row (in the source table and in the target table) to binary and then into an MD5 hash. If a record in the target table is no longer available in the source table, it has to be deleted (Change Type = 'D').
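
The following stripped-down sketch shows the same three-way classification on two hypothetical tables "src" and "tgt" with a single key column "id" and one attribute "txt". These names are made up for illustration and are not the blog's actual objects; with a single key column the new/deleted checks can join on the key directly, whereas the view above hashes the composite key:

/* Hypothetical tables "src" (new full load) and "tgt" (previous state) */
/* Modified: same key, different row hash */
select 'U' as "Change_Type", source."id", source."txt"
from
(
select "id", "txt",
HASH_MD5(to_binary(coalesce("id", '')), to_binary(coalesce("txt", ''))) as "row"
from "src"
) as source
inner join
(
select "id",
HASH_MD5(to_binary(coalesce("id", '')), to_binary(coalesce("txt", ''))) as "row"
from "tgt"
) as target
on source."id" = target."id"
where source."row" != target."row"

union all

/* New: key exists in "src" but not in "tgt" */
select 'L' as "Change_Type", source."id", source."txt"
from "src" as source
left outer join "tgt" as target on source."id" = target."id"
where target."id" is null

union all

/* Deleted: key exists in "tgt" but no longer in "src" */
select 'D' as "Change_Type", target."id", target."txt"
from "tgt" as target
left outer join "src" as source on source."id" = target."id"
where source."id" is null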

The benefit of using the functions to_binary and then hash_md5 is that the conversion and comparison are very fast and the coding is much simpler than comparing every column individually.
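
For illustration, here is a standalone query (runnable in any SAP HANA SQL console; the literal values are made up) showing that HASH_MD5 always returns a compact 16-byte value, displayed as 32 hexadecimal characters, no matter how many columns go into it:

/* Made-up literal values; both results are VARBINARY(16), i.e. 32 hex characters */
select
HASH_MD5(to_binary('100'), to_binary('E'), to_binary('1000'), to_binary('4711'), to_binary('99991231')) as "key_hash",
HASH_MD5(to_binary('100'), to_binary('E'), to_binary('1000'), to_binary('4711'), to_binary('99991231'), to_binary('Marketing')) as "row_hash"
from dummy;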

If I execute this SQL View, it delivers two records in this case (a modified and a deleted record).

Wanda_Soetomo_1-1733237808155.png
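
Before wiring the view into a Transformation Flow, a quick sanity check can be run against it (a minimal sketch, assuming you have SQL access to the space schema; the view name is the one from step 1):

select "Change_Type", count(*) as "records"
from "vw_rw_cskt_delta_test"
group by "Change_Type";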

 

2. Use this SQL View "vw_rw_cskt_delta_test" as data source for a new Transformation Flow "tf_rw_cskt_Delta"

Wanda_Soetomo_5-1733225232449.png

3. Create a new Local Table as target with Delta Capture active

Wanda_Soetomo_0-1733228643823.png

Map the "Change Type" and the field "Change Date" can not be mapped. It will be filled automatically by SAP Datasphere.

Wanda_Soetomo_1-1733228763131.png

Since the view is not delta capture capable, I can only choose the load type "Initial Only".

Wanda_Soetomo_2-1733229644600.png

4. Execute Transformation Flow

After deploying the Transformation Flow, I execute it and receive this result:

Wanda_Soetomo_2-1733238011454.png

Only two records are processed by the Transformation Flow.

Wanda_Soetomo_3-1733238099795.png

 

Summary:

I hope this approach helps minimize the volume of records to be processed in SAP Datasphere, especially when dealing with hundreds of millions of records. After processing the data, you can use the target local table as a data source for further delta-capable transformation flows, business logic processing, or other purposes.
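
For example, downstream logic that only needs records which still exist in the source could skip the deletion markers when reading the target table (a minimal sketch; it assumes the target table name and the Change_Type values used above):

/* Minimal sketch: skip deletion markers ('D') and keep the new ('L') and updated ('U') records */
select *
from "rw_cskt_Delta"
where "Change_Type" <> 'D';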

 

Thanks

Wanda Soetomo

 

17 Comments
Thibault_dockx
Explorer

Hi,

 

Interesting blog!

How's the performance of this SQL View: Any limits on table sizes, time to lookup these changes?

 

KR,

Thibault

_Toni_
Explorer

Nice blog and interesting approach with the hashing. Will definitely check it out!

GabTan
Explorer

Hi @Wanda_Soetomo 

Why, in this implementation, the choice was made to not use the built-in "Delta Capture" functionality available for the local table?

What does this custom approach provides which "Delta Capture" does not?

I think it is important to highlight such reasons in the post, so that readers can make better informed choices.

Thank you!

Wanda_Soetomo
Explorer

Hello @Thibault_dockx,

in my demo case I only have 53k records with 2 modified records, and the runtime was 3 seconds. But at one of my customers we had 1 billion records on native SAP HANA on-premise, and I used a flowgraph to do the same with the HASH_MD5 function. It worked well, since HASH_MD5 creates a 32-character string, and comparing 32 characters is much faster than comparing each field of a row. The flowgraph crashed with a memory allocation error when I compared each field instead.

Best regards

Wanda Soetomo

 

Wanda_Soetomo
Explorer

Hello @GabTan ,

the problem is that we cannot use a local table with Delta Capture activated as the target of a Data Flow to extract data directly from the source system via a remote table. To use a local table with Delta Capture activated as target you have to use a Transformation Flow, but if you have a remote table as data source you cannot use a Transformation Flow to extract data from the source system. 🙄

That is the reason why I have written this blog, and I hope that SAP will provide a solution for this issue. The other option would be to use a delta-capable ABAP CDS view in combination with a Replication Flow to get the data as delta into SAP Datasphere, and after that to process the delta records in SAP Datasphere using a Transformation Flow.

Best regards

Wanda Soetomo

albertosimeoni
Participant

@Wanda_Soetomo 

but you can use a graphical view on top of the remote table.

Due to the nature of the query, you have a full table scan in both cases (so there is no need for the first data flow).

Probably, in the case of a remote table without replication, it is more efficient (network-wise) to have a persistency (the query is a union of 3 subqueries, so we have 3 remote table scans).

A possible solution could be:

- remote table

- view with persistency (more robust in case of failure than a data flow, as it works with snapshots)

- view that implements the CDC logic

- transformation flow

Wanda_Soetomo
Explorer

Hello @albertosimeoni ,

thank you for your feedback. I have tested this before: SAP Datasphere does not allow any view (even one with data persistence) that has a remote table as data source to be used as a data source in a Transformation Flow.

Wanda_Soetomo_0-1733915434008.png

Best regards

Wanda Soetomo

albertosimeoni
Participant

@Wanda_Soetomo 

great discovery!!

A very strange limitation, as in the database remote tables are virtual tables and all the logic is executed by stored procedures.

tmeyer
Participant

One question: why do you use Change Type 'L' instead of 'I' for new records? In the help, it is documented as 'I':

https://help.sap.com/docs/SAP_DATASPHERE/c8a54ee704e94e15926551293243fd1d/154bdffb35814d5481d1f6de14...

albertosimeoni
Participant

@tmeyer hello, from what I found:

A, L, M -> output of a replication flow.

U, I, D -> from manual changes to the table with the table editor.

tmeyer
Participant

First, thanks for the information @Wanda_Soetomo & @albertosimeoni

So, as you mentioned the SAP note @Wanda_Soetomo, it would be possible to set one row to an update before/after image. So I can build my own changelog?

Best regards


Sebastian_Gesiarz
Active Participant

Thank you for the blog. It gives me a new level of appreciation for the 'snapshot' ADSO feature in BW/4HANA 😉 A neat touch with the binary conversion and generated hashes. Out of curiosity, how often did you update the source table, and through which method?

puneetc
Participant

@Wanda_Soetomo thanks for the wonderful blog. Could you please share a text file or Word document with the code, as the screenshot and the code given have some mismatches due to the width of the screen.

alwinmalko
Newcomer

@Wanda_Soetomo first of all thanks for this blog. 

Correct me if I understood it wrong, but you still have to do a full load into your staging table. Have you maybe tried to use this logic on the remote table directly? And if yes, are the filters pushed down to the source? Because the first step will nevertheless be a full load, and if there are many records you can still run into performance issues.

albertosimeoni
Participant

@Wanda_Soetomo I probably found why we cannot use a remote table as source.

If the remote table uses a full snapshot replica, the replication is done with tables in the same schema as the space in which the remote table is defined (as a virtual table).

If the remote table uses real-time replication (for example from an SAP HANA source system), the replica table is inside the schema _SYS_TABLE_REPLICA.

A select statement on a virtual table with a replica is routed to the table in the schema _SYS_TABLE_REPLICA.

Probably there are limitations in one of those use cases when it comes to Transformation Flows?