Artificial Intelligence Blog Posts
Kunal__Kumar
Product and Topic Expert

With the focus on business AI scenarios and high-throughput applications, FastAPI has become the de facto standard for Python API development thanks to its native support for async I/O and Pydantic-based validation ensuring type safety. However, deploying FastAPI applications to the SAP BTP Cloud Foundry environment raises observability issues.

 

The standard library provided by SAP, sap-cf-logging, relies heavily on WSGI middleware and thread-local storage to manage request contexts. In an ASGI environment, a single thread can serve multiple concurrent requests via an event loop.

Relying on thread-local storage in this environment can lead to context leakage, where logs from Request A appear with the correlation ID of Request B, or worse, the context is lost entirely across await calls.

 

To prevent this in our application and keep observability in Kibana intact without compromising on non-blocking operations, we needed a custom solution using Python's contextvars.

 

Architecture: Context-Local vs Thread-Local

To maintain the integrity of the correlation ID across async boundaries, we had to move away from global state and implement a context-aware logger. The solution has three main components:

 

  1. Context variables: to hold state safely across the asyncio event loop
  2. Custom JSON formatter: to serialize logs into the strict JSON format required by SAP Cloud Logging
  3. ASGI middleware: to handle lifecycle management of the correlation ID

 

1. Async-Safe Context

First, we define a ContextVar. Unlike threading.local, context variables are natively supported by asyncio: when a task awaits a coroutine, the context is preserved for that specific chain of execution, ensuring concurrent requests never corrupt each other's state.

 

logger.py

import logging
import sys
import json
import uuid
from contextvars import ContextVar
from typing import Optional

# ContextVar to store Correlation ID safely (Async safe!)
# This works per-task, ensuring concurrent requests preserve their unique ID.
correlation_id_ctx: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)
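A minimal standalone sketch (using a hypothetical `cid_ctx` in place of the variable above) shows why this matters: two concurrent tasks on the same event loop can each set and read the variable without interfering, even across an await suspension point.

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

# Hypothetical stand-in for the correlation_id_ctx defined above.
cid_ctx: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)

async def handle(request_id: str) -> Optional[str]:
    # Each "request" task sets its own value ...
    cid_ctx.set(request_id)
    # ... then suspends, letting the other task run in between.
    await asyncio.sleep(0)
    # The value read here is still the one set by *this* task.
    return cid_ctx.get()

async def main() -> list:
    # Run two "requests" concurrently on one event loop (one thread).
    return await asyncio.gather(handle("req-A"), handle("req-B"))

results = asyncio.run(main())
print(results)  # ['req-A', 'req-B'] — no cross-task leakage
```

With threading.local, both tasks would share the same slot on the single worker thread and the second set() would clobber the first.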

 

2. JSON Formatter

SAP BTP's log ingestion pipeline expects specific fields (msg, level, correlation_id, written_at) to index logs correctly in Kibana. We extend the standard Python logging.Formatter to intercept every log record and inject the current context dynamically.

 

logger.py

class JSONFormatter(logging.Formatter):
    """
    Custom formatter that outputs logs as a JSON string.
    Automatically injects the Correlation ID from the async context.
    """
    def format(self, record):
        # Retrieve the current correlation ID (or None if outside a request)
        cid = correlation_id_ctx.get()

        # Build the structured log dictionary required by SAP Cloud Logging
        log_record = {
            "level": record.levelname,
            "msg": record.getMessage(),
            "logger": record.name,
            "correlation_id": cid,
            "written_at": self.formatTime(record, self.datefmt),
        }

        # Include exception info if present (essential for stack traces in Kibana)
        if record.exc_info:
            log_record["exc_info"] = self.formatException(record.exc_info)

        return json.dumps(log_record)

def setup_logging():
    logger = logging.getLogger()
    if logger.handlers:
        logger.handlers = []
    
    # Stream to stdout is crucial for Cloud Foundry loggregator
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JSONFormatter())
    
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
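To see the output shape without deploying anything, a trimmed, self-contained copy of the formatter (correlation ID lookup omitted) can be pointed at an in-memory stream. The StringIO wiring here is test scaffolding, not application code; in the real app the handler streams to stdout as shown above.

```python
import io
import json
import logging

class JSONFormatter(logging.Formatter):
    # Trimmed copy of the formatter above, without the ContextVar lookup.
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "msg": record.getMessage(),
            "logger": record.name,
            "written_at": self.formatTime(record, self.datefmt),
        })

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JSONFormatter())

logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("contract extracted")

# Each emitted line is a single JSON object — exactly what the
# Cloud Foundry loggregator / Kibana pipeline expects to parse.
line = json.loads(stream.getvalue())
print(line["level"], line["msg"])  # INFO contract extracted
```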

 

3. Middleware injection

We implement a BaseHTTPMiddleware to intercept every incoming request. This middleware extracts the X-CorrelationID header, or generates a new UUID v4 if one is missing.

 

logger.py

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class CorrelationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # 1. Extract ID from header OR generate a new one
        correlation_id = request.headers.get("X-CorrelationID") or str(uuid.uuid4())
        
        # 2. Set the ID in the ContextVar and capture the Token
        token = correlation_id_ctx.set(correlation_id)
        
        try:
            # 3. Process the request
            response = await call_next(request)
            
            # 4. Propagate the ID back to the client (UI) via headers
            response.headers["X-CorrelationID"] = correlation_id
            return response
            
        finally:
            # 5. Clean up: Reset the context to prevent leakage in thread pooling
            correlation_id_ctx.reset(token)
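The set/reset token dance at the heart of the middleware can be sanity-checked in isolation. This standalone sketch (with a hypothetical `cid_ctx` and a `simulate_request` standing in for dispatch) confirms that the variable falls back to its default once the request is done:

```python
from contextvars import ContextVar
from typing import Optional

cid_ctx: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)

def simulate_request(cid: str) -> Optional[str]:
    # Mirrors the middleware: set on entry, reset on exit.
    token = cid_ctx.set(cid)
    try:
        # call_next(request) would run here; any log emitted during
        # this window sees the correct correlation ID via get().
        return cid_ctx.get()
    finally:
        # reset(token) restores whatever value was active before set().
        cid_ctx.reset(token)

seen = simulate_request("abc-123")
assert seen == "abc-123"
# After reset, the variable is back to its default — nothing leaks
# into whatever request is handled next.
assert cid_ctx.get() is None
```

Resetting with the token (rather than setting None) matters when requests nest or when the context already held a value on entry.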

 

4. "Background-Task" Conundrum

A specific challenge was handling FastAPI BackgroundTasks. Since background tasks run after the HTTP response is returned, the middleware described above has already finished execution and called reset(token). As a consequence, background tasks start with an empty context, causing logs to show correlation_id: null.

To solve this, we implement a context injection pattern: we explicitly capture the ID while the request is still active and pass it to a wrapper function that re-hydrates the context inside the background thread.

 

api.py

import asyncio
import logging

from fastapi import APIRouter, BackgroundTasks

from logger import correlation_id_ctx  # the ContextVar defined earlier

_router = APIRouter()

@_router.post("/extract")
def extract(data: ExtractRequest, background_tasks: BackgroundTasks):
    # Capture the ID from the current context before the response is sent
    current_cid = correlation_id_ctx.get()
    
    # Pass it explicitly to the background task wrapper
    background_tasks.add_task(process_contract_sync, data.contract_id, current_cid)
    
    return {"message": f"Extracting data for contract ID: {data.contract_id}"}

def process_contract_sync(contract_id: str, correlation_id: str):
    # RE-INJECT the ID into the context for this isolated thread
    token = correlation_id_ctx.set(correlation_id)
    
    try:
        # Run the business logic with full logging context
        asyncio.run(process_contract(contract_id))
    except Exception as e:
        logging.exception(f"Critical failure in background task for {contract_id}")
    finally:
        # Strict cleanup using the token
        correlation_id_ctx.reset(token)
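The same pattern can be exercised without FastAPI. This standalone sketch (with hypothetical `cid_ctx` and `worker` names) simulates the capture-and-re-inject flow using a plain worker thread, showing that a fresh thread starts with an empty context and only sees the ID after re-injection:

```python
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar
from typing import Optional

cid_ctx: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)

def worker(correlation_id: Optional[str]) -> Optional[str]:
    # A fresh thread has its own context, so the variable starts empty —
    # this is exactly why background tasks logged correlation_id: null.
    assert cid_ctx.get() is None
    # Re-inject the captured ID into this thread's context.
    token = cid_ctx.set(correlation_id)
    try:
        return cid_ctx.get()  # business logic + logging would run here
    finally:
        cid_ctx.reset(token)

# "Request" phase: the middleware has set the ID.
token = cid_ctx.set("req-42")
try:
    captured = cid_ctx.get()  # capture while the request is still active
    with ThreadPoolExecutor(max_workers=1) as pool:
        result = pool.submit(worker, captured).result()
finally:
    cid_ctx.reset(token)

print(result)  # req-42
```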

 

Conclusion

By leaving the synchronous sap-cf-logging library behind in favour of a native contextvars approach, we achieved:

1. Full traceability: every log entry, including those from disconnected background tasks, is tagged with a consistent correlation ID.

2. Stability: eliminated "socket hang up" errors caused by WSGI middleware incompatibilities.

3. Observability: logs appear in Kibana as structured JSON, allowing deep filtering on a specific correlation ID or on error states.

 

This approach offers a blueprint for any team aiming to use FastAPI under the strict enterprise observability requirements of SAP BTP.
