Developing AI applications for the cloud can be a time-consuming process, especially when it comes to testing and deploying code changes. As an SAP AI Core developer, I've faced the challenge of packaging my code into a Docker image, pushing it to Docker Hub, and then triggering a KServe deployment on SAP AI Core to test my changes on GPU-powered hardware. This process can take upwards of 10 minutes per iteration, not to mention adjusting my Postman settings for the new URL that comes with every deployment.
To overcome this bottleneck, I've devised a simple solution that has saved me countless hours in testing and development. The key to this solution is a main.py program that runs my actual Python program as a subprocess. This setup allows me to upload the source code of my program via a web service call, after which main.py restarts the application, checks that the new program is healthy, and keeps it running. If the new code contains a bug, the system automatically reverts to the previous working version, much like over-the-air (OTA) updates for mobile phone firmware.
While CI/CD pipelines are valuable, they don't eliminate the waiting time associated with deployment. You would still need to wait for a new instance of AI Core's KServe to become available, after which you need to adjust the Postman settings again. Moreover, complex models require downloading artifacts such as model weights, which can run into the gigabytes. Since there are no PVCs (persistent volumes in Kubernetes), every redeployment repeats these downloads, wasting resources as well.
My approach, on the other hand, has effectively reduced my development cycle from minutes to seconds, saving hours in total while keeping the existing environment, including model setups, stable.
The main.py script serves as the orchestrator. It starts the FastAPI application, listens for update signals, and performs health checks. When an update is posted to the /v1/update endpoint, main.py receives a signal to restart the application with the new code. If the health check fails, it restores the last working version from a backup.
The FastAPI application defined in app.py includes the /v1/update endpoint to receive the new code and the /health endpoint for health checks. The actual AI workload, such as model inference, would be implemented in additional endpoints within app.py.
I'm sharing the minimal set of main.py and app.py with you below.
```python
import os, subprocess, signal, time, shutil, requests, logging
from signal import Signals

# Set the current working directory to the directory where main.py is located
os.chdir(os.path.dirname(os.path.abspath(__file__)))

app_process = None
backup_file = "app_backup.py"
app_file = "app.py"
host_port = int(os.environ.get("HOST_PORT", 8085))
health_check_url = f"http://localhost:{host_port}/health"

LOGLEVEL = int(os.environ.get("APPLOGLEVEL", logging.WARN))
logging.basicConfig(level=LOGLEVEL)
LOGGER = logging.getLogger(__name__)


def start_app():
    """Start the workload in a subprocess."""
    global app_process
    app_process = subprocess.Popen(["python", app_file])


def restart_app():
    """Handle app restarts after an update or an error."""
    global app_process
    if app_process:
        # Terminate the current app process
        app_process.terminate()
        app_process.wait()
    start_app()


def health_check() -> bool:
    """Check liveness by making a request to the health check endpoint."""
    try:
        LOGGER.info(f"Conducting health check on {health_check_url}...")
        response = requests.get(health_check_url, timeout=5)
        if response.status_code == 200:
            LOGGER.info("Health check successful.")
            return True
        LOGGER.warning(f"Response code was {response.status_code}")
    except requests.RequestException as e:
        LOGGER.error(f"Health check failed: {e}")
    return False


def signal_handler(signum: Signals, _):
    """Restart the app on SIGUSR1 and roll back if the new code is unhealthy."""
    if signum == signal.SIGUSR1:
        LOGGER.info("Received signal to restart app")
        restart_app()
        not_up = True
        loop_count = 1
        while not_up and loop_count < 5:
            time.sleep(3)  # Give the app some time to start
            if health_check():
                not_up = False
            else:
                LOGGER.warning("Retrying health check.")
                loop_count += 1
        if not_up:
            # Health check failed, restore the backup
            LOGGER.info("Restoring the last working version")
            shutil.copy(backup_file, app_file)
            restart_app()
        else:
            # Update was successful, back up the new working version
            LOGGER.info("New version running. Backing up.")
            shutil.copy(app_file, backup_file)


def main() -> None:
    """Main routine to start the framework."""
    # Register the signal handler (note: SIGUSR1 is POSIX-only)
    signal.signal(signal.SIGUSR1, signal_handler)
    # Make an initial backup of the app
    shutil.copy(app_file, backup_file)
    LOGGER.info("Main routine started. Starting application.")
    start_app()
    # Keep the main program running
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        if app_process:
            app_process.terminate()


if __name__ == "__main__":
    main()
```
As for the app.py:
```python
from fastapi import FastAPI, UploadFile, File
import uvicorn
import torch  # Just an example for the AI workload you create
import os, logging, shutil, signal

LOGLEVEL = int(os.environ.get("APPLOGLEVEL", logging.WARN))
logging.basicConfig(level=LOGLEVEL)
LOGGER = logging.getLogger(__name__)

# Check for CUDA availability
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LOGGER.info(f"Computing unit detected: {DEVICE}")

# Initialize FastAPI app
app = FastAPI()

backup_file = "app_backup.py"
app_file = "app.py"


@app.post("/v1/update")
async def update(file: UploadFile = File(...)):
    """Receive new source code and signal the parent process to restart."""
    contents = await file.read()
    # Save a backup of the current working version
    shutil.copy(app_file, backup_file)
    # Write the new code to the app.py file
    with open(app_file, "wb") as f:
        f.write(contents)
    # Send a signal to the parent process to restart the app
    os.kill(os.getppid(), signal.SIGUSR1)
    return {"status": f"directory is {os.path.dirname(os.path.abspath(__file__))} - App update received, restarting."}


@app.get("/health")
async def health_check():
    """Health check for internal calls only; can't be reached from outside."""
    return {"status": "healthy"}


# Below you put your actual program code, e.g. like this >>>>>>
@app.post("/v1/inference")
async def my_model_inference():
    return {"status": "Nothing here yet"}
# <<<<<< ... Add as many endpoints and as much logic as needed.


def main() -> None:
    host_port = int(os.environ.get("HOST_PORT", 8085))
    LOGGER.info("Starting application")
    # Run the server using Uvicorn
    uvicorn.run(app, host="0.0.0.0", port=host_port)


if __name__ == "__main__":
    main()
```
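For packaging, a minimal Dockerfile could look roughly like the sketch below. The base image, the dependency list, and the writable-directory workaround are my assumptions, not a verified AI Core serving template; adjust them to your actual requirements (in particular, a CUDA-enabled base image for GPU workloads).

```dockerfile
# Hypothetical packaging sketch -- base image, packages, and port are assumptions
FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir fastapi uvicorn requests torch
COPY main.py app.py /app/
# SAP AI Core runs containers as a non-root user, and main.py must be able
# to overwrite app.py and write the backup file, so /app needs to be writable
RUN chmod -R 777 /app
ENV HOST_PORT=8085
EXPOSE 8085
CMD ["python", "main.py"]
```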
Once you have this packed into a Docker image and deployed on AI Core, you can start your development work. New code is uploaded through Postman or Insomnia (which I use) like so:
Just set the file parameter to your app.py once. When you want to test a development change, just press [Send] and wait a few seconds. You can then test your code change in AI Core. If there was a bug, the app automatically reverts, and you can see the error in AI Core's log.
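If you prefer a script over a REST client, the upload can also be done with a few lines of Python. This is only a sketch: the `upload_app` helper, the placeholder deployment URL, and the bearer-token handling are my assumptions, not part of the original setup.

```python
# Hypothetical client for the /v1/update endpoint shown above.
# The deployment URL and auth token are placeholders you must supply.
import requests
from typing import Optional


def upload_app(base_url: str, app_path: str = "app.py",
               token: Optional[str] = None) -> dict:
    """POST the local app.py as multipart form data to /v1/update."""
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    with open(app_path, "rb") as f:
        resp = requests.post(f"{base_url}/v1/update",
                             files={"file": ("app.py", f)},
                             headers=headers, timeout=30)
    resp.raise_for_status()
    return resp.json()
```

The rough curl equivalent would be `curl -F "file=@app.py" https://<deployment-url>/v1/update`, plus whatever auth header your AI Core deployment requires.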
By implementing this dynamic code update mechanism, I've been able to focus more on developing and refining AI models rather than waiting for deployments. This has not only accelerated my development process but also allowed for a more iterative and responsive approach to building AI applications on SAP AI Core. Let me know what you think!