
This blog post explains how to write custom logic that limits the size of the target files during SLT replication in the SAP Data Intelligence Pipeline Modeler (version 3.1-2010) by extending a predefined base operator.


 

Background:


For a customer scenario, we had to replicate data from the customer's ECC system to an Azure Data Lake Storage (ADLS Gen 2) filesystem using the ABAP System Landscape Transformation (SLT) Connector in SAP Data Intelligence Cloud 3.1-2010.
The customer required multiple CSV files, split by a particular timestamp (YYYYMMDD_HHMM) and by size, each including header information. Currently, there is no direct provision in the SAP Data Intelligence SLT operator to generate multiple CSV files based on a timestamp (YYYYMMDD_HHMM) together with headers.
There is an existing Limit File Size operator in SAP Data Intelligence (available as a scenario template) which is written in Go and generates a counter for each portion of data coming from SLT.

The default SLT Connector operator supports 3 file formats:
1. XML
2. CSV
3. JSON

However, the CSV format doesn't contain the actual column names coming from the source; instead, it uses generic column names such as C0, C1, and so on.
Custom Python code was created to add the actual column names; more detailed information can be found in this blog: https://blogs.sap.com/2021/02/19/sap-data-intelligence3.1-2010-csv-as-target-using-slt-connector-ope...



Since we decided to build the custom code for header generation in Python, while the sample Limit File Size operator is in Go, switching between the two sub-engines would have been an expensive approach, especially for huge data volumes (in our case, 600M+).

Hence, we created a custom Python operator that incorporates both functionalities (splitting the files based on timestamp and size) and generates headers, with better throughput.

 



Getting Started


ABAP SLT Connector


The SLT Connector operator establishes a connection between SAP Landscape Transformation Replication Server (SLT) and SAP Data Intelligence. You can then use SLT to replicate tables from a source system into SAP Data Intelligence.

The SLT Connector has different versions. In V0 and V1 the output type was *abap.

Recently, version V2 has been released; its output type is now *message.

For the below use case, we have used V2.

Transfer mode selected for the use case: Replication (which performs the initial load as well as replication).

 

Python Operator - 'Limit File Size with Header'


We have built a custom Python operator, based on the base Python operator offered by SAP Data Intelligence, to exploit the functionality of the SLT V2 operator.


Basically, in the SLT V2 operator the message output has two sections:
1. Attributes
2. Data


In this operator, we extract the body and the attributes of the input message separately; it works as follows:

  • The attributes of the input message contain the metadata, including the column names and data types, which is used to generate the header information for each file (a sketch of this structure is shown after this list).

  • We use a counter variable to generate a counter based on the file size limit provided.

  • Set 'maxsizekb' (the size limit in KB) in the program: the upper limit of the file size to be created in the target filesystem.





  • The counter keeps increasing for each data block until the graph terminates or stops.

  • In case the graph terminates abnormally, the counter will restart from 0 with a different timestamp.
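
A minimal sketch of the attribute structure the operator relies on is shown below. Only the keys 'ABAP', 'Fields' and 'Name' are taken from the operator code that follows; the sample column names are hypothetical, and SLT sends further metadata (such as data types) alongside each field.

# Illustrative shape of the SLT V2 message attributes (simplified)
attributes = {
    'ABAP': {
        'Fields': [
            {'Name': 'MANDT'},   # one entry per source column; SLT also sends
            {'Name': 'MATNR'},   # the data type and other metadata per field
        ]
    }
}

# The header columns are then simply:
column_names = [field['Name'] for field in attributes['ABAP']['Fields']]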




import sys
from io import StringIO
import pandas as pd

# Global state: running byte count, current file counter, size limit in KB,
# and a call counter used to detect the very first data block.
mysize = 0
mycounter = 0
mykblimit = 25000
counterp = 0

def on_input(inData):
    global counterp
    global mysize
    global mycounter
    global mykblimit

    counterp += 1

    # Skip empty portions sent by SLT
    if str(inData.body) == 'NULL':
        return

    data = StringIO(inData.body)
    attr = inData.attributes

    # The 'ABAP' attribute carries the table metadata; collect the column names
    ABAPKEY = attr['ABAP']
    col = []
    for columnname in ABAPKEY['Fields']:
        col.append(columnname['Name'])

    # Fall back to 1 MB if no limit was set
    if mykblimit == 0:
        mykblimit = 1024

    a = str(inData.body)
    mysize += sys.getsizeof(a)

    if counterp == 1 and mycounter == 0 and mysize < mykblimit * 1024:
        # Very first block of the first file: write the header
        attr['cnt'] = str(mycounter)
        df = pd.read_csv(data, index_col=False, names=col, dtype='str')
        df_csv = df.to_csv(index=False, header=True)

    elif counterp > 1 and mycounter == 0 and mysize < mykblimit * 1024:
        # Subsequent blocks of the first file: append without header
        attr['cnt'] = str(mycounter)
        df = pd.read_csv(data, index_col=False, names=col, dtype='str')
        df_csv = df.to_csv(index=False, header=False)

    elif mysize >= mykblimit * 1024:
        # Size limit reached: start a new file (new counter) and write the header again
        mycounter += 1
        mysize = sys.getsizeof(a)
        attr['cnt'] = str(mycounter)
        df = pd.read_csv(data, index_col=False, names=col, dtype='str')
        df_csv = df.to_csv(index=False, header=True)

    else:
        # Subsequent blocks of the current file: append without header
        attr['cnt'] = str(mycounter)
        df = pd.read_csv(data, index_col=False, names=col, dtype='str')
        df_csv = df.to_csv(index=False, header=False)

    # The counter is passed downstream as the message attribute 'cnt'
    api.send("output", api.Message(attributes=attr, body=df_csv))

api.set_port_callback("input1", on_input)



 

Write File


This operator writes files to various services. A write operation happens at every input, unless inputs are in batches and Join batches is true.


Each operation uses a connection according to the configured Connection and a path according to the configured Path mode.

Under the write file path, specify the counter created in the Python operator via the <header:cnt> placeholder ('cnt' being the attribute set in the code above), as below:

Pathmode: Static with Placeholder
Mode: Append
Join Batches: False
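
As an illustration, the Write File configuration could look as follows. The folder and file prefix are hypothetical; only the <header:cnt> placeholder, the Append mode and the other settings above are taken from this post.

Connection: <your ADLS Gen 2 connection>
Path mode: Static with Placeholder
Path: /slt_target/part_<header:cnt>.csv
Mode: Append
Join Batches: False

Because the mode is Append, every data block carrying the same 'cnt' attribute is appended to the same file; as soon as the Python operator increments the counter, the placeholder resolves to a new path and a new target file is started.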





Overall Graph


The overall graph looks as below:
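
In essence, the graph chains the three operators discussed in this post (a simplified view; the actual graph may contain additional converters or a graph terminator):

SLT Connector (V2)  -->  Limit File Size with Header (Python)  -->  Write File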





Customer Specific Implementation


** To keep the explanation clear and simple in this post, the code snippet above is limited to a counter based on file size only. Some of the customer-specific implementations are explained below.

For the scenario, we enhanced the solution (to meet the customer's file-naming requirement) as below:




  • Generated the counter based on the timestamp (date: YYYYMMDD and time: HHMM) and the file size (a sketch is shown after this list).

  • 'cnt' is in '<counter>_<YYYYMMDD>_<HHMM>' format.

  • A separate folder is created for each date.

  • The counter is reset every day and starts from 0.

  • To simplify and make the code reusable, we created a custom operator extended from the base Python operator, where the size limit is a configuration parameter ('maxsizekb').

  • Refer to the following link if you are interested in creating a custom operator: https://help.sap.com/viewer/1c1341f6911f4da5a35b191b40b426c8/Cloud/en-US/049d2f3cc69c4281a3f4570c0d2...
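
A minimal sketch of how such a combined counter could be generated is shown below. This reflects our interpretation of the enhancement rather than the exact customer code; the function and variable names are illustrative.

import datetime

mycounter = 0
current_day = None

def build_cnt(size_limit_reached):
    """Return the 'cnt' attribute in '<counter>_<YYYYMMDD>_<HHMM>' format."""
    global mycounter, current_day
    now = datetime.datetime.now()
    today = now.strftime('%Y%m%d')

    # Reset the counter at the start of each new day
    if today != current_day:
        current_day = today
        mycounter = 0
    # Otherwise start a new file only when the size limit has been reached
    elif size_limit_reached:
        mycounter += 1

    return '{}_{}_{}'.format(mycounter, today, now.strftime('%H%M'))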





 

Files will be created as below (maxsizekb = 50000):
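
For illustration, the target folder for one day could then contain files such as the ones below. The file prefix and dates are hypothetical; the suffix follows the '<counter>_<YYYYMMDD>_<HHMM>' format of the 'cnt' attribute, and each file is capped at roughly the configured maxsizekb and starts with the header row.

part_0_20210301_0905.csv
part_1_20210301_0918.csv
part_2_20210301_0931.csv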





 

To Conclude,


We have learned how to create target files based on a given file size limit, how to generate headers, and what we can do further to enhance the pipelines for specific user requirements.

Voilà! Now you shouldn't have any trouble limiting the size of the target files, with headers, in SLT replication 😃

If you are interested in understanding how the enhancements were carried out, or have ideas for the next blog post, please let me know in the comments section below.


For more information on SAP Data Intelligence, please see:


Exchange Knowledge: SAP Community | Q&A Blogs
