Technology Blogs by SAP
Gunter
Product and Topic Expert

The Waiting Game: Deployment Delays in Cloud AI

Developing AI applications for the cloud can be a time-consuming process, especially when it comes to testing and deploying code changes. As an SAP AI Core developer, I've faced the challenge of packaging my code into a Docker image, pushing it to Docker Hub, and then triggering a KServe deployment on SAP AI Core to test my changes on GPU-powered hardware. This process can take upwards of 10 minutes per iteration, not to mention having to adjust my Postman settings for the new URL that comes with every deployment.

Beyond CI/CD: Real-Time Code Deployment with main.py

To overcome this bottleneck, I've devised a simple solution that has saved me countless hours in testing and development. The key to this solution is a main.py program that runs my actual Python program as a subprocess. This setup allows me to upload the source code of my program via a web service call, after which main.py will restart, test the new program for healthiness, and run it. If the new code contains a bug, the system automatically reverts to the previous working version, much like over-the-air (OTA) updates for mobile phone firmware.

While CI/CD pipelines are valuable, they don't eliminate the waiting time associated with deployment. You would still need to wait for a new KServe instance on AI Core to become available, after which you need to adjust the Postman settings again. Moreover, complex models require downloading artifacts such as model weights, which can run into the gigabytes. Since there are no PVCs (persistent volumes in Kubernetes) on AI Core, every redeployment repeats that download, which is a waste of resources, too.
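To illustrate the resource point: because an in-place code update keeps the pod (and its filesystem) alive, a small guard like the one below ensures multi-gigabyte weights are fetched only once per pod lifetime. This is just a sketch; `ensure_weights` and the `download` callable are placeholders I'm inventing for illustration, not part of the actual setup.

```python
import os

def ensure_weights(model_dir: str, download) -> str:
    """Download model weights only if they are not already present.

    `download` is a placeholder callable that fetches the weights into
    model_dir. With in-place code updates the pod (and this cache)
    survives, so the expensive multi-GB download happens only once.
    """
    if not os.path.isdir(model_dir) or not os.listdir(model_dir):
        os.makedirs(model_dir, exist_ok=True)
        download(model_dir)
    return model_dir
```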

My approach, on the other hand, has reduced my development cycle from minutes to seconds, saving hours in total while keeping the existing environment, including model setups, stable.

How It Works

The main.py script serves as the orchestrator. It starts the FastAPI application, listens for update signals, and performs health checks. When an update is posted to the /v1/update endpoint, main.py receives a signal to restart the application with the new code. If the health check fails, it restores the last working version from a backup.
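The restart-verify-rollback flow can be sketched as a small pure loop. This is my restating of the logic, with the restart, health-check, and restore steps injected as callables instead of the real subprocess and file handling shown in `main.py` below:

```python
import time

def verify_or_rollback(restart, health_check, restore,
                       retries: int = 4, delay: float = 0.0) -> bool:
    """Restart the app, poll its health endpoint, and roll back on failure.

    restart/health_check/restore stand in for the functions in main.py;
    returns True if the new version came up healthy.
    """
    restart()
    for _ in range(retries):
        time.sleep(delay)   # give the app some time to start
        if health_check():
            return True     # new version is healthy, keep it
    restore()               # restore the last working version ...
    restart()               # ... and run it again
    return False
```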

The FastAPI application defined in app.py includes the /v1/update endpoint to receive the new code and the /health endpoint for health checks. The actual AI workload, such as model inference, would be implemented in additional endpoints within app.py.

Implementation template

I'm sharing the minimal set of main.py and app.py with you below.

 

import os, subprocess, signal, time, shutil, requests, logging
from signal import Signals

# Set the current working directory to the directory where main.py is located
os.chdir(os.path.dirname(os.path.abspath(__file__)))

app_process = None
backup_file = "app_backup.py"
app_file = "app.py"
host_port = int(os.environ.get("HOST_PORT",8085)) 
health_check_url = f"http://localhost:{str(host_port)}/health"

LOGLEVEL = int(os.environ.get("APPLOGLEVEL", logging.WARN))
logging.basicConfig(level=LOGLEVEL)
LOGGER = logging.getLogger(__name__)

def start_app():
    """ Starts the workload in a subprocess """
    global app_process
    app_process = subprocess.Popen(['python', app_file])
        

def restart_app():
    """ Handles app restarts after update or error """
    global app_process
    if app_process:
        # Terminate the current app process
        app_process.terminate()
        app_process.wait()
    start_app()
    

def health_check()->bool:
    """ Implement a health check by making a request to the health check endpoint """
    try:
        LOGGER.info(f"Conducting health check on {health_check_url}...")
        response = requests.get(health_check_url, timeout=5)
        if response.status_code == 200:
            LOGGER.info("Health check successful.")
            return True
        else:
            LOGGER.warning(f"Response code was {response.status_code}")
    except requests.RequestException as e:
        LOGGER.error(f"Health check failed: {e}")
    return False


def signal_handler(signum: Signals, _):
    """ Handle signals to restart the app """
    if signum == signal.SIGUSR1:
        LOGGER.info("Received signal to restart app")
        restart_app()
        not_up = True
        loop_count = 1
        while not_up and loop_count < 5:  
            time.sleep(3)  # Give the app some time to start 
            if health_check():
                not_up = False
            else:
                LOGGER.warning("Retrying health check.")        
            loop_count += 1
            
        if not_up:           
            # Health check failed, restore the backup
            LOGGER.info("Restoring the last working version")
            shutil.copy(backup_file, app_file)
            restart_app()
        else:
            # Update was successful, backup the new working version
            LOGGER.info("New version running. Backing up.")
            shutil.copy(app_file, backup_file)


def main()->None:
    """ Main routine to start the framework """
    # Register the signal handler
    signal.signal(signal.SIGUSR1, signal_handler)
    # Make an initial backup of the app
    shutil.copy(app_file, backup_file)
    LOGGER.info("Main routine started. Starting application.")
    start_app()

    # Keep the main program running
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        if app_process:
            app_process.terminate()
            
if __name__ == "__main__":
    main()

 

As for the app.py:

 

from fastapi import FastAPI, HTTPException, Depends, UploadFile, File
from fastapi.responses import FileResponse, StreamingResponse
from io import BytesIO
import uvicorn

import torch # Just an example for the AI workload you create

from typing import Optional

import re, os, logging, shutil, signal

LOGLEVEL = int(os.environ.get("APPLOGLEVEL", logging.WARN))
logging.basicConfig(level=LOGLEVEL)
LOGGER = logging.getLogger(__name__)

# Check for cuda availability
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LOGGER.info(f"Computing unit detected: {DEVICE}")

    
# Initialize FastAPI app
app = FastAPI()
backup_file = 'app_backup.py'
app_file = 'app.py'

@app.post("/v1/update")
async def update(file: UploadFile = File(...)):
    contents = await file.read()
    # Save a backup of the current working version
    shutil.copy(app_file, backup_file)
    # Write the new code to the app.py file
    with open(app_file, "wb") as f:
        f.write(contents)
    # Send a signal to the parent process to restart the app
    os.kill(os.getppid(), signal.SIGUSR1)
    return {"status": f"directory is {os.path.dirname(os.path.abspath(__file__))} - App update received, restarting."}


@app.get("/health")
async def health_check():
    """ Health check for internal call only, can't be reached from outside """
    return {"status": "healthy"}


# Below you put your actual program code, e.g. like this >>>>>>
@app.post("/v1/inference")
async def my_model_inference():
    return {"status": "Nothing here yet"}

# <<<<<<<< ... Add as many endpoints and as much logic as needed.

def main()->None:
    host_port = int(os.environ.get("HOST_PORT",8085))
    LOGGER.info("Starting application")
    uvicorn.run(app, host="0.0.0.0", port=host_port)

# Run the server using Uvicorn
if __name__ == "__main__":
    main()

 

Once you have this packed into a Docker image and deployed on AI Core you can start your development work. New code is uploaded through Postman or Insomnia (which I use) as a multipart POST to the /v1/update endpoint.

Just set the file parameter to your app.py once. Whenever you want to test a development change, just press [Send] and wait a few seconds. You can then test your code change on AI Core. If there was a bug, the app automatically reverts and you can see the error in AI Core's logs.
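If you prefer a script over a REST client, the same upload can be done in a few lines of Python. This is a sketch: `deployment_url` and `token` are placeholders for your actual AI Core deployment URL and OAuth2 bearer token.

```python
import requests

def build_update_url(deployment_url: str) -> str:
    """Derive the /v1/update endpoint from the deployment URL."""
    return deployment_url.rstrip("/") + "/v1/update"

def upload_app(deployment_url: str, token: str, path: str = "app.py") -> dict:
    """POST the new source file to the running deployment."""
    with open(path, "rb") as f:
        response = requests.post(
            build_update_url(deployment_url),
            headers={"Authorization": f"Bearer {token}"},
            files={"file": ("app.py", f)},
            timeout=30,
        )
    response.raise_for_status()
    return response.json()
```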


Benefits of This Approach

  • Rapid Iteration: Code changes can be tested almost immediately, without the need to rebuild and redeploy Docker images.
  • Resilience: If an update introduces a bug, the system automatically reverts to the last known good state, ensuring stability.
  • Efficiency: This method significantly reduces the time spent waiting for cloud resources to provision and services to start up.
  • Resource saving: No need to repeatedly download gigabytes of model weights with every deployment.
  • Security: The security concept remains unchanged with OAuth2 in front of every API call. For a production deployment the update function could be removed (as a one-time OTA update 😉).
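On the last point, the update endpoint doesn't even have to be removed from the source for production. One option is to gate it behind an environment variable, sketched below (`ALLOW_OTA_UPDATES` is an invented name for illustration, not an AI Core setting):

```python
import os

def updates_enabled(env=os.environ) -> bool:
    """Return True only when OTA updates are explicitly switched on."""
    return env.get("ALLOW_OTA_UPDATES", "false").lower() == "true"

# In app.py, the update endpoint would then start with:
# if not updates_enabled():
#     raise HTTPException(status_code=403, detail="Updates are disabled")
```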

Conclusion


By implementing this dynamic code update mechanism, I've been able to focus more on developing and refining AI models rather than waiting for deployments. This has not only accelerated my development process but also allowed for a more iterative and responsive approach to building AI applications on SAP AI Core. Let me know what you think!
