Technology Blogs by SAP
Ian_Henry
Product and Topic Expert
Here I want to build a pipeline in SAP Data Intelligence that retrieves economic indicators from the internet, processes them, and then loads them into the SAP Vora engine.

The components I will use for this pipeline are:

  • Message Generator – Pass the URL to the HTTP Client

  • HTTP Client – Download CSV file with Exchange Rates

  • JavaScript – Remove 5 Header Rows

  • Multiplexer – Split the pipeline to provide multiple outputs

  • Wiretap – View the pipeline data on the screen

  • Vora Avro Ingestor – Load the data into Vora

  • Write File – Persist the data into HDFS, S3 or other storage

  • Graph Terminator – Stop the graph from continuously running


The European Central Bank (ECB) provides the Statistical Data Warehouse, which we will use as the data source for our pipeline.


Figure 1: European Central Bank Website


We can then take the desired feed. In my case, I’ve chosen the daily USD-EUR exchange rate:

http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv


Figure 2: European Central Bank, CSV Data, with 5 header rows


We can pass the URL and method using the code below in a simple JavaScript message generator:
// Build a message whose attributes tell the HTTP Client what to request
var generateMessage = function() {
    var msg = {};
    msg.Attributes = {};
    msg.Attributes["http.url"] = "http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv";
    msg.Attributes["http.method"] = "GET";
    msg.Body = {};

    return msg;
};

$.addGenerator(doTick);

// Send the request message to the output port
function doTick(ctx) {
    $.output(generateMessage());
}
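
If you want to reuse the generator for other ECB series, the URL can be assembled from the series key. The following is only a minimal sketch: the helper name buildRequestMessage and the constant ECB_EXPORT_URL are my own illustrations, and the typeof guard simply lets the same file run outside the operator, where the $ object does not exist.

```javascript
// Base URL taken from the generator above; the constant name is hypothetical.
var ECB_EXPORT_URL = "http://sdw.ecb.europa.eu/quickviewexport.do";

// Hypothetical helper: builds the request message for one ECB series key.
function buildRequestMessage(seriesKey) {
    var msg = {};
    msg.Attributes = {};
    msg.Attributes["http.url"] = ECB_EXPORT_URL +
        "?SERIES_KEY=" + encodeURIComponent(seriesKey) + "&type=csv";
    msg.Attributes["http.method"] = "GET";
    msg.Body = {};
    return msg;
}

// Register the generator only inside the JavaScript operator,
// where the runtime provides the $ object.
if (typeof $ !== "undefined") {
    $.addGenerator(function (ctx) {
        $.output(buildRequestMessage("120.EXR.D.USD.EUR.SP00.A"));
    });
}
```

Calling buildRequestMessage("120.EXR.D.USD.EUR.SP00.A") produces the same URL and method as the hard-coded generator.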

Next we add the Data Hub Pipeline HTTP Client operator.
We do not need to specify any fields in its configuration, as they are populated by the JavaScript above.

I have increased the Request timeout (ms) setting to 25,000 ms (25 seconds).


Figure 3: HTTP Client


We can now test this with the WireTap which accepts any input type.


Figure 4: Skeleton Pipeline


After saving our pipeline, we can run it.


Figure 5: Save Pipeline


Creating the file name as above, prefixed with ".", will automatically create the desired repository folder structure.


Figure 6: Repository Structure


We can press Run, and we should see the pipeline running at the bottom.


Figure 7: Running Pipeline


With the pipeline running, we open the UI for the Wiretap.


Figure 8: Open Wiretap UI


We can see our data being returned to the screen.


Figure 9: CSV in Wiretap output


The pipeline will continue to run forever, so you should stop it.

We extend the pipeline with a Multiplexer and a Write File operator.


Figure 10: HDFS Configuration


The directory structure is automatically created if it does not already exist. <counter>, <date> and <time> are built-in variables that can be used to create filenames. We can re-use connections that are already defined.

Re-running the pipeline will save the CSV file into HDFS.

We can browse the output in HDFS with the Data Intelligence Metadata Explorer.

We can see that there are a number of header rows that should be handled.



A simple piece of JavaScript will allow us to do this.

Much of the code below is the framework for the JavaScript Operator 2; we just need the inbody lines to actually strip out the header.
$.setPortCallback("input", onInput);

// The body may arrive as an array of byte values rather than a string
function isByteArray(data) {
    return (typeof data === 'object' && Array.isArray(data)
        && data.length > 0 && typeof data[0] === 'number');
}

function onInput(ctx, s) {
    var msg = {};
    var inbody = s.Body;

    if (isByteArray(inbody)) {
        inbody = String.fromCharCode.apply(null, inbody);
    }

    // remove first 5 lines
    // break the textblock into an array of lines
    inbody = inbody.split('\n');
    // remove 5 lines, starting at the first position
    inbody.splice(0, 5);
    // join the array back into a single string
    inbody = inbody.join('\n');

    msg.Body = inbody;
    $.output(msg);
}
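
The header-stripping step can also be written as small pure functions, which makes it easy to try out away from the pipeline. This is a sketch under two assumptions of mine: that the download might use \r\n line endings, and that a large file could exceed the argument limit of apply, so the bytes are decoded in chunks. The function names and the sample data are made up for illustration.

```javascript
// Decode a byte array in chunks so that very large downloads do not exceed
// the argument limit of Function.prototype.apply.
function bytesToString(bytes) {
    var CHUNK = 8192;
    var parts = [];
    for (var i = 0; i < bytes.length; i += CHUNK) {
        parts.push(String.fromCharCode.apply(null, bytes.slice(i, i + CHUNK)));
    }
    return parts.join("");
}

// Remove the first `count` lines, accepting both \n and \r\n line endings.
function stripHeaderRows(text, count) {
    var lines = text.split(/\r?\n/);
    lines.splice(0, count);
    return lines.join("\n");
}

// Made-up sample: five placeholder header lines followed by two data rows.
var sample = "h1\r\nh2\r\nh3\r\nh4\r\nh5\r\n2020-01-02,1.1\r\n2020-01-03,-\r\n";
var result = stripHeaderRows(sample, 5);
```

Inside onInput, the split, splice and join lines could then be replaced by a single call to stripHeaderRows(inbody, 5).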

We can save and re-run our job, and check the output with either the Wiretap or Discovery.





We can now extend the pipeline further, by loading into SAP Vora using the Vora Avro Ingestor.
Despite what the name suggests, this operator actually works with JSON, CSV and Avro file formats.



The Vora Avro Ingestor expects an Avro schema, which tells Vora the table name, columns, data types and specification. We need to supply this schema, which is shown below.

DefaultAvroSchema:
{
  "name": "ECB_EXCHANGE_RATE",
  "type": "record",
  "fields": [
    {
      "name": "ER_DATE",
      "type": [
        "null",
        "date"
      ]
    },
    {
      "name": "EXCHANGE_RATE",
      "type": [
        "null",
        "double"
      ]
    }
  ]
}
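
Before loading, it can help to check which rows would not fit the nullable double column. The function below is a hypothetical pre-check of my own and is not part of the Vora Avro Ingestor; it assumes the date is in the first column and the rate in the second, and that an empty value is loaded as null.

```javascript
// Hypothetical pre-check: returns the lines whose second column is neither
// empty (treated as null) nor a finite number.
function findInvalidRateRows(csvText) {
    return csvText.split(/\r?\n/).filter(function (line) {
        if (line.trim() === "") {
            return false; // ignore blank lines
        }
        var value = (line.split(",")[1] || "").trim();
        // an empty value maps to null; anything else must be a finite number
        return value !== "" && !isFinite(Number(value));
    });
}

// Made-up sample rows: one numeric, one "-" placeholder, one empty value.
var invalidRows = findInvalidRateRows("2020-01-02,1.1\n2020-01-03,-\n2020-01-04,\n");
```

Here invalidRows contains only the row with the "-" placeholder, because that value is neither empty nor numeric.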

The other parameters for the Vora Avro Ingestor with the TA image would look like the following.
dsn: v2://dqp:50000/?binary=true
engineType: Disk
tableType: empty (not Streaming)
databaseSchema: WEBDATA
csvHeaderIncluded: false
user: admin
password: I can't tell you that

Rerunning this pipeline now produces an error, as we attempt to insert a "-" into a numeric field.



We can add the following additional line of JavaScript to correct this.
// Replace "-" characters with Nulls in Exchange Rate data
inbody = inbody.replace(/,-/g,",");
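
The pattern above removes the "-" from every ",-" sequence, so it would also strip the sign from a negative value. Exchange rates are not negative, but if you reuse the pipeline for other indicators, a stricter variant may be safer. This is a sketch under my own assumption that the placeholder is always the last value on the line; the function name is hypothetical.

```javascript
// Blank out a lone "-" placeholder at the end of a line and leave negative
// numbers such as -0.5 untouched. An optional \r before the line break is kept.
function blankMissingRates(csvText) {
    return csvText.replace(/,-(\r?)$/gm, ",$1");
}

// Made-up sample rows, including a negative value for illustration.
var cleaned = blankMissingRates("2020-01-02,1.1\n2020-01-03,-\n2020-01-04,-0.5\n");
```

In the operator this would be used as inbody = blankMissingRates(inbody); in place of the replace line.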

If we rerun our pipeline now, we can check Vora Tools to see that the schema and table have been created and that the data has been loaded.

However, the pipeline will continue to run, re-downloading the same data and inserting it into our Vora table continuously. To change this behavior we need to supply a commit token within our existing JavaScript operator. The Vora Avro Ingestor will then pass this on to the Graph Terminator.
msg.Attributes = {};    
// Add Commit Token to DataHub message header to stop pipeline from running continuously
msg.Attributes["message.commit.token"] = "stop-token";
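
To show where these lines fit, here is a sketch of the whole operator body assembled from the snippets in this post. The split into a buildOutputMessage function is my own arrangement, and the typeof guard only exists so the file can also run outside the operator, where the $ object is not defined.

```javascript
// Assemble the output message: strip the header, blank the "-" values,
// and attach the commit token for the Graph Terminator.
function buildOutputMessage(inbody) {
    // remove the first 5 header lines
    var lines = inbody.split('\n');
    lines.splice(0, 5);
    inbody = lines.join('\n');

    // Replace "-" characters with nulls in the exchange rate data
    inbody = inbody.replace(/,-/g, ",");

    var msg = {};
    msg.Attributes = {};
    // Commit token in the message header stops the pipeline from running continuously
    msg.Attributes["message.commit.token"] = "stop-token";
    msg.Body = inbody;
    return msg;
}

// Wire the function to the input port only inside the JavaScript operator.
if (typeof $ !== "undefined") {
    $.setPortCallback("input", function (ctx, s) {
        var body = s.Body;
        if (Array.isArray(body)) {
            body = String.fromCharCode.apply(null, body);
        }
        $.output(buildOutputMessage(body));
    });
}
```

Keeping the transformation in one function means it can be checked with a few sample lines before the graph is run.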

The completed pipeline would then look like this.



Rerunning it now, we can see that the pipeline completes and all is good 🙂



I have written an alternative solution where I use Python instead of JavaScript, which can be found here.

Using the SAP Data Intelligence Pipeline Python Operator