
SAP AI Core is the AI workload management solution on SAP BTP. It's the place to be for machine learning engineers in the SAP domain and brings a set of out-of-the-box features that support training and serving machine learning scenarios. To strengthen the development workflow with AI Core, I will introduce an example of how to use a CI/CD pipeline in a multi-stage development environment. The goal is to accelerate testing and enhance the stability of your production environment.
Anyone who starts developing machine learning content for AI Core knows the struggle: updating templates, building Docker containers, and testing the training or serving source code. This blog post provides an overview of how to minimize deployment time.
Operating within a three-stage landscape, our objective is to streamline the deployment of an end-to-end training/serving ML workflow. Each of the three instances resides in distinct subaccounts on BTP and is linked to dedicated object stores. Our deployment automation strategy revolves around crafting workflows in a development environment first, ensuring everything is fine-tuned before transitioning to production. We strictly segregate data between environments to enforce robust data protection measures.
I have worked with several of the aforementioned technologies, and each one has slight differences in how you execute scripts and manage environment variables. This blog demonstrates how to automate deployment on one platform, but the scripts available on GitHub can be adapted for use on other platforms.
To recap: below are the main steps in building custom ML solutions in AI Core, and how we want to simplify them.
The very first step is to develop our training and serving code. This is done in a local environment using locally available test data stored in files.
The AI Core-specific task involves creating a YAML-based template to orchestrate our workload. This can be quite challenging and involves several steps to configure it as desired. In an upcoming blog post, I will provide a deep dive into template creation. Some important aspects to consider for production are using environment variables and secrets, accessing and writing files via the storage gateway to object stores, and specifying the infrastructure to use (resource plan, multiplicity, etc.).
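To give a rough idea of the template shape before the deep dive, here is a heavily stripped-down WorkflowTemplate sketch. The scenario and executable id `cicdexample` mirror the configuration used later in this post; the image name is a placeholder, and the exact annotations and labels should always be checked against the current AI Core documentation:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: cicdexample                       # executable_id referenced by configurations
  annotations:
    scenarios.ai.sap.com/name: "CI/CD Example"
    executables.ai.sap.com/name: "cicdexample"
  labels:
    scenarios.ai.sap.com/id: "cicdexample"  # scenario_id referenced by configurations
    ai.sap.com/version: "1.0.0"
spec:
  entrypoint: train
  arguments:
    parameters:
      - name: envexample                  # bound via parameter_bindings
  templates:
    - name: train
      metadata:
        labels:
          ai.sap.com/resourcePlan: starter  # infrastructure selection
      container:
        image: docker.io/<your-user>/cicd-example:latest  # placeholder image
        command: ["python", "train.py"]
        env:
          - name: ENVEXAMPLE
            value: "{{workflow.parameters.envexample}}"
```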
The actual deployment involves pushing the template to a synchronized Git repository, creating artifacts, configurations, and then performing the respective deployment or execution. These tasks can be particularly cumbersome when executed manually step-by-step, especially when done through the user interface of the AI Launchpad.
The first important principle is to use the Git repository as the single source of truth for all source code-related items. We do this by specifying three branches, one for each stage. These branches are linked to the platform and represent its state. This approach ensures versioning of deployments and modifications in the source code, templates, and configurations, thereby eliminating mistakes.
The deployment process consists of many individual tasks, all of which can be summarized in a deployment script. We utilize the AI Core SDK because it is the most convenient way to interact with the RESTful API.
Most importantly, we use a deployment configuration file to precisely define how and what we want to deploy to the platform when executing our pipeline. This principle allows us to specify different configurations for multiple environments. The configuration file is defined in JSON, as many of the payloads are also JSON-based, making it more convenient compared to the widely used YAML.
Git-Sync must be enabled for each branch you want to deploy from. In my case, this means each AI Core instance has an application registered that syncs the templates folder of its respective stage branch.
Below is the rough configuration schema I use. Here, I specify all artifacts, executions, and deployments I want to be created upon deployment. Specifically for the executables, we need to define parameter and artifact bindings, paying attention to additional details. For example, the artifact may be enriched with a "key" field, which is then used to map it to the input artifact binding. The "wait for status" field determines at which status we should follow the logs, which can be convenient. All other fields visible, such as name, kind, URL, and scenario_id of an artifact, are the fields leveraged when creating the objects themselves.
The automation is designed to support the deployment of multiple objects and also to view the logs of multiple executables, though currently, the logs are viewed sequentially. Typically, I would use one executable in the configuration for rapid testing and then include the entire configuration when moving between stages. This approach speeds up script execution, and only the necessary deployment can be debugged as needed.
{
    "artifacts": [
        {
            "key": "exampledataset",
            "name": "Example Dataset",
            "kind": "dataset",
            "url": "ai://default/cicd",
            "scenario_id": "cicdexample"
        }
    ],
    "executions": [
        {
            "configuration": {
                "name": "Configuration CI/CD Example Training",
                "scenario_id": "cicdexample",
                "executable_id": "cicdexample",
                "parameter_bindings": [
                    {
                        "key": "envexample",
                        "value": "test1"
                    }
                ],
                "input_artifact_bindings": []
            },
            "wait_for_status": "COMPLETED"
        }
    ],
    "deployments": [
        {
            "configuration": {
                "name": "Configuration CI/CD Example Serving",
                "scenario_id": "cicdexample",
                "executable_id": "cicdexample2",
                "parameter_bindings": [
                    {
                        "key": "envexample",
                        "value": "test1"
                    }
                ],
                "input_artifact_bindings": []
            },
            "wait_for_status": "RUNNING"
        }
    ]
}
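To illustrate the role of the "key" field: the pipeline resolves each input artifact binding to a concrete artifact id by matching keys against the artifacts created earlier. A minimal, self-contained sketch of that lookup (mirroring the `next(filter(...))` expression in the script below; the sample id is made up):

```python
def resolve_artifact_id(binding_key, artifacts):
    """Find the created artifact whose configured 'key' matches the binding key."""
    return next(a for a in artifacts if a["key"] == binding_key)["id"]

# After creation, each artifact dict carries the id returned by AI Core:
artifacts = [{"key": "exampledataset", "name": "Example Dataset", "id": "abc-123"}]
print(resolve_artifact_id("exampledataset", artifacts))  # → abc-123
```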
import os
import json
import time
import logging
from datetime import timedelta
from typing import List

from ai_core_sdk.ai_core_v2_client import AICoreV2Client
from ai_api_client_sdk.models.artifact import Artifact
from ai_api_client_sdk.models.parameter_binding import ParameterBinding
from ai_api_client_sdk.models.input_artifact_binding import InputArtifactBinding
from ai_api_client_sdk.models.target_status import TargetStatus
from ai_api_client_sdk.models.log_response import LogResultItem

from destinations import update_deployment_destination

logging.basicConfig(level=logging.INFO, format='%(message)s')

AICORE_AUTH_URL = os.environ["AICORE_AUTH_URL"]
AICORE_BASE_URL = os.environ["AICORE_BASE_URL"]
AICORE_CLIENT_ID = os.environ["AICORE_CLIENT_ID"]
AICORE_CLIENT_SECRET = os.environ["AICORE_CLIENT_SECRET"]
AICORE_RESOURCE_GROUP = os.environ["AICORE_RESOURCE_GROUP"]


def load_deployment_configuration():
    """Load the AI Core deployment configuration from JSON; the file needs to be in the cicd folder."""
    with open("cicd/config.json") as json_file:
        configuration = json.load(json_file)
    artifacts = configuration["artifacts"]
    executions = configuration["executions"]
    deployments = configuration["deployments"]
    return artifacts, executions, deployments


def display_logs(logs: List[LogResultItem], filter_ai_core=True):
    """Print logs, optionally filtering AI Core platform logs starting with 'time='."""
    for log in logs:
        if filter_ai_core and log.msg.startswith("time="):
            continue
        logging.info(f"{log.timestamp.isoformat()} {log.msg}")


def create_artifact(ai_api_v2_client: AICoreV2Client, artifact_b: dict):
    """Create an artifact from the JSON configuration, or reuse an existing duplicate."""
    available_artifacts = ai_api_v2_client.artifact.query()
    for artifact_a in available_artifacts.resources:
        if (artifact_a.name == artifact_b["name"]
                and artifact_a.kind == Artifact.Kind(artifact_b["kind"])
                and artifact_a.url == artifact_b["url"]
                and artifact_a.scenario_id == artifact_b["scenario_id"]):
            # duplicate check to not fill up the tenant
            return artifact_a.id
    artifact_response = ai_api_v2_client.artifact.create(
        artifact_b["name"], Artifact.Kind(artifact_b["kind"]),
        artifact_b["url"], artifact_b["scenario_id"])
    return artifact_response.id


def configuration_to_string(configuration_object):
    """Helper to dump a configuration to a JSON string so nested values can be compared."""
    configuration_dict = {
        "name": configuration_object["name"],
        "scenario_id": configuration_object["scenario_id"],
        "executable_id": configuration_object["executable_id"],
        "parameter_bindings": [p.to_dict() for p in configuration_object["parameter_bindings"]],
        "input_artifact_bindings": [p.to_dict() for p in configuration_object["input_artifact_bindings"]],
    }
    return json.dumps(configuration_dict, sort_keys=True)


def create_configuration(ai_api_v2_client: AICoreV2Client, configuration, artifacts):
    """Create a configuration, or reuse an existing duplicate."""
    parameter_bindings = [ParameterBinding(e["key"], e["value"]) for e in configuration["parameter_bindings"]]
    input_artifact_bindings = [
        InputArtifactBinding(e["key"], next(filter(lambda d: d["key"] == e["key"], artifacts))["id"])
        for e in configuration["input_artifact_bindings"]
    ]
    available_configurations = ai_api_v2_client.configuration.query()
    config = {
        "name": configuration["name"],
        "scenario_id": configuration["scenario_id"],
        "executable_id": configuration["executable_id"],
        "parameter_bindings": parameter_bindings,
        "input_artifact_bindings": input_artifact_bindings,
    }
    sconfig = configuration_to_string(config)
    for aconfiguration in available_configurations.resources:
        if configuration_to_string(aconfiguration.__dict__) == sconfig:  # same configs
            return aconfiguration.id
    config_resp = ai_api_v2_client.configuration.create(**config)
    return config_resp.id


def create_execution(ai_api_v2_client: AICoreV2Client, execution, artifacts):
    """Create an execution."""
    config_id = create_configuration(ai_api_v2_client, execution["configuration"], artifacts)
    execution_response = ai_api_v2_client.execution.create(config_id)
    logging.info(f"CREATED EXECUTION {execution_response.id}")
    return execution_response.id


def create_deployment(ai_api_v2_client: AICoreV2Client, deployment, artifacts):
    """Create a deployment."""
    config_id = create_configuration(ai_api_v2_client, deployment["configuration"], artifacts)
    deployment_response = ai_api_v2_client.deployment.create(config_id)
    logging.info(f"CREATED DEPLOYMENT {deployment_response.id}")
    return deployment_response.id


def executable_status(ai_api_v2_client: AICoreV2Client, executable, last_time):
    """Get the executable status and any new log lines since last_time."""
    try:
        if executable["type"] == "EXECUTION":
            executable_object = ai_api_v2_client.execution.get(executable["id"])
        else:
            executable_object = ai_api_v2_client.deployment.get(executable["id"])
    except Exception:
        return "UNKNOWN", [], last_time
    status = executable_object.status.value
    if not last_time:
        start_time = executable_object.submission_time
    else:
        start_time = last_time + timedelta(seconds=1)
    try:
        if executable["type"] == "EXECUTION":
            logs = ai_api_v2_client.execution.query_logs(executable["id"], start=start_time).data.result
        else:
            logs = ai_api_v2_client.deployment.query_logs(executable["id"], start=start_time).data.result
    except Exception:
        return "UNKNOWN", [], last_time
    new_last_time = logs[-1].timestamp if logs else last_time
    return status, logs, new_last_time


def wait_on_executable_logs(ai_api_v2_client: AICoreV2Client, executable):
    """Poll logs and display them on the console until the target status is reached."""
    logging.info("#" * 55)
    logging.info(f"""POLLING LOGS {executable["type"]} {executable["configuration"]["executable_id"]} {executable["id"]}""")
    last_time = None
    logs_started = False
    reached_status = False
    for _ in range(60):
        status, logs, last_time = executable_status(ai_api_v2_client, executable, last_time)
        if not logs_started and len(logs) < 1:
            logging.info("POLLING LOGS")
        else:
            logs_started = True
            display_logs(logs)
        if status == executable["wait_for_status"]:
            reached_status = True
            break
        if status == "DEAD":
            break
        if logs_started:
            time.sleep(2)
        else:
            time.sleep(15)  # sleep longer if not ready yet
    return reached_status


def clean_up_tenant(ai_api_v2_client: AICoreV2Client):
    """Gracefully clean up the tenant by stopping and deleting old deployments and executions."""
    old_deployments = ai_api_v2_client.deployment.query()
    for deployment in old_deployments.resources:
        try:
            ai_api_v2_client.deployment.modify(deployment.id, TargetStatus.STOPPED)
        except Exception:
            pass
        try:
            ai_api_v2_client.deployment.delete(deployment.id)
        except Exception:
            pass
        logging.info(f"DELETED DEPLOYMENT {deployment.id}")
    old_executions = ai_api_v2_client.execution.query()
    for execution in old_executions.resources:
        try:
            ai_api_v2_client.execution.delete(execution.id)
        except Exception:
            pass
        logging.info(f"DELETED EXECUTION {execution.id}")


def deploy(cleanup=True, wait_for_status=True, update_destination=True):
    """Manage the deployment of artifacts, executions, and deployments from the config file."""
    logging.info(f"START DEPLOYING TO RESOURCE GROUP {AICORE_RESOURCE_GROUP}")
    artifacts, executions, deployments = load_deployment_configuration()
    ai_api_v2_client = AICoreV2Client(
        base_url=AICORE_BASE_URL,
        auth_url=AICORE_AUTH_URL + "/oauth/token",
        client_id=AICORE_CLIENT_ID,
        client_secret=AICORE_CLIENT_SECRET,
        resource_group=AICORE_RESOURCE_GROUP
    )
    ai_api_v2_client.resource_groups.create(resource_group_id=AICORE_RESOURCE_GROUP)
    logging.info(f"RESOURCE GROUP CREATED {AICORE_RESOURCE_GROUP}")
    ai_api_v2_client.applications.refresh("felix-cicd")
    for _ in range(60):  # wait until the application has synced the latest templates
        status = ai_api_v2_client.applications.get_status("felix-cicd")
        if status.sync_status == "Synced":
            break
        time.sleep(2)
    if cleanup:
        clean_up_tenant(ai_api_v2_client)
    for artifact in artifacts:
        artifact["id"] = create_artifact(ai_api_v2_client, artifact)
    for execution in executions:
        execution["id"] = create_execution(ai_api_v2_client, execution, artifacts)
        execution["type"] = "EXECUTION"
    for deployment in deployments:
        deployment["id"] = create_deployment(ai_api_v2_client, deployment, artifacts)
        deployment["type"] = "DEPLOYMENT"
    if wait_for_status:
        for execution in executions:
            wait_on_executable_logs(ai_api_v2_client, execution)
        for deployment in deployments:
            deployment["reached_status"] = wait_on_executable_logs(ai_api_v2_client, deployment)
    if update_destination:
        for deployment in deployments:
            if deployment["wait_for_status"] and deployment.get("reached_status"):
                update_deployment_destination(deployment["destination_name"], deployment["id"])


if __name__ == "__main__":
    deploy()
At a high level, the script progresses through three phases.
The first phase triggers a sync with the Git repository to ensure all new templates are up to date.
Next, the script proceeds to create executions and deployments one by one using the configuration JSON file.
Finally, it monitors and reports on the status of these executions and deployments, providing an output of the logs.
Reviewing logs in AI Launchpad can be a tedious task. I find it much more satisfying to have the logs displayed in proper order within a console environment.
The code for the pipeline supports several optional features. For instance, it can clean up the tenant by deleting all previous deployments and executions, or it can prevent duplicate artifact or configuration creations. This is particularly useful when deploying fixes multiple times, as creating individual new configurations can clutter the resource group and diminish usability.
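The duplicate check boils down to serializing the relevant configuration fields with sorted keys and comparing the resulting strings, as `configuration_to_string` does in the script above. A minimal illustration of why sorted-key serialization works for this:

```python
import json

def config_fingerprint(config: dict) -> str:
    """Serialize with sorted keys so field order does not affect equality."""
    return json.dumps(config, sort_keys=True)

# Two logically identical configurations, declared with different field order:
a = {"name": "cfg", "scenario_id": "cicdexample",
     "parameter_bindings": [{"key": "envexample", "value": "test1"}]}
b = {"scenario_id": "cicdexample",
     "parameter_bindings": [{"key": "envexample", "value": "test1"}], "name": "cfg"}
assert config_fingerprint(a) == config_fingerprint(b)  # recognized as duplicates
```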
The CI/CD script shown can be run locally, but it is most useful when deployed on a CI/CD platform. In my demonstration, I'll illustrate how to set it up with GitHub Actions.
An important feature we rely on to determine which tenant receives the content is environment secrets, typically created in the repository settings. For my example, I've configured secrets for three environments, and the CI pipeline runs upon commits to the corresponding branches. Consequently, pushing to the dev branch triggers deployment to the development environment, completing a pull request from dev to test deploys to the test environment, and likewise for the production environment.
Additionally, we can incorporate steps for unit/integration testing and approvals.
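As a sketch of how a test gate could look, the workflow below adds a unit-test job that the deploy job waits for. The pytest invocation and the `tests/` folder are assumptions for illustration, not part of the example repository; approvals can be added separately as required reviewers on the GitHub environment:

```yaml
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4.1.5
      - uses: actions/setup-python@v5.1.0
      - run: pip install -r ./cicd/requirements.txt pytest
      - run: pytest tests/      # hypothetical test folder
  deploy-dev:
    needs: test                 # deploy only after tests pass
    # ... remaining job definition as shown below
```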
In action, upon pushing my local changes, the pipeline is triggered, and within approximately 20 seconds, the changes are deployed and scheduled for execution.
Here's how the repository will be structured:
.github/
    workflows/
        cicd.yml
cicd/
    config.json
    pipeline.py
    requirements.txt
templates/
    serve.yaml
    train.yaml
Dockerfile
deploy.ps1
requirements.txt
serve.py
train.py
The project structure includes a .github directory for the pipeline's YAML markup, and a cicd directory containing the pipeline code, the config.json deployment configuration, and a requirements.txt file specifying the pipeline's dependencies. The templates folder holds the WorkflowTemplate and ServingTemplate markup, synced with AI Core. At the root level, Python files and a Dockerfile make up the source code, with potential for additional structuring as the project grows.
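For reference, the cicd/requirements.txt only needs to cover what the pipeline script imports. A minimal version (versions left unpinned here) could look like the following; ai-core-sdk is the published PyPI package and pulls in the ai-api-client-sdk models used above:

```
ai-core-sdk
```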
name: AI_CORE_DEPLOY_ON_PUSH
on:
  # Triggers the workflow on push or pull request events for the stage branches
  push:
    branches: [ main, tst, prd ]
  pull_request:
    branches: [ main, tst, prd ]
  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:
jobs:
  deploy-dev:
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment:
      name: main
    env:
      AICORE_AUTH_URL: '${{ secrets.AICORE_AUTH_URL }}'
      AICORE_CLIENT_ID: '${{ secrets.AICORE_CLIENT_ID }}'
      AICORE_CLIENT_SECRET: '${{ secrets.AICORE_CLIENT_SECRET }}'
      AICORE_RESOURCE_GROUP: '${{ secrets.AICORE_RESOURCE_GROUP }}'
      AICORE_BASE_URL: '${{ secrets.AICORE_BASE_URL }}'
    steps:
      - uses: actions/checkout@v4.1.5
      - uses: actions/setup-python@v5.1.0
      - name: Run pip install
        run: pip install -r ./cicd/requirements.txt
      - name: Run pipeline script
        run: |
          python ./cicd/pipeline.py
  deploy-tst:
    if: github.ref == 'refs/heads/tst'
    runs-on: ubuntu-latest
    environment:
      name: tst
    env:
      AICORE_AUTH_URL: '${{ secrets.AICORE_AUTH_URL }}'
      AICORE_CLIENT_ID: '${{ secrets.AICORE_CLIENT_ID }}'
      AICORE_CLIENT_SECRET: '${{ secrets.AICORE_CLIENT_SECRET }}'
      AICORE_RESOURCE_GROUP: '${{ secrets.AICORE_RESOURCE_GROUP }}'
      AICORE_BASE_URL: '${{ secrets.AICORE_BASE_URL }}'
    steps:
      - uses: actions/checkout@v4.1.5
      - uses: actions/setup-python@v5.1.0
      - name: Run pip install
        run: pip install -r ./cicd/requirements.txt
      - name: Run pipeline script
        run: |
          python ./cicd/pipeline.py
  deploy-prd:
    if: github.ref == 'refs/heads/prd'
    runs-on: ubuntu-latest
    environment:
      name: prd
    env:
      AICORE_AUTH_URL: '${{ secrets.AICORE_AUTH_URL }}'
      AICORE_CLIENT_ID: '${{ secrets.AICORE_CLIENT_ID }}'
      AICORE_CLIENT_SECRET: '${{ secrets.AICORE_CLIENT_SECRET }}'
      AICORE_RESOURCE_GROUP: '${{ secrets.AICORE_RESOURCE_GROUP }}'
      AICORE_BASE_URL: '${{ secrets.AICORE_BASE_URL }}'
    steps:
      - uses: actions/checkout@v4.1.5
      - uses: actions/setup-python@v5.1.0
      - name: Run pip install
        run: pip install -r ./cicd/requirements.txt
      - name: Run pipeline script
        run: |
          python ./cicd/pipeline.py
The YAML configuration at the end can be adjusted as required. For this blog example, I utilize environment variables created as secrets in three environments (dev, tst, prd) and execute identical steps. Leveraging the GitHub Actions ubuntu-latest image, we employ checkout to retrieve the code, setup-python to establish a valid Python environment, and then execute custom commands. This includes pip install to install pipeline dependencies and the execution of our pipeline script. Jobs are triggered upon a push to any of the specified branches.
A final note on facilitating multi-team and project collaboration through this workflow: AI Core provides developers with the opportunity to segregate teams' work using resource groups. I highly recommend leveraging this approach for the CI setup as well. To maintain efficiency, I suggest having an individual instance of a CI Pipeline per resource group. This allows teams to make changes independently and have different deployment schedules. Ultimately, it's as simple as creating a new Git repository and setting up the three branches to connect to another set of resource groups in the environment.
I hope this blog post gave you an idea of how to make use of CI pipelines in SAP AI Core! You can find all the code shown in one harmonized GitHub repository to try out. Feel free to leave a comment!