In the previous parts of this tutorial series, you learned how to develop your own Pipeline Operator without having to bring your own Docker environment. In this tutorial, I explain how to integrate a custom Dockerfile into the SAP Data Hub and how to use this Dockerfile in a custom operator.
This is the third article in a series of tutorials.
Create an Operator with own Docker File
In the following, we create a custom Python operator "Stock Price Reader" which reads stock prices from a public API.
Python is natively supported by the pre-shipped Python base operators. This means that the SAP Data Hub pipeline engine can run Python scripts without requiring you to provide your own Python environment. An API is also available that lets you integrate your own Python code, read configuration parameters and connect to ports without having to deal with process execution or argument handling. The Stock Price Reader, however, requires a Python library that is not part of the Python standard library and therefore needs a custom Docker image that provides Python together with that library.
It is worth mentioning that there is a pre-shipped HTTP Client Operator that could be used to achieve the same result. Nevertheless, we use this simple example to demonstrate the Docker-based extensibility concept of the SAP Data Hub.
1. Create a Dockerfile
A Docker image is described by a Dockerfile (https://docs.docker.com/engine/reference/builder/), which is a text document that contains all the OS-level commands required to assemble a Docker image. In the SAP Data Hub, all pre-shipped and custom Dockerfiles are stored in a repository together with the operators and pipelines (graphs). In the following, you learn how to create a Dockerfile in the SAP Data Hub Pipeline Modeler, which is later used in our custom operator.
To separate your Dockerfiles from the pre-shipped Dockerfiles, create your own root folder in the Docker Files section:
- Open the Repository tab in the SAP Data Hub Pipeline Modeler, navigate to the Docker Files section, right-click and click on Create Folder:
- Type in a Name for the folder (in our case, “acme”) and click OK:
Next, create a subfolder in the root folder for categorizing your Dockerfile:
- Right-click on the previously created “acme” folder and click Create Folder again:
- Type in a Name for the folder (in our case we type “python” as we want to structure all Dockerfiles with Python in the same folder). Then click OK:
Now, we create a Dockerfile that will be used when running your custom operator.
- Right-click on the folder “python” and click on Create Docker File:
- Type in a name for the Docker File (in our case, “requests”) and click OK:
A new tab opens where you can describe the details of the Dockerfile.
- In the Code Editor, paste the following Dockerfile instructions:
# Use an official Python 3.6 image as a parent image
FROM python:3.6.4-slim-stretch
# Install python library "requests"
RUN pip install requests
# Install python library "tornado" (Only required with SAP Data Hub version >= 2.5)
RUN pip install tornado==5.0.2
- The FROM instruction initializes a new build stage and sets Python in version 3.6 as the base image for subsequent instructions.
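- The first RUN instruction installs the Python library requests with the Python package manager pip. This library is later used in our custom operator. The second RUN instruction installs the library tornado in version 5.0.2, which is only required with SAP Data Hub version >= 2.5.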
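Once the image is built, it can be handy to confirm that the libraries installed by the RUN instructions are actually importable in that environment. The following is only a minimal sketch and not part of the original tutorial: the helper name `missing_modules` is hypothetical, and the script relies solely on the Python standard library.

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # The two libraries installed by the Dockerfile above.
    missing = missing_modules(["requests", "tornado"])
    print("missing:", missing if missing else "none")
```

Run inside the container, the script should report no missing modules. Run on a machine without these libraries, it lists them instead of failing with an import error.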
Next, provide tags for the Docker image to describe its properties:
- Open the Docker File Configuration Pane by clicking on the icon in the upper right corner:
- Add new Tags by clicking on the "+"-icon:
- Add the Tag "python36": We use this tag to declare that our Docker image includes Python version 3.6. We could also choose a different name and add the version to the corresponding version field on the right side. However, the pre-shipped Python operator uses this tag, so we follow the same naming convention.
- Add the Tag "python_requests": We use this tag to declare that the Python library requests is available in the Docker image.
- If you are using SAP Data Hub version >= 2.5, you also need to add the Tag "tornado" with version 5.0.2, as this is required by the updated Python Subengine.
- Save the Dockerfile by pressing [CTRL]+[S] or click on Save in the upper right corner:
- Build the Docker Image by clicking on the Build icon in the upper right side:
You can monitor the status of the Docker build process from the Log tab in the bottom pane:
Once finished, the SAP Data Hub Pipeline Modeler pushes the image to the local Docker registry that was configured during installation of the SAP Data Hub Distributed Runtime.
2. Create a custom Operator using the Docker Image
2.1. Create the Operator
- Expand the Operators section in the Repository
- Right-click the folder "acme" that you have created in the previous tutorials and choose the Create Operator menu option:
- In the Name text field, provide the name "stock_price_reader" for the new operator.
- In the Display Name text field, provide the display name "Stock Price Reader" for the operator.
- In the Base Operator dropdown list, select the "Python3Operator":
The Python3Operator allows us to run inlined Python code or an attached Python script.
The tool opens the form-based Operator Editor Window:
2.2. Define the Input and Output Ports
- Add an Input Port with the name "input" of type "string":
This port will be used later to pass a stock symbol to the operator and to trigger a request for the stock price.
- Add two Output Ports with the names "output" and "debug", both of type "string":
The "output" port will be used later to send the requested stock price and the "debug" port will be used to send debug messages, such as error messages.
2.3. Define Tags
The Tags describe the runtime requirements of the operator and allow you to force the execution in a specific Docker image instance whose Dockerfile was annotated with the same Tag and Version.
In our case, we require Python version 3.6 and the Python library "requests" which is not included in the Python standard library. Both are provided by the Dockerfile which we have created before.
- In the Tags section, choose + (Add tag) and select the tags "python_requests" and "python36". Since no different versions are available, we do not need to choose a version:
If the Python standard library were sufficient, we could skip adding additional Tags to our operator. Python 3.6 is already provided by the pre-shipped Python36 Docker image that is used by the Python3 Base Operator from which we derived our custom operator.
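To make the idea of tag matching more concrete, here is a simplified model in Python. It is a sketch under stated assumptions, not the actual matching logic of the pipeline engine: the function name `image_satisfies` is hypothetical, tags are modelled as a dictionary from tag name to version string, and a requested version is assumed to require an exact match.

```python
def image_satisfies(operator_tags, image_tags):
    """Return True if the image offers every tag (and version) the operator asks for.

    Both arguments map tag name -> version string; "" means "no version given".
    """
    for tag, version in operator_tags.items():
        if tag not in image_tags:
            return False
        # Simplifying assumption: a requested version must match exactly.
        if version and image_tags[tag] != version:
            return False
    return True


# Tags of the "requests" Dockerfile created above (tornado only for version >= 2.5).
requests_image = {"python36": "", "python_requests": "", "tornado": "5.0.2"}
# Tags required by the Stock Price Reader operator.
operator = {"python36": "", "python_requests": ""}

print(image_satisfies(operator, requests_image))    # True
print(image_satisfies(operator, {"python36": ""}))  # False: python_requests is missing
```

In this model, an image that only carries the "python36" tag is rejected for our operator, which is why the custom Dockerfile with the additional "python_requests" tag is needed.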
2.4. Provide the Operator Configuration
In the Operator Configuration section, you can already find one Parameter "codelanguage" that was inherited from the Python3Operator. It is not possible to remove inherited Parameters, but you can change their default values.
- Add three additional Parameters that we will later use to control the behavior of the operator during runtime:
| Name | Type | Default-Value | Description |
|---|---|---|---|
| connection_timeout | String | 5 | This allows controlling the connection timeout in seconds |
| request_interval | String | 2 | This allows controlling how often the stock price is requested |
| stock_symbol | String | sap | This allows controlling which stock price is requested |
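Because all three parameters are declared as String, the operator script has to convert them before using them as numbers. The following sketch shows one way to do this with some basic validation. The function name `parse_settings` and the validation rules are assumptions made for illustration and are not part of the operator created in this tutorial.

```python
def parse_settings(connection_timeout, request_interval, stock_symbol):
    """Convert the string-typed operator parameters into validated Python values."""
    timeout = float(connection_timeout)
    interval = float(request_interval)
    if timeout <= 0:
        raise ValueError("connection_timeout must be positive")
    if interval < 0:
        raise ValueError("request_interval must not be negative")
    symbol = stock_symbol.strip()
    if not symbol:
        raise ValueError("stock_symbol must not be empty")
    return {"timeout": timeout, "interval": interval, "symbol": symbol}


# Default values from the table above.
print(parse_settings("5", "2", "sap"))
# {'timeout': 5.0, 'interval': 2.0, 'symbol': 'sap'}
```

Failing early with a ValueError makes a misconfigured parameter visible at startup rather than at the first request.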
You can generate (Auto Propose) a Type from the Parameters that allows adding additional semantics on top, such as validation of the parameter values, UI helpers, the definition of Enums as well as re-use in other operators. However, we do not make use of this feature in this tutorial.
2.5. Define the Operator Script
In the Script section, you can provide your own Python script in two different ways:
- Inline Editor: The code can be written directly into the Script Editor which stores the code together (inline) with the operator definition in the repository. This is the preferred way in cases where you only want to write small scripts that do not require external testing.
- Upload File: You can upload a Python script as a file which is then stored side-by-side with the Operator in the Repository and referenced in the Operator definition. This is the preferred way in cases where you plan to integrate more complex application logic in Python that should also be testable externally, e.g. by accessing the Python script on disk.
In this example, we go for the second option and upload a file containing a Python script.
- Click on the Inline Editor in the Script section and then click on Uploaded File in the drop-down menu:
This will by default create and reference a file script.py in the Repository, as shown on the right side of the Script editor. All the code written into the Script Editor will end up in this script, which can be accessed from the Repository or from disk.
In our case, we will create our own Python script locally and upload it to the Repository via the Script Editor.
- Open an Editor of your choice, e.g. Notepad, paste the following Python code and save it as a file called "stock_price_reader.py":
import requests
'''
Retrieve latest stock price from public api
'''
def request_stock_price(stock_symbol, connection_timeout):
    url = "https://api.iextrading.com/1.0/stock/%s/quote" % (stock_symbol)
    r = requests.get(url, timeout=connection_timeout)
    if r.status_code == 200:
        quote = r.json()
        return quote["symbol"] + "," + str(quote["latestPrice"]) + "," + str(quote["latestUpdate"])
    else:
        raise ValueError(r.content)
'''
Mock pipeline engine api to allow testing outside pipeline engine
'''
try:
    api
except NameError:
    class api:
        @staticmethod
        def send(port, data):
            print("Send data \"" + str(data) + "\" to port \"" + port + "\"")
        @staticmethod
        def set_port_callback(port, callback):
            print("Call \"" + callback.__name__ + "\" to simulate behavior when messages arrive at port \"" + port + "\"..")
            callback()
        @staticmethod
        def add_timer(interval, callback):
            print("Call \"" + callback.__name__ + "\" to simulate behavior when timer calls the callback.")
            callback()
        class config:
            stock_symbol = "sap"
            connection_timeout = "5"
            request_interval = "2"
'''
Interface for integrating the request_stock_price() function into the pipeline engine
'''
def interface(stock_symbol=None):
    connection_timeout = float(api.config.connection_timeout)
    try:
        result = request_stock_price(stock_symbol, connection_timeout)
        api.send("output", result)
    except Exception as inst:
        api.send("debug", str(inst))
def timer_callback():
    stock_symbol = api.config.stock_symbol
    interface(stock_symbol)
# Triggers the request for every message (the message provides the stock_symbol)
api.set_port_callback("input", interface)
# Triggers the request autonomously every x seconds (The stock_symbol is read from the configuration)
# request_interval is configured as a String, so convert it before comparing with 0
if float(api.config.request_interval) != 0:
    api.add_timer(str(api.config.request_interval) + "s", timer_callback)
The script mocks the Python pipeline API, which allows you to use and test the Python script externally. The wrapped function request_stock_price does not contain any pipeline dependencies and represents our actual custom code.
- Click the upload button on the right side of the Script Editor:
- Choose the file "stock_price_reader.py" from your local disk where you have stored it, e.g.:
The Pipeline Modeler uploads the file to the Repository, references the script in the Operator definition and shows the content in the Script Editor:
When you now change the code in the Script Editor, it will be changed in the stock_price_reader.py file, although it appears like an inline script.
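Since request_stock_price has no pipeline dependencies, it can also be exercised without network access. The sketch below is a variant written only for illustration: it takes the HTTP getter as an extra parameter `http_get` (an assumption, not part of the uploaded script) so that a canned response can be injected. In the operator itself you would pass `requests.get`. The helper `fake_get` and its quote values are made up.

```python
from types import SimpleNamespace


def request_stock_price(stock_symbol, connection_timeout, http_get):
    """Variant of the tutorial function; http_get stands in for requests.get."""
    url = "https://api.iextrading.com/1.0/stock/%s/quote" % stock_symbol
    r = http_get(url, timeout=connection_timeout)
    if r.status_code == 200:
        quote = r.json()
        return ",".join([quote["symbol"], str(quote["latestPrice"]), str(quote["latestUpdate"])])
    raise ValueError(r.content)


def fake_get(url, timeout):
    """Hypothetical stand-in that returns a canned quote instead of calling the API."""
    payload = {"symbol": "SAP", "latestPrice": 101.5, "latestUpdate": 1500000000000}
    return SimpleNamespace(status_code=200, json=lambda: payload, content=b"")


print(request_stock_price("sap", 5.0, fake_get))  # SAP,101.5,1500000000000
```

Injecting the getter keeps the formatting logic testable on any machine, even one where the requests library or the public API is not available.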
2.6. Modify the Operator Display Icon
A default operator display icon is used when you create a custom operator. You can change the icon within the tool or upload your own icon in Scalable Vector Graphics (SVG) format.
- In the Operator editor, click the operator's default icon:
- In the Icon dropdown list, select the desired icon (in our case, "line-chart"):
The tool uses the new icon when it displays the operator in the Pipeline editor:
2.7. Maintain Documentation for the Operator
- In the operator editor toolbar, click the documentation icon:
- The documentation can be written in Markdown language, e.g.:
Stock Price Reader
===========
This operator reads the latest stock price of a given stock from a public API (https://iextrading.com/apps/stocks/#/)
Configuration parameters
------------
* **connection_timeout** (type string): The connection timeout in seconds
* **request_interval** (type string): Interval which defines how often the stock price is requested
* **stock_symbol** (type string): The symbol of the stock that shall be returned
Input
------------
* **input** (type string): Every message sent to the input port triggers a request for the given stock
Output
------------
* **output** (type string): The stock price in format <symbol>,<stock_price>,<last_update>
* **debug** (type string): Debug messages
- Click on Save to store the text.
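The output format documented above, <symbol>,<stock_price>,<last_update>, is easy to consume downstream. As a small sketch, a consumer written in Python could split the message as follows. The names `StockPrice` and `parse_output` are hypothetical, the sample message is made up, and the last_update field is kept as a string because its interpretation depends on the API.

```python
from collections import namedtuple

StockPrice = namedtuple("StockPrice", ["symbol", "price", "last_update"])


def parse_output(message):
    """Split '<symbol>,<stock_price>,<last_update>' into named fields."""
    symbol, price, last_update = message.split(",")
    return StockPrice(symbol, float(price), last_update)


# Made-up sample message in the documented format.
print(parse_output("SAP,101.5,1500000000000"))
```

A message that does not consist of exactly three comma-separated fields raises a ValueError, which makes malformed input easy to spot.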
2.8. Save the Operator
- In the editor toolbar, click the Save-icon to save the operator:
3. Explore the Repository Content
Open the System Management application (vsystem) in the browser.
- You can find the host and the TCP port by discovering the vsystem Service in the Kubernetes services, e.g. via kubectl:
kubectl get services -n <namespace> | grep vsystem
- Open the File Management by clicking on the corresponding icon on the left side:
- In the View User Files tab, type the Name of the previously created operator "stock_price_reader" into the Search field:
The result shows all files that are stored together with the operator in the Repository.
From the same UI, you can download the corresponding files as a .tgz file via Export Files and import the .tgz file into another Data Hub instance via Import Files.
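An exported .tgz file is a regular gzip-compressed tar archive, so its content can be inspected with the Python standard library before importing it elsewhere. The following sketch assumes nothing about the real layout of an export: the helper name `list_export` is hypothetical, and the demo builds a tiny stand-in archive with an invented path instead of using a real export.

```python
import io
import os
import tarfile
import tempfile


def list_export(path):
    """Return the sorted names of all regular files in a .tgz archive."""
    with tarfile.open(path, "r:gz") as archive:
        return sorted(m.name for m in archive.getmembers() if m.isfile())


if __name__ == "__main__":
    # Build a tiny stand-in archive; a real one would come from Export Files.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "export.tgz")
        with tarfile.open(path, "w:gz") as archive:
            data = b"import requests\n"
            # Invented path, used only for this demo.
            info = tarfile.TarInfo("operators/acme/stock_price_reader/stock_price_reader.py")
            info.size = len(data)
            archive.addfile(info, io.BytesIO(data))
        print(list_export(path))
```

Pointing `list_export` at a downloaded export should show the operator definition together with the uploaded script and any other files stored alongside it.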
4. Use the Operator in a Pipeline
In the previous tutorials, you have already learned how to create a pipeline by adding and customizing existing operators. In the following steps, you will learn how to define a pipeline by copying a pipeline JSON definition.
- In the navigation pane on the left side, choose the Graphs tab and click on the + icon (Create Graph) to create a new Pipeline:
- Open the JSON view of the Pipeline by clicking on the JSON button on the right side:
- Copy and paste the following JSON definition into the JSON editor:
{
    "properties": {},
    "description": "",
    "processes": {
        "terminal1": {
            "component": "com.sap.util.terminal",
            "metadata": {
                "label": "Stock Price Terminal",
                "x": 615,
                "y": 182,
                "height": 80,
                "width": 120,
                "ui": "dynpath",
                "config": {}
            }
        },
        "constantgenerator1": {
            "component": "com.sap.util.constantGenerator",
            "metadata": {
                "label": "Request StockPrice",
                "x": 209,
                "y": 281,
                "height": 80,
                "width": 120,
                "extensible": true,
                "config": {
                    "mode": "pulse",
                    "content": "AAPL",
                    "duration": "2s"
                }
            }
        },
        "terminal2": {
            "component": "com.sap.util.terminal",
            "metadata": {
                "label": "Debug Terminal",
                "x": 611,
                "y": 368,
                "height": 80,
                "width": 120,
                "ui": "dynpath",
                "config": {}
            }
        },
        "stockpricereader1": {
            "component": "acme.stock_price_reader",
            "metadata": {
                "label": "StockPrice Reader",
                "x": 419,
                "y": 281,
                "height": 80,
                "width": 120,
                "extensible": true,
                "config": {}
            }
        }
    },
    "groups": [],
    "connections": [
        {
            "metadata": {
                "points": "333,321 415,321"
            },
            "src": {
                "port": "out",
                "process": "constantgenerator1"
            },
            "tgt": {
                "port": "input",
                "process": "stockpricereader1"
            }
        },
        {
            "metadata": {
                "points": "543,312 577,312 577,222 611,222"
            },
            "src": {
                "port": "output",
                "process": "stockpricereader1"
            },
            "tgt": {
                "port": "in1",
                "process": "terminal1"
            }
        },
        {
            "metadata": {
                "points": "543,330 575,330 575,408 607,408"
            },
            "src": {
                "port": "debug",
                "process": "stockpricereader1"
            },
            "tgt": {
                "port": "in1",
                "process": "terminal2"
            }
        }
    ],
    "inports": {},
    "outports": {}
}
- Switch back to the Diagram view to see the rendered definition of the Pipeline that you just copied:
The pipeline uses the Stock Price Reader operator that you have created previously.
- Press [CTRL] + [S] or click on the disk icon to save the Pipeline:
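When you copy or hand-edit a pipeline JSON definition like the one above, a quick consistency check can catch typos in process names before the Modeler does. The sketch below is an illustration only: the function name `dangling_connections` is hypothetical, and the embedded graph is an abbreviated copy of the definition above in which "terminal1" was deliberately left out of the processes.

```python
import json


def dangling_connections(graph):
    """Return (process, side) pairs for connection ends that name an unknown process."""
    processes = set(graph.get("processes", {}))
    problems = []
    for connection in graph.get("connections", []):
        for side in ("src", "tgt"):
            name = connection[side]["process"]
            if name not in processes:
                problems.append((name, side))
    return problems


# Abbreviated pipeline definition; "terminal1" is intentionally missing.
graph = json.loads("""
{
  "processes": {"constantgenerator1": {}, "stockpricereader1": {}},
  "connections": [
    {"src": {"port": "out", "process": "constantgenerator1"},
     "tgt": {"port": "input", "process": "stockpricereader1"}},
    {"src": {"port": "output", "process": "stockpricereader1"},
     "tgt": {"port": "in1", "process": "terminal1"}}
  ]
}
""")
print(dangling_connections(graph))  # [('terminal1', 'tgt')]
```

For the complete definition used in this tutorial, the function returns an empty list because every connection refers to a declared process.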
Congratulations, you have reached the end of this tutorial.