
Snowflake is a data warehouse that runs on Amazon Web Services, Google Cloud Platform, or Microsoft Azure infrastructure, allowing large volumes of data to be stored and processed at scale. Enterprise supply chain planning often requires consolidating data from multiple sources, and a data lake comes in handy in such situations; many SAP customers treat Snowflake as that data lake. With SAP Integrated Business Planning (SAP IBP) established as a de-facto industry standard for supply chain planning, customers frequently ask whether SAP IBP and Snowflake can be integrated.
This blog describes how to read forecast data from SAP IBP and write it to a database table in Snowflake using SAP Cloud Integration as the middleware. To support large data volumes, we use SAP remote functions (WS-RFC) to extract data from SAP IBP into SAP Cloud Integration for ETL purposes. SAP also offers the Open Connectors platform, which hosts a marketplace where 3rd party vendors provide their integration artifacts. Here we use a 3rd party, JDBC-based open connector between SAP Cloud Integration and Snowflake. The blog is split into two parts.
Let’s start with the following prerequisites in place:
As a prerequisite, we need a Snowflake database user with a role and privileges that allow writing data to the target table. This user can be configured with an RSA key pair, or basic authentication can be used. How to do this is well documented in the security guide from Snowflake.
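As an illustration, a minimal sketch of such a setup in Snowflake SQL is shown below. The role, user, and warehouse names are placeholders, and the table names assume the TEST_DB.PUBLIC.IBPFORECAST objects used later in this blog; adapt everything to your environment.
-- Role with the privileges needed to write into the target table
CREATE ROLE IF NOT EXISTS IBP_WRITER;
GRANT USAGE ON DATABASE TEST_DB TO ROLE IBP_WRITER;
GRANT USAGE ON SCHEMA TEST_DB.PUBLIC TO ROLE IBP_WRITER;
GRANT INSERT, SELECT ON TABLE TEST_DB.PUBLIC.IBPFORECAST TO ROLE IBP_WRITER;
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE IBP_WRITER;
-- Technical user for the integration (RSA key pair or basic authentication)
CREATE USER IF NOT EXISTS IBP_INTEGRATION_USER
  DEFAULT_ROLE = IBP_WRITER
  RSA_PUBLIC_KEY = '<public key from your generated key pair>';
GRANT ROLE IBP_WRITER TO USER IBP_INTEGRATION_USER;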
As mentioned before, we need an external or internal staging integration set up in Snowflake. You can use Snowflake's internal staging mechanism for loading your data, but in this blog we set up Amazon S3 as an external stage. Amazon S3 is optional; Google Cloud Platform or Microsoft Azure can also be used for external staging. You can follow the steps below to create an external stage on the Amazon S3 bucket which we created as an Object Store in one of the previous steps.
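A minimal sketch of how that can look in Snowflake SQL, assuming a hypothetical bucket path, IAM role ARN, and the stage name INTSTAGE referenced later in the COPY command; refer to the Snowflake documentation for the exact trust setup between the storage integration and your AWS account.
-- Storage integration that trusts an AWS IAM role for the S3 bucket
CREATE STORAGE INTEGRATION S3_INT
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/<snowflake-access-role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<S3 bucket name>/data/sap/');
-- External stage pointing at the bucket path
CREATE STAGE TEST_DB.PUBLIC.INTSTAGE
  URL = 's3://<S3 bucket name>/data/sap/'
  STORAGE_INTEGRATION = S3_INT;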
Once you are on the landing page of the SAP Cloud Integration section, navigate to Capabilities and then to the Extend Non-SAP Connectivity tile, where you will see the Discover Connectors link. Click on it to log in to the SAP Integration Suite - Open Connectors platform.
On the left-hand menu of the Open Connectors landing page you will see Instances. Click on it to see all the instances you have created.
Using the Object Store instance in your BTP subaccount, create a service key for AWS S3. The credentials JSON from the downloaded service key is used to create an instance of the Open Connector for AWS S3. Create an instance with the following details.
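For orientation, the service key typically exposes the S3 credentials in a structure roughly like the sketch below; the exact field names depend on the Object Store service plan, and all values shown are placeholders.
{
  "access_key_id": "<AWS access key>",
  "secret_access_key": "<AWS secret key>",
  "bucket": "<S3 bucket name>",
  "region": "<AWS region, e.g. eu-central-1>"
}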
After filling in the details, use the "Create Instance" button, which also generates the API Docs for your instance. You can try out an API call to get the User, Element and Organization details, as in the previous step. The User, Element and Organization parameter values are then used to create a new Security Material (User Credentials) in SAP Cloud Integration.
You have now created two Open Connector instances and configured them on SAP Cloud Integration for consumption in an iFlow. We will need the User Credentials name and the URL of the Open Connector instance later.
Update 05.12.2024: an alternative way to integrate using native adapters is also possible. I have captured the details in this new blog.
The IBP 2402 release comes with a set of RFC interfaces that can be used with iFlows in SAP Cloud Integration to extract key figure and master data. They are bundled in the communication scenario SAP_COM_0931. Create a new communication arrangement, a communication system, and a communication user for it. These function modules are intended to be used by SAP Cloud Integration only.
Next, create a BTP destination with the communication user credentials for the IBP instance in the BTP subaccount where the SAP Cloud Integration runtime is provisioned.
Fig.1 Screen shot of BTP Destination service configuration.
The screenshot above shows the destination configuration. In this example, the destination is named IBP_G5R_100; this name is used later in the iFlows to make WS-RFC calls to the SAP IBP instance, which is configured in the jco.client.wshost property.
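As a rough sketch, such a destination typically carries properties along the following lines. The exact property set depends on your landscape and chosen authentication, and all values shown here are placeholders.
Name=IBP_G5R_100
Type=RFC
jco.client.wshost=<your IBP host>
jco.client.wsport=443
jco.client.client=100
jco.client.user=<communication user from SAP_COM_0931>
jco.client.passwd=<communication user password>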
With the above sections in place, let us take a closer look at reading from IBP and writing to Snowflake. The diagram below gives a high-level overview of the entire process.
Fig.2 High level overview of components.
In the picture above, we want to read CONSDEMANDQTY, the key figure calculated during a forecast run in SAP IBP. It is read from Cloud Integration using a set of helper iFlows. The data is then stored in the external staging area in AWS S3. Once the file is in the S3 bucket, we copy the data from S3 into the IBPFORECAST table in the Snowflake database using a SQL command issued via the Open Connector.
Data is read from IBP in three steps; it is an asynchronous process, and the helper iFlows are used for all of them. First, a select query is initialized. The helper iFlow is invoked via the Process Direct SAP_IBP_READ_-_Initialize, and the following payload is sent as the request.
<IBPReads>
<IBPRead Key="${header.IBPQueryKey1}"
Destination="${header.IBPDestination}"
PackageSizeInRows="${header.IBPPackageSizeInRows}"
Select="${header.IBPQuerySelect}"
OrderBy="${header.IBPQueryOrderBy}"
Filter="${header.IBPQueryFilterString}"
TypeOfData="${header.IBPQueryTypeOfData}"
TimeAggregationLevel="2"/>
</IBPReads>
In this example, IBPQueryKey1 is CONSDEMANDQTY, the key figure we want to read, and IBPDestination is the destination definition in the BTP subaccount for the IBP backend. IBPPackageSizeInRows defines the number of rows to read per call. IBPQuerySelect holds the actual select attributes, for example PRDID, CUSTID, LOCID, CONSDEMANDQTY, PERIOD_LEVEL_3, UOMID; all of these are root attributes of the CONSDEMANDQTY key figure defined in the planning area in IBP. IBPQueryTypeOfData is KeyFigures, the type of data we want to read (key figures vs. master data). You can also use IBPQueryFilterString and IBPQueryOrderBy to define the selection filter and sort order; for example, CONSENSUSDEMANDQTY gt 0 AND UOMTOID eq 'EQ' could be the filter string.
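As an illustration, the headers referenced in the payload above could be prepared in a short Groovy script (or equally in a Content Modifier) before calling the Process Direct. This is only a sketch with example values, not the exact script used in the blog.
import com.sap.gateway.ip.core.customdev.util.Message

def Message processData(Message message) {
    // Example values for the IBP read request; adapt to your planning area
    message.setHeader("IBPQueryKey1", "CONSDEMANDQTY")
    message.setHeader("IBPDestination", "IBP_G5R_100")
    message.setHeader("IBPPackageSizeInRows", "50000")
    message.setHeader("IBPQuerySelect", "PRDID,CUSTID,LOCID,CONSDEMANDQTY,PERIOD_LEVEL_3,UOMID")
    message.setHeader("IBPQueryOrderBy", "PRDID")
    message.setHeader("IBPQueryFilterString", "CONSENSUSDEMANDQTY gt 0")
    message.setHeader("IBPQueryTypeOfData", "KeyFigures")
    return message
}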
In the second step, we check the Status attribute of the IBPQuery element returned by the helper iFlow. The status can be either PRE-INIT or FETCH_DATA. If the helper iFlow returns PRE-INIT, an exception was thrown during the query initialization; if the status is FETCH_DATA, the initialization was successful and we can fetch the data. The helper iFlow waits in a loop until it gets the status of the query init call. On success, the response also contains a UUID which we use to fetch the data. The following payload is used for the fetch call to the helper iFlows, invoked via the Process Direct SAP_IBP_Read_-_Fetch_Data.
<IBPRead>
<xsl:attribute name="Key">
<xsl:value-of select="$IBPQueryKey1"/>
</xsl:attribute>
<xsl:attribute name="Offset">
<xsl:value-of select="$IBPQueryOffset1"/>
</xsl:attribute>
<xsl:attribute name="PackageSizeInRows">
<xsl:value-of select="$IBPPackageSizeInRows"/>
</xsl:attribute>
<xsl:attribute name="ParallelThread">
<xsl:value-of select="$IBPQueryParallelThread"/>
</xsl:attribute>
</IBPRead>
You can use this call to read the data in batches: with the package size in rows and the offset you can request one batch after another. IBPQueryOffset1 is the offset, i.e. the number of rows already read that should be skipped. The result of this call is the key figure data returned by the helper iFlows. A simple batching loop could look like the sketch below.
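This is a minimal sketch of the offset handling, assuming a hypothetical helper fetchBatch() that wraps the Process Direct call and returns the number of rows received; the real iFlow does this with a looping process, so the snippet only illustrates the arithmetic.
// Illustration only: advance the offset until a batch comes back smaller than the package size
int packageSize = 50000
int offset = 0
while (true) {
    int rowsReceived = fetchBatch(offset, packageSize)   // hypothetical wrapper around SAP_IBP_Read_-_Fetch_Data
    if (rowsReceived < packageSize) {
        break                                            // last batch reached
    }
    offset += packageSize                                // skip the rows already read
}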
The XML response from the SAP IBP WS-RFC calls contains the forecast data for the key figure. It is converted to JSON using the XML-to-JSON converter, and a Groovy script maps this data to the column names of the target table in Snowflake. A JSONBuilder is used to collect all the rows into a JSON array. You can add your own custom logic or extend your existing processes to adapt these activities in your own iFlow, as I do here. In addition, a few attributes are needed for the subsequent calls, for example the S3 file name under which the data should be stored and the destination names for Snowflake and Amazon S3. We use separate Process Direct calls for these steps, and it is convenient to store these values as JSON key-value pairs and pass them along as a payload. The forecast data from IBP is stored in the S3 bucket as a JSON file; the file is generated at runtime and is never persisted in SAP Cloud Integration. To build the multipart upload request, the following Groovy function is used:
import javax.activation.DataHandler
import javax.mail.internet.ContentType
import javax.mail.internet.MimeBodyPart
import javax.mail.internet.MimeMultipart
import javax.mail.util.ByteArrayDataSource
import org.json.JSONArray

// 'input' is prepared in the previous mapping step and holds the payload and file metadata
// Data payload from the previous step
JSONArray valueJSONArray = input.get("AWSPayload");
def prettyBody = valueJSONArray.toString() as String;
def bytes = prettyBody.getBytes();
// New multipart MIME body part
MimeBodyPart bodyPart = new MimeBodyPart()
// Data source envelope with the input bytes
ByteArrayDataSource dataSource = new ByteArrayDataSource(bytes, 'application/json')
// Data handler envelope with the data source
DataHandler byteDataHandler = new DataHandler(dataSource)
// Multipart MIME body with the data
bodyPart.setDataHandler(byteDataHandler)
// File name for the data
String fileName = input.AWSFileName
bodyPart.setFileName(fileName)
// File type as JSON
String fileType = input.AWSFileType
// Form-data disposition and content type for the body part
bodyPart.setDisposition('form-data; name="file"')
bodyPart.setHeader("Content-Type", fileType)
MimeMultipart multipart = new MimeMultipart()
// Add the body part to the multipart
multipart.addBodyPart(bodyPart)
// Write the multipart to a byte array output stream and use it as the message body
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
multipart.writeTo(outputStream)
message.setBody(outputStream)
// Boundary for the multipart body
String boundary = (new ContentType(multipart.contentType)).getParameter('boundary')
message.setHeader('Content-Type', "multipart/form-data; boundary=${boundary}")
// Return the message with the authentication headers set
message = setHeader(message);
return message;
Now the data is POST-ed directly to the Open Connector instance using an HTTP adapter. The setHeader method builds the authentication header from the User, Organization and Element values as follows:
def Message setHeader(Message message) {
    def properties = message.getProperties();
    String user = properties.get("OCUser") as String;
    String org = properties.get("OCOrg") as String;
    String element = properties.get("OCElement") as String;
    // Open Connectors expects the User, Organization and Element tokens in the Authorization header
    message.setHeader("Authorization", "User " + user + ", Organization " + org + ", Element " + element);
    return message;
}
The data in the staging Amazon S3 bucket then has to be moved into the Snowflake database table IBPFORECAST. This is done by running a SQL command on the Snowflake warehouse. We created a separate local integration flow that makes this call through the Open Connector Snowflake instance and passes the following SQL command.
COPY INTO "TEST_DB"."PUBLIC"."IBPFORECAST"
FROM '@"TEST_DB"."PUBLIC"."INTSTAGE"'
FILES = ('forecast.json.gz')
FILE_FORMAT = (
TYPE=JSON,
STRIP_OUTER_ARRAY=TRUE,
REPLACE_INVALID_CHARACTERS=TRUE,
DATE_FORMAT=AUTO,
TIME_FORMAT=AUTO,
TIMESTAMP_FORMAT=AUTO
)
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
ON_ERROR=CONTINUE
It is also possible to stage multiple files under a folder path and then issue a single COPY INTO command for that folder, which copies all files under the path into the Snowflake table (see the sketch after the Groovy function below). Snowflake also keeps load metadata for the stage, so only new or changed files are loaded by the COPY INTO process. The SQL command is prepared in a Groovy script like the following function and then passed as the payload to the Open Connector adapter.
import com.sap.gateway.ip.core.customdev.util.Message
import groovy.json.JsonSlurper
import org.json.JSONObject

def Message handleRequestToLoadSnowflake(Message message) {
    def body = message.getBody(java.io.Reader);
    def input = new JsonSlurper().parse(body);
    // Open Connectors
    message.setProperty("OCURL", input.OCURL);
    // Snowflake
    message.setProperty("SFTarget", input.SFTarget);
    message.setProperty("SFStaging", input.SFStaging);
    message.setProperty("SFDestination", input.SFDestination);
    // Build the COPY INTO statement from the target table and stage passed in the payload
    String sqlScript = "COPY INTO " + input.SFTarget + " FROM " + input.SFStaging + " FILE_FORMAT = ( TYPE=JSON, STRIP_OUTER_ARRAY=TRUE, REPLACE_INVALID_CHARACTERS=TRUE, DATE_FORMAT=AUTO, TIME_FORMAT=AUTO, TIMESTAMP_FORMAT=AUTO ) MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE ON_ERROR=CONTINUE";
    // The SQL statement is sent to the connector as a JSON body under the 'script' key
    JSONObject requestObject = new JSONObject();
    requestObject.put("script", sqlScript);
    def prettyBody = requestObject.toString() as String;
    message.setBody(prettyBody);
    return message;
}
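For reference, the folder-based variant mentioned earlier could look roughly like the statement below; the PATTERN value is a placeholder and should match however your iFlow names the files in the stage.
COPY INTO "TEST_DB"."PUBLIC"."IBPFORECAST"
FROM '@"TEST_DB"."PUBLIC"."INTSTAGE"'
PATTERN = '.*[.]json[.]gz'
FILE_FORMAT = (
TYPE=JSON,
STRIP_OUTER_ARRAY=TRUE,
REPLACE_INVALID_CHARACTERS=TRUE
)
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
ON_ERROR=CONTINUE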
Once the data has been copied into the Snowflake table, the following response is received from Snowflake.
[
{
"rows_loaded": 10000,
"errors_seen": 0,
"file": "s3://< S3 bucket name >/data/sap/forecast.json",
"error_limit": 10000,
"rows_parsed": 10000,
"status": "LOADED"
}
]
The JSON response above shows that 10,000 rows were inserted into the Snowflake table from the Amazon S3 bucket via the external staging integration.
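If you want to double-check the result on the Snowflake side, a quick query against the target table (same names as used above) is enough:
SELECT COUNT(*) FROM "TEST_DB"."PUBLIC"."IBPFORECAST";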
In my previous blog we saw how data can be read from Snowflake and written to IBP. In this blog, I shared a few details on how I read forecast data from SAP IBP, staged it in Amazon S3, and loaded it into a Snowflake table using SAP Cloud Integration and Open Connectors.
Although we used an external staging integration in this exercise, it is also possible to load the data through a Snowflake internal stage or even via streaming. I leave that approach, and more advanced variations, to the reader to experiment with.