Here I want to build a pipeline, within SAP Data Intelligence, that retrieves economic indicators from the internet, processes them, and then loads them into the SAP Vora engine.
The components I will use for this pipeline are:
- Message Generator – Pass the URL to the HTTP Client
- HTTP Client – Download the CSV file with exchange rates
- JavaScript – Remove the 5 header rows
- Multiplexer – Split the pipeline to provide multiple outputs
- Wiretap – View the pipeline data on the screen
- Vora Avro Ingestor – Load the data into Vora
- Write File – Persist the data into HDFS, S3 or other storage
- Graph Terminator – Stop the graph from continuously running
The European Central Bank (ECB) provides the Statistical Data Warehouse that we will use in our pipeline.
Figure 1: European Central Bank Website
We can then take the desired feed; in my case I've chosen the daily USD-EUR exchange rate:
http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv
Figure 2: European Central Bank, CSV Data, with 5 header rows
We can pass the URL and HTTP method using the code below in a simple JavaScript message generator:
generateMessage = function() {
    var msg = {};
    msg.Attributes = {};
    // The HTTP Client operator reads the target URL and method from these message attributes
    msg.Attributes["http.url"] = "http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=120.EXR.D.USD.EUR.SP00.A&type=csv";
    msg.Attributes["http.method"] = "GET";
    msg.Body = {};
    return msg;
};

// Register a generator callback that emits the request message on the output port
$.addGenerator(doTick);
function doTick(ctx) {
    $.output(generateMessage());
}
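As a side note, if you would rather not hard-code the URL, the generator can build it from an operator configuration property. The sketch below assumes a custom property named seriesKey has been added to the JavaScript operator's configuration; that property name is my own invention:

generateMessage = function() {
    var msg = {};
    msg.Attributes = {};
    // Build the ECB export URL from a custom (hypothetical) configuration property,
    // falling back to the USD-EUR series used in this post
    var seriesKey = $.config.seriesKey || "120.EXR.D.USD.EUR.SP00.A";
    msg.Attributes["http.url"] = "http://sdw.ecb.europa.eu/quickviewexport.do?SERIES_KEY=" + seriesKey + "&type=csv";
    msg.Attributes["http.method"] = "GET";
    msg.Body = {};
    return msg;
};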
Next we use the Data Hub Pipeline HTTP Client operator. We do not need to specify any fields, as they are populated by the JavaScript above.
I have increased the Request timeout (ms) to 25,000 ms, i.e. 25 seconds.
Figure 3: HTTP Client
We can now test this with the Wiretap, which accepts any input type.
Figure 4: Skeleton Pipeline
After saving our pipeline, we can run it.
Figure 5: Save Pipeline
Creating the file name as above, with the folder path as a "."-separated prefix, will automatically create the desired repository folder structure.
Figure 6: Repository Structure
We can press Run, and we should see the pipeline running at the bottom of the screen.
Figure 7: Running Pipeline
With the pipeline running, we open the UI for the Wiretap.
Figure 8: Open Wiretap UI
We can see our data being returned to the screen.
Figure 9: CSV in Wiretap output
The pipeline will continue to run forever, so you should stop it.
We extend the pipeline with a Multiplexer and a Write File operator.
Figure 10: HDFS Configuration
The directory structure is automatically created if it does not already exist. <counter>, <date> and <time> are built-in variables that can be used to create file names. We can re-use connections already defined.
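For example, a target path along the lines of the following (the folder and file names here are purely illustrative) would produce a uniquely named file on each run:

/webdata/ecb/usd_eur_<date>_<time>_<counter>.csv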
Re-running the pipeline will save the CSV file into HDFS.
We can browse the output in HDFS with the Data Intelligence Metadata Explorer.
We can see that there are a number of header rows that should be handled.
A simple piece of JavaScript will allow us to do this. Much of the code below is the standard framework for the JavaScript Operator 2; we only need the inbody lines to actually strip out the header rows.
$.setPortCallback("input", onInput);

function isByteArray(data) {
    return (typeof data === 'object' && Array.isArray(data)
        && data.length > 0 && typeof data[0] === 'number')
}

function onInput(ctx, s) {
    var msg = {};
    var inbody = s.Body;
    if (isByteArray(inbody)) {
        inbody = String.fromCharCode.apply(null, inbody);
    }
    // remove first 5 lines:
    // break the text block into an array of lines
    inbody = inbody.split('\n');
    // remove 5 lines, starting at the first position
    inbody.splice(0, 5);
    // join the array back into a single string
    inbody = inbody.join('\n');
    msg.Body = inbody;
    $.output(msg);
}
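To convince yourself that the splice logic behaves as intended, the core lines can be run standalone in any JavaScript console (the sample string below is made up):

// Five dummy header lines followed by two data lines
var sample = "h1\nh2\nh3\nh4\nh5\ndata1\ndata2";
var lines = sample.split('\n');
lines.splice(0, 5);            // drop the first five lines
console.log(lines.join('\n')); // prints "data1" and "data2" only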
We can save and re-run our job, and check the output with either the Wiretap or Discovery.
We can now extend the pipeline further by loading the data into SAP Vora using the Vora Avro Ingestor. Despite what the name suggests, this operator actually works with the JSON, CSV and Avro file formats.
The Vora Avro Ingestor expects an Avro schema, which tells Vora the table name, columns, data types and specification. We need to supply this ourselves, as shown below.
DefaultAvroSchema:

{
    "name": "ECB_EXCHANGE_RATE",
    "type": "record",
    "fields": [
        {
            "name": "ER_DATE",
            "type": [
                "null",
                "date"
            ]
        },
        {
            "name": "EXCHANGE_RATE",
            "type": [
                "null",
                "double"
            ]
        }
    ]
}
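With this schema, and with no CSV header present, each incoming line should map positionally to the record fields, so a made-up input line such as

2020-01-02,1.1193

would populate ER_DATE with the date and EXCHANGE_RATE with the rate.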
The other parameters for the Vora Avro Ingestor with the TA image would look like the following:
dsn: v2://dqp:50000/?binary=true
engineType: Disk
tableType: empty (not Streaming)
databaseSchema: WEBDATA
csvHeaderIncluded: false
user: admin
password: I can't tell you that
Rerunning this pipeline now produces an error, as we attempt to insert a "-" into a numeric field.
We can add the following additional line of JavaScript to correct this.
// Replace "-" characters with Nulls in Exchange Rate data
inbody = inbody.replace(/,-/g,",");
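For example, a made-up input line of "2020-12-25,-" becomes "2020-12-25," after the replacement, so the empty second field is ingested as a null rather than an invalid number. Note that this simple pattern would also strip the sign from a negative value such as ",-0.5"; that is acceptable here, as exchange rates are always positive.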
If we rerun our pipeline now, we can check Vora Tools to see that the schema and table have been created and the data has been loaded.
However the pipeline will continue to run and re-download the same data and insert into our Vora table continuously. To change this behavior we need to supply a commit token within our existing JavaScript operator. The Vora Avro Ingestor will then pass this on to the Graph Terminator.
msg.Attributes = {};
// Add Commit Token to DataHub message header to stop pipeline from running continuously
msg.Attributes["message.commit.token"] = "stop-token";
The completed pipeline would then look like this.
Rerunning it now, we can see that the pipeline completes and all is good 🙂
I have written an alternative solution where I use Python instead of JavaScript, which can be found here: Using the SAP Data Intelligence Pipeline Python Operator.