The example below should be straightforward to adapt for many Python use cases. There are really only a few steps: build a Docker image (if you need additional Python libraries), configure the Python operator with its input and output ports, and add the code.
Building a Docker image
There is a great existing blog that describes how to create a simple Docker image, so I won't repeat that here. Below you can see my Dockerfile.
# Use an official Python 3.6 image as a parent image
FROM python:3.6.4-slim-stretch
# Install python libraries
RUN pip install pandas
RUN pip install tornado==5.0.2
# Add vflow user and vflow group to prevent the error
# "container has runAsNonRoot and image will run as root"
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow
Let's take the pipeline that we previously developed, but this time we will swap the JavaScript for Python.
Placing the Python3Operator on the canvas shows no inputs and no outputs; for most pipelines you will want to change this. The JavaScript operator above has an input called input (message) and an output called output (message), and we need something similar for Python.
I found acquiring the data into Python as a blob to be the easiest. I had experienced character encoding issues, and using the blob data type avoided them. The HTTP Client provides a blob output, which we will connect to.
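To illustrate the kind of encoding problem a blob sidesteps, here is a minimal sketch in plain Python. The payload and the `try_decode` helper are made up for illustration and are not taken from the pipeline; the point is only that the raw bytes stay intact until you pick the encoding yourself.

```python
# Hypothetical payload: a latin1-encoded line, as an HTTP response body might arrive.
raw = "01.02.2019;Zürich;1,1397\n".encode("latin1")


def try_decode(payload: bytes, encoding: str):
    """Return the decoded text, or None if the bytes are invalid for that encoding."""
    try:
        return payload.decode(encoding)
    except UnicodeDecodeError:
        return None


# Assuming UTF-8 fails here, because the latin1 byte for "ü" is not valid UTF-8.
as_utf8 = try_decode(raw, "utf-8")
# Choosing latin1 explicitly decodes the same bytes cleanly.
as_latin1 = try_decode(raw, "latin1")

print(as_utf8, repr(as_latin1))
```

With a blob port the operator receives bytes like `raw`, so the decoding choice is left to the Python code rather than made upstream.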
We want the output of the Python operator to be a message so that we can stop the pipeline as before.
Now we have a Python operator with our input and output defined.
Here's the Python3 code that I used within the operator. It is equivalent to the JavaScript example I shared previously.
import numpy as np
import pandas as pd
from io import BytesIO

def on_input(data):
    # Acquire data as bytes
    dataio = BytesIO(data)
    # Load data into a pandas DataFrame, skipping the first 5 rows
    df = pd.read_table(dataio, sep=',', skiprows=5, encoding='latin1', names=['ER_DATE', 'EXCHANGE_RATE'])
    # Replace the "-" placeholders with NaN, which to_csv writes as empty fields
    df['EXCHANGE_RATE'] = df['EXCHANGE_RATE'].replace('-', np.nan)
    csv_out = df.to_csv(index=False, header=False)
    # Create a Data Hub message (api.Message) carrying the stop token
    attr = dict()
    attr["message.commit.token"] = "stop-token"
    messageout = api.Message(body=csv_out, attributes=attr)
    api.send("outmsg", messageout)

api.set_port_callback("input", on_input)
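If you want to try the operator logic on your own machine before pasting it into the pipeline, a rough sketch like the one below may help. Everything around `on_input` is an assumption for illustration: `FakeMessage` and the `api` stub are hypothetical stand-ins for what the operator runtime provides, and the sample bytes are made up. It also uses `pd.read_csv` and an explicit NaN replacement as an equivalent way to express the same steps.

```python
from io import BytesIO
from types import SimpleNamespace

import numpy as np
import pandas as pd

sent = []  # collects (port, message) pairs instead of sending them downstream


class FakeMessage:
    """Hypothetical stand-in for api.Message: just holds body and attributes."""

    def __init__(self, body=None, attributes=None):
        self.body = body
        self.attributes = attributes or {}


# Hypothetical stand-in for the `api` object that the operator runtime injects.
api = SimpleNamespace(
    Message=FakeMessage,
    send=lambda port, msg: sent.append((port, msg)),
    set_port_callback=lambda port, callback: None,
)


def on_input(data):
    # Same steps as the operator script: parse bytes, blank out "-", emit CSV.
    df = pd.read_csv(BytesIO(data), skiprows=5, encoding="latin1",
                     names=["ER_DATE", "EXCHANGE_RATE"])
    df["EXCHANGE_RATE"] = df["EXCHANGE_RATE"].replace("-", np.nan)
    body = df.to_csv(index=False, header=False)
    attr = {"message.commit.token": "stop-token"}
    api.send("outmsg", api.Message(body=body, attributes=attr))


# Made-up sample: five header lines to skip, then three data rows.
sample = ("h1\nh2\nh3\nh4\nh5\n"
          "2019-01-02,1.1397\n2019-01-03,-\n2019-01-04,1.1403\n").encode("latin1")

api.set_port_callback("input", on_input)  # no-op in the stub
on_input(sample)

port, message = sent[0]
print(port, message.attributes)
print(message.body)
```

Calling `on_input` directly with bytes mimics what happens when the blob arrives on the input port, so you can inspect the CSV body and the stop token without running the pipeline.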
The easiest way I found to make the Python3Operator use the pandas Docker image was the "Group" feature. Right-click the Python operator and choose Group, then tag the group with the same tags as the Docker image to link the two together. Now we can see the tags.
With that, the pipeline is complete. We can save it (with a new name) and run it.
All being well, the pipeline should complete and we will see the same data as before.
Here are a couple of links you may want to refer to.
Develop a custom Pipeline Operator with own Dockerfile
Automating Web Data Acquisition With SAP Data Intelligence
Hope it was useful for someone.
🙂
Thanks, Ian.