Technology Blogs by SAP
The SAP Data Hub pipeline engine lets you develop custom operators in Python, JavaScript, Go, R, and other languages. However, developing in a local environment offers many features you don't want to miss in your daily work:

  • Code checks and completion: SAP Data Hub provides only a basic text editor, not a full IDE.

  • Immediate startup: A pipeline runs in a dynamically started Docker container and, depending on your configuration, so do individual operators. To get your code pipeline-ready, start with local development and reduce the “trial and error” caused by coding flaws.

  • Error messages: Easily access full error messages, including the stack trace.

  • Debugging: Debug the code of your operator with breakpoints.

  • Unit tests: Automate the testing of your operator (development operations).

  • ..

With this in mind, we started to develop a mock of the pipeline engine. It lets you develop and test your code locally in the development environment of your choice, with all the features that make for a great developer experience. When you're done, you copy and paste the code into the SAP Data Hub pipeline and it works. This sped up our development in recent projects. But how is this done?

As a starting point, we based our extensions on the code from Jens Rannacher's blog series. We generalized the mock engine and added further interfaces that the pipeline engine API in SAP Data Hub also provides. Below you can find the code of the "mock pipeline engine", implemented in Python, with a small usage example. Feel free to develop a similar local simulation in other languages and share it with the community.

import json

# Inside SAP Data Hub the engine provides `api`; locally the name is undefined,
# so the NameError branch installs the mock.
try:
    api
except NameError:
    class api:
        def send(port, data):
            print("Send data '%s' to '%s'" % (str(data), port))

        def set_port_callback(port, callback):
            print("Call '%s' to simulate behavior when messages arrive at port '%s'" % (callback.__name__, port))

        def add_timer(interval, callback):
            print("Call '%s' to simulate behavior when timer calls the callback." % (callback.__name__))

        def Message(body, attributes):
            # resolves to the module-level Message class defined below
            return Message(body, attributes)

        class config:
            env = 'local'  # used to detect the local env
            your_config_var = 'value'

        class logger:
            def info(s):
                print("info: %s" % (s))

            def debug(s):
                print("debug: %s" % (s))

            def warn(s):
                print("warn: %s" % (s))

            def error(s):
                print("error: %s" % (s))

    class Message:
        def __init__(self, body, attributes):
            self.body = body
            self.attributes = attributes

        def __str__(self):
            return json.dumps({'attributes': self.attributes, 'body': str(self.body)})

# your operator coding
counter = 0

def interface(msg=None):
    api.logger.info("received a message")
    api.logger.debug("incrementing the counter")
    global counter
    counter += 1
    api.logger.info("sending the current message count to the output")
    api.send("result", counter)

# Starting the operator
api.set_port_callback("input", interface)

# detecting environment (local vs. vflow)
try:
    api.config.env  # only the mock defines `env`
    print("Operator started in local environment")

    # mock an incoming message from a previous operator
    msg = api.Message("message", {})
    interface(msg)
except AttributeError:
    print("Operator started in productive mode")
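
The unit-testing point from the feature list can be sketched as follows. This is a minimal sketch and not part of the original mock: `RecordingApi`, `deliver`, and `make_operator` are hypothetical names introduced for illustration, and the counter operator is wrapped in a function so that every test starts from zero. Instead of printing, the test double records what the operator sends, so assertions can check it.

```python
import unittest


class RecordingApi:
    """Hypothetical test double: stores callbacks and sent data instead of printing."""

    def __init__(self):
        self.callbacks = {}  # port name -> registered callback
        self.sent = []       # (port, data) tuples in the order they were sent

    def set_port_callback(self, port, callback):
        self.callbacks[port] = callback

    def send(self, port, data):
        self.sent.append((port, data))

    def deliver(self, port, msg):
        """Test helper (assumption, not an engine function): simulate a message arriving."""
        self.callbacks[port](msg)


def make_operator(api):
    """The counter operator, wrapped so each call gets its own fresh counter."""
    counter = 0

    def interface(msg=None):
        nonlocal counter
        counter += 1
        api.send("result", counter)

    api.set_port_callback("input", interface)


class CounterOperatorTest(unittest.TestCase):
    def test_counts_incoming_messages(self):
        api = RecordingApi()
        make_operator(api)

        api.deliver("input", "first")
        api.deliver("input", "second")

        # one output per input, carrying the running message count
        self.assertEqual(api.sent, [("result", 1), ("result", 2)])
```

Saved to a file such as `test_operator.py` (a hypothetical name), the test can be run with `python -m unittest test_operator.py`. Passing the `api` object in as a parameter is a design choice of this sketch only; the operator code that is pasted into SAP Data Hub keeps using the global `api`.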