Technology Blog Posts by SAP
Learn how to extend and personalize SAP applications. Follow the SAP technology blog for insights into SAP BTP, ABAP, SAP Analytics Cloud, SAP HANA, and more.
cancel
Showing results for 
Search instead for 
Did you mean: 
carlosbasto
Product and Topic Expert
Product and Topic Expert
2,182

In our previous blogs (Building an Agentic AI System with SAP Generative AI Hub and Building an Agentic AI System with Model Context Protocol (MCP) and SAP BTP Kyma), we showed how to build and scale an AI assistant using SAP Generative AI Hub, with tool orchestration powered by Model Context Protocol (MCP) and containerized deployment in SAP BTP Kyma. The result was a modular, cloud-native AI system capable of tool reasoning, retrieval-augmented generation (RAG), and dynamic responses.

But enterprise AI needs are growing fast. As we try to scale AI to handle diverse domains (weather, planning, ERP insights, document summarization, and more) it becomes difficult for a single assistant to do it all.

That’s where the Agent2Agent (A2A) Protocol enters the picture.

Recently introduced by Google, A2A is an open protocol for communication between agents, enabling agents built on different frameworks, hosted across different platforms, and owned by different teams or vendors to collaborate securely and interoperably.

Each agent describes its capabilities in an Agent Card, exposes a standard set of endpoints, and communicates via a shared schema. A2A allows agents to discover each other, delegate tasks, share artifacts, and stream real-time updates.

In this blog, we’ll walk you through how we extended our SAP-based assistant to speak A2A, making it capable of interacting with remote agents that live anywhere: in your own infrastructure, another team's container, or even another vendor’s platform. 

Let’s take a look at the full solution architecture.

 

A2A Agent Architecture on SAP BTP with MCP Tooling

The diagram below illustrates how we structured the solution to enable Agent2Agent (A2A) protocol-based communication within my AI assistant ecosystem. The focus of this setup is to configure and orchestrate interoperable agents (not to build a front-end application), making it broadly adaptable to other enterprise or research scenarios requiring multi-agent collaboration.

carlosbasto_0-1746447273935.png

Key Components

Front-End + Host Agent

A lightweight user interface captures user input and routes it to a Host Agent. This Host Agent is implemented using Google’s A2A sample host agent and acts as the central A2A client. It is responsible for:

  • Creating tasks
  • Delegating them to specialized agents
  • Streaming real-time updates

Remote Agents + A2A Clients

Each Remote Agent represents a specialized domain expert such as weather intelligence, planning assistance, or knowledge retrieval. These agents are also based on Google’s A2A samples and behave as both A2A clients and task processors, capable of:

  • Sending tasks to other agents
  • Receiving and fulfilling tasks independently

A2A Servers (Utilities Agent, SAP Agent)

These are domain-specific agents that expose fully A2A-compliant server endpoints to process and respond to incoming tasks:

  • Utilities Agent: Handles time and weather requests.
  • SAP Agent: Performs enterprise-grade search over structured and unstructured data using Retrieval-Augmented Generation (RAG) powered by SAP HANA Cloud’s Vector Engine.

Each server advertises its capabilities via its .well-known/agent.json (Agent Card), allowing them to be discovered and used dynamically by other A2A agents.

MCP Server on SAP BTP Kyma (Tool Execution Layer)

Domain logic is implemented as tools registered in an MCP (Model Context Protocol) server, deployed using FastAPI on SAP BTP Kyma. These tools encapsulate specialized functions and are invoked during reasoning:

  • get_weather: Returns weather data
  • get_time: Returns current time
  • retriever: Executes RAG search against SAP HANA Cloud Vector Engine

Agents access these tools by dynamically registering them as callable functions within their reasoning loop.

 

Adapting the A2A Structure for Enterprise Agents on SAP

To implement A2A protocol–based collaboration within our ecosystem, we began with the official Google A2A Python samples, which provide a reference structure for building agents that can act as A2A clients, servers, or both. The image below shows the original A2A agent directory provided by Google:

📁agents/ag2/

  • agent.py — Main agent logic
  • task_manager.py — Task execution management
  • __main__.py, __init__.py, pyproject.toml — Package initialization and dependencies

To bring this into a real-world enterprise setting, we extended this architecture to include multiple agents, each encapsulated in its own submodule. Our structure includes:

📁agents/

  • 📁sap/ — SAP Agent (exposes enterprise RAG capabilities)
  • 📁utils/ — Utilities Agent (weather + time tools)
  • 📁common/ — Shared logic if needed across agents

Each agent remains A2A-compliant and can independently register itself, handle tasks, and advertise its capabilities via an Agent Card.

SAP Agent: Enterprise Knowledge & Search

The sap agent is a domain-specialized A2A server that offers enterprise-grade Retrieval-Augmented Generation (RAG) over structured and unstructured data stored in SAP HANA Cloud.

Key files:

  • agent.py — Implements the A2A server logic, task processing loop, and how artifacts are returned.
  • task_manager.py — Connects to the MCP tool server (deployed on SAP BTP Kyma) and executes the retriever function, which performs semantic search.
  • pyproject.toml — Declares the agent as an independent runnable module with its own dependencies.
  • __main__.py — Allows this agent to be launched independently.

This agent exposes its capabilities in a .well-known/agent.json, registering itself as a RAG-capable endpoint. When invoked by the Host Agent (or any other agent), it executes the retrieval task and streams the result back using A2A's artifact and message parts schema.

By modularizing this agent and separating enterprise logic from generic utilities, we make the setup highly scalable. You can add more agents later (e.g., summarizers, planners, auditors) without touching the SAP agent logic because everything is loosely coupled through A2A.

 

Utilities Agent: Domain Tools (Weather & Time)

The utils agent complements the SAP Agent by handling more lightweight, utility-style tasks such as providing current time or weather conditions. Though simple in scope, it plays an important role in demonstrating how multiple agents can independently contribute to a broader conversation via the A2A protocol.

This agent also follows the same modular, self-contained structure as the others:

Key files:

  • agent.py — Registers this agent as an A2A server, handling inbound tasks.
  • task_manager.py — Routes each task to the appropriate MCP tool endpoint (e.g., get_time, get_weather) hosted in the Kyma-based MCP server.
  • pyproject.toml — Declares the agent as an independent runnable module with its own dependencies.
  • __main__.py — Allows this agent to be launched independently.

Each tool is registered dynamically within the agent's reasoning logic and invoked via MCP. For example:

  • Tool 1: get_weather → Fetches weather info for a given city.
  • Tool 2: get_time → Returns the current time.

These tools live behind the MCP server but are logically abstracted at the agent level. This separation ensures that other agents don’t need to know or care where the logic runs. They just send an A2A task, and the Utilities Agent handles the orchestration behind the scenes.

Like the SAP Agent, this Utilities Agent is fully A2A-compliant, can run independently, and can be discovered by any other agent via its agent card.

 

A2A Agents Internals: Architecture and Execution Flow

This section introduces the core files for building A2A compliant agents: defining its reasoning logic (agent.py), managing task interactions with the A2A runtime (task_manager.py), and launching the agent as an A2A-compatible server (__main__.py).

 

File: agent.py — SAP Agent Execution Engine for A2A Tasks

The agent.py file implements the core logic for the SAP Agent, a fully A2A-compliant server-side agent designed to process incoming tasks using the tasks/sendSubscribe streaming interface. This agent is focused on executing Retrieval-Augmented Generation (RAG) via a retriever tool registered in an MCP (Model Context Protocol) server, hosted on SAP BTP Kyma and exposed through Server-Sent Events (SSE). The agent is constructed around A2A’s principles of agent autonomy, schema-based communication, and structured task flow.

When the agent receives a user query, it dynamically generates an LLM prompt that includes a list of available tools fetched via the MCP session. This prompt is intended to guide the LLM in deciding whether tool invocation is necessary. The agent defines a strict JSON schema that the LLM must follow, ensuring that responses are interpretable and actionable, particularly for determining whether to call the retriever function and with which parameters.

If the LLM decides that a tool should be used, the agent executes it using session.call_tool(), and stores the results. After all tool calls are handled, the agent uses the returned artifacts (tool outputs) to generate a final natural-language response. This final response is created by invoking the LLM again with a new prompt that summarizes the tool results, following a reasoning pattern that separates tool selection from user-facing output generation.

The stream() method exposes this entire lifecycle as an A2A streaming-capable endpoint. It starts by yielding an initial “processing” message to the A2A task manager, then creates a new MCP session using the SSE transport. With the session active, it initializes the LLM (in this case, GPT-4o via SAP GenAI Hub), executes the full tool-orchestration pipeline, and finally streams back the completed result.

import json
import asyncio
from typing import AsyncIterable, Dict, Any

from mcp import ClientSession
from mcp.client.sse import sse_client
from gen_ai_hub.orchestration.models.llm import LLM
from gen_ai_hub.orchestration.models.message import SystemMessage, UserMessage, AssistantMessage
from gen_ai_hub.orchestration.models.template import Template
from gen_ai_hub.orchestration.models.config import OrchestrationConfig
from gen_ai_hub.orchestration.service import OrchestrationService
from gen_ai_hub.orchestration.models.response_format import ResponseFormatJsonSchema


class SAPAgent:
    """Agent focused on using the 'retriever' tool via SAP GenAI Hub and MCP session."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    def __init__(self):
        pass

    def _build_dynamic_schema(self) -> dict:
        """Define expected JSON schema for tool-calling reasoning."""
        return {
            "title": "ToolCalls",
            "type": "object",
            "properties": {
                "tool_calls": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "decision": {"type": "string"},
                            "reason": {"type": "string"},
                            "function": {"type": "string"},
                            "parameters": {"type": "object"},
                        },
                        "required": ["decision", "reason", "function", "parameters"],
                    }
                }
            },
            "required": ["tool_calls"]
        }

    async def _generate_instruction(self) -> str:
        """Generate dynamic system prompt based on available 'retriever' tool."""
        description = json.dumps(await self.list_tools(), indent=2)
        return f"""
        You are an intelligent AI assistant capable of deciding whether to invoke tools based on the user's request.

        Available tools:
        {description}

        Instructions:
        - Use the 'retriever' tool for any factual or document-related question.
        - Return a JSON entry with the function name and parameters.
        - If no tool is relevant, return an entry with decision = "no_tool".

        Return ONLY valid JSON like:
        {{
          "tool_calls": [
            {{
              "decision": "tool",
              "reason": "The user is asking a factual question.",
              "function": "retriever",
              "parameters": {{
                "question": "What is SAP Datasphere?"
              }}
            }}
          ]
        }}
        """

    async def list_tools(self) -> dict:
        """Fetch list of tools from the MCP session."""
        tools_result = await self.session.list_tools()
        return {tool.name: {"description": tool.description} for tool in tools_result.tools}

    async def _execute_tool(self, decision: dict) -> str:
        """Execute the 'retriever' tool based on LLM decision."""
        try:
            result = await self.session.call_tool(decision["function"], arguments=decision.get("parameters", {}))
            return result.content[0].text
        except Exception as e:
            return f"Error: {str(e)}"

    async def _finalize_response(self, user_query: str, tool_results: list, messages: list) -> str:
        """Generate final response after tool usage."""
        messages.append(SystemMessage(
            """
            You now have access to tool results. Use ONLY these results to answer the user's original question naturally.
            Do not ask for any data you already received. Be concise and helpful.
            One or more tools may have returned vague answers. If so, use your own knowledge to fill the gaps.
            """
        ))

        messages.append(UserMessage(f"User question: {user_query}"))

        summary = "\n".join([f"- Tool `{name}` returned: {json.dumps(result)}" for name, result in tool_results])
        messages.append(UserMessage(f"Tool Results:\n{summary}"))

        template = Template(messages=messages, response_format="text")
        config = OrchestrationConfig(template=template, llm=self.llm)
        response = OrchestrationService(config=config).run()
        return response.module_results.llm.choices[0].message.content

    async def run(self, user_query: str) -> str:
        """Main method to orchestrate LLM + retriever tool."""
        system_message = SystemMessage(await self._generate_instruction())
        prompt = UserMessage(user_query)
        messages = [system_message, prompt]

        template = Template(
            messages=messages,
            response_format=ResponseFormatJsonSchema(
                name="ToolCall",
                description="Tool execution format",
                schema=self._build_dynamic_schema()
            )
        )

        config = OrchestrationConfig(template=template, llm=self.llm)
        response = OrchestrationService(config=config).run()

        decisions_json = json.loads(response.module_results.llm.choices[0].message.content)
        tool_results = []

        for decision in decisions_json.get("tool_calls", []):
            if decision.get("decision") == "tool":
                tool_response = await self._execute_tool(decision)
                tool_results.append((decision["function"], tool_response))
                messages.append(AssistantMessage(json.dumps(decision)))
            else:
                messages.append(AssistantMessage(json.dumps(decision)))

        return await self._finalize_response(user_query, tool_results, messages)

    async def stream(self, query: str, sessionId: str) -> AsyncIterable[Dict[str, Any]]:
        """
        Entry point for streaming response to A2A task manager.
        Initializes MCP session and streams a final task result.
        """
        yield {
            "is_task_complete": False,
            "require_user_input": False,
            "content": "Processing your request..."
        }

        try:
            async with sse_client("https://<your-server>.kyma.ondemand.com/sse") as (read, write):
                async with ClientSession(read, write) as session:
                    await session.initialize()
                    self.session = session
                    self.llm = LLM(name="gpt-4o", version="latest", parameters={"max_tokens": 2000, "temperature": 0.2})
                    response_text = await self.run(query)

            yield {
                "is_task_complete": True,
                "require_user_input": False,
                "content": response_text
            }

        except Exception as e:
            yield {
                "is_task_complete": True,
                "require_user_input": True,
                "content": f"An error occurred: {str(e)}"
            }

    def invoke(self, query: str, sessionId: str) -> Dict[str, Any]:
        """Not implemented — this agent supports streaming only."""
        raise NotImplementedError("Use the streaming interface (tasks/sendSubscribe) instead.")

This implementation adheres to A2A's official design by:

  • Respecting the SUPPORTED_CONTENT_TYPES contract defined in the agent’s AgentCard.
  • Using the SSE-based streaming interface (tasks/sendSubscribe) for real-time task progression.
  • Managing the full lifecycle of dynamic reasoning, tool invocation, and output generation.
  • Isolating the orchestration logic cleanly within a single class.

The invoke() method is intentionally unimplemented, signaling that this agent is designed for streaming scenarios only, an acceptable configuration per A2A protocol. Overall, this file showcases how an SAP-native agent can participate in an interoperable, multi-agent ecosystem via open standards like A2A and MCP.

 

File: task_manager.py — Managing the A2A Task Lifecycle

The task_manager.py file defines the AgentTaskManager class, which extends Google’s sample InMemoryTaskManager to handle the full lifecycle of A2A tasks for the SAP Agent. This includes receiving tasks, validating requests, invoking the agent, updating task status, and returning results or streaming updates to the A2A runtime via Server-Sent Events (SSE).

from typing import AsyncIterable
from common.types import (
    SendTaskRequest,
    TaskSendParams,
    Message,
    TaskStatus,
    Artifact,
    TextPart,
    TaskState,
    SendTaskResponse,
    InternalError,
    JSONRPCResponse,
    SendTaskStreamingRequest,
    SendTaskStreamingResponse,
    TaskArtifactUpdateEvent,
    TaskStatusUpdateEvent
)
from common.server.task_manager import InMemoryTaskManager
from agent import SAPAgent
import common.server.utils as utils
import asyncio
import logging
import traceback

logger = logging.getLogger(__name__)


class AgentTaskManager(InMemoryTaskManager):
    """Agent Task Manager for SAP Agent, handles task lifecycle and packaging."""

    def __init__(self, agent: SAPAgent):
        super().__init__()
        self.agent = agent

   # -------------------------------------------------------------
    # Public API methods
    # -------------------------------------------------------------        

    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:

        validation_error = self._validate_request(request)
        if validation_error:
            return SendTaskResponse(id=request.id, error=validation_error.error)
        
        await self.upsert_task(request.params)
        await self.update_store(
            request.params.id, TaskStatus(state=TaskState.WORKING), None
        )
        query = self._extract_user_query(request.params)
        
        try:
            agent_response = await self.agent.invoke(user_query=query)
            return await self._handle_send_task(request, agent_response)
        except Exception as e:
            logger.error(f"Error invoking agent: {e}")
            return SendTaskResponse(
                id=request.id,
                error=InternalError(message=f"Error during on_send_task: {str(e)}")
            )

    async def on_send_task_subscribe(
            self, request: SendTaskStreamingRequest
            ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        
        try:
            error = self._validate_request(request)
            if error:
                return error

            await self.upsert_task(request.params)
            
            task_send_params: TaskSendParams = request.params
            sse_event_queue = await self.setup_sse_consumer(task_send_params.id, False)            
            
            asyncio.create_task(self._handle_send_task_streaming(request))
            
            return self.dequeue_events_for_sse(
                request.id, request.params.id, sse_event_queue
                )
        
        except Exception as e:
            logger.error(f"Error in SSE stream: {e}")
            print(traceback.format_exc())
            return JSONRPCResponse(
                id=request.id,
                error=InternalError(
                    message="An error occurred while streaming the response"
                ),
            )

    # -------------------------------------------------------------
    # Agent response handlers
    # -------------------------------------------------------------

    async def _handle_send_task(
        self, request: SendTaskRequest, agent_response: dict
    ) -> SendTaskResponse:

        task_send_params: TaskSendParams = request.params
        task_id = task_send_params.id
        history_length = task_send_params.historyLength
        task_status = None

        parts = [TextPart(type="text", text=agent_response["content"])]
        artifact = None
        if agent_response["require_user_input"]:
            task_status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message=Message(role="agent", parts=parts),
            )
        else:
            task_status = TaskStatus(state=TaskState.COMPLETED)
            artifact = Artifact(parts=parts)
        
        updated_task = await self.update_store(
            task_id, task_status, None if artifact is None else [artifact]
        )
        
        task_result = self.append_task_history(updated_task, history_length)
        return SendTaskResponse(id=request.id, result=task_result)

    async def _handle_send_task_streaming(self, request: SendTaskStreamingRequest):
        
        task_send_params: TaskSendParams = request.params
        query = self._extract_user_query(task_send_params)

        try:
            async for item in self.agent.stream(query, task_send_params.sessionId):
                is_task_complete = item["is_task_complete"]
                require_user_input = item["require_user_input"]
                content = item["content"]
                
                logger.info(f"Stream item received: complete={is_task_complete}, require_input={require_user_input}, content_len={len(content)}")
                
                artifact = None
                message = None
                parts = [TextPart(type="text", text=content)]
                end_stream = False

                if not is_task_complete and not require_user_input:
                    # Processing message - working state
                    task_state = TaskState.WORKING
                    message = Message(role="agent", parts=parts)
                    logger.info(f"Sending WORKING status update")
                elif require_user_input:
                    # Requires user input - input required state
                    task_state = TaskState.INPUT_REQUIRED
                    message = Message(role="agent", parts=parts)
                    end_stream = True
                    logger.info(f"Sending INPUT_REQUIRED status update (final)")
                else:
                    # Task completed - completed state with artifact
                    task_state = TaskState.COMPLETED
                    artifact = Artifact(parts=parts, index=0, append=False)
                    end_stream = True
                    logger.info(f"Sending COMPLETED status with artifact (final)")

                # Update task store (return value not used)
                task_status = TaskStatus(state=task_state, message=message)
                await self.update_store(
                    task_send_params.id,
                    task_status,
                    None if artifact is None else [artifact],
                )

                # First send artifact if we have one
                if artifact:
                    logger.info(f"Sending artifact event for task {task_send_params.id}")
                    task_artifact_update_event = TaskArtifactUpdateEvent(
                        id=task_send_params.id, artifact=artifact
                    )
                    await self.enqueue_events_for_sse(
                        task_send_params.id, task_artifact_update_event
                    )                    
                
                # Then send status update
                logger.info(f"Sending status update for task {task_send_params.id}, state={task_state}, final={end_stream}")
                task_update_event = TaskStatusUpdateEvent(
                    id=task_send_params.id, status=task_status, final=end_stream
                )
                await self.enqueue_events_for_sse(
                    task_send_params.id, task_update_event
                )

        except Exception as e:
            logger.error(f"An error occurred while streaming the response: {e}")
            logger.error(traceback.format_exc())
            await self.enqueue_events_for_sse(
                task_send_params.id,
                InternalError(message=f"An error occurred while streaming the response: {e}")                
            )

    # -------------------------------------------------------------
    # Utility methods
    # -------------------------------------------------------------

    def _validate_request(
        self, request: SendTaskRequest | SendTaskStreamingRequest
    ) -> JSONRPCResponse | None:
        
        task_send_params: TaskSendParams = request.params
        if not utils.are_modalities_compatible(
            task_send_params.acceptedOutputModes, SAPAgent.SUPPORTED_CONTENT_TYPES
        ):
            logger.warning(
                "Unsupported output mode. Received %s, Support %s",
                task_send_params.acceptedOutputModes,
                SAPAgent.SUPPORTED_CONTENT_TYPES,
            )
            return utils.new_incompatible_types_error(request.id)
        return None
        
    def _extract_user_query(self, task_send_params: TaskSendParams) -> str:
        """
        Extract the user's text query from the task parameters.
        
        Extracts and returns the text content from the first part of the user's message.
        Currently only supports text parts.
        
        Args:
            task_send_params: The parameters of the task containing the user's message.
            
        Returns:
            str: The extracted text query.
            
        Raises:
            ValueError: If the message part is not a TextPart.
        """
        part = task_send_params.message.parts[0]
        if not isinstance(part, TextPart):
            raise ValueError("Only text parts are supported")
        return part.text

Key Responsibilities:

  • Task Validation & Creation: On receiving a task via on_send_task or on_send_task_subscribe, the manager validates the request and initializes the task store.

  • Agent Invocation: Delegates the user query to the SAP Agent’s invoke() or stream() method depending on whether the request is synchronous or streaming.

  • State Management: Translates agent responses into A2A-compatible TaskStatus and Artifact objects, updating the task state accordingly.

  • Streaming Support: Implements asynchronous event streaming via SSE using enqueue_events_for_sse() and dequeue_events_for_sse(), allowing the client to receive real-time progress, partial results, and final outputs, core to the A2A streaming spec (tasks/sendSubscribe).

  • Output Formatting: All messages returned to the A2A protocol are encoded using TextPart and Message objects per the JSON schema defined by A2A.

This design ensures full compliance with the A2A protocol’s expectations around TaskStatus, Artifact, and progressive output delivery, as described in the Agent-to-Agent Communication.

 

File: __main__.py — Bootstrapping the A2A SAP Agent

This file initializes and launches the SAP Agent as an A2A-compatible server by using Google’s A2AServer abstraction. It defines how the agent exposes its capabilities, endpoint, and metadata through the Agent Card, which is a key requirement of the Agent2Agent (A2A) protocol specification.

import click
import logging

from agent import SAPAgent
from task_manager import AgentTaskManager
from common.server.server import A2AServer
from common.types import AgentCard, AgentCapabilities, AgentSkill, MissingAPIKeyError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@click.command()
@click.option("--host", "host", default="localhost")
@click.option("--port", "port", default=8001)

def main(host, port):
    """Starts the SAP Retriever Agent via A2A server."""

    try:
        capabilities = AgentCapabilities(streaming=True)

        skills = [
            AgentSkill(
                id="sap_agent",
                name="SAP Agent",
                description="Answers SAP-related questions using a retriever tool.",
                tags=["sap", "retriever", "knowledge"],
                examples=[
                    "What is SAP Datasphere?",
                    "Explain the role of SAP BTP.",
                    "How does SAP S/4HANA differ from ECC?"
                ]
            )
        ]

        agent_card = AgentCard(
            name="SAP Agent",
            description="Intelligent agent for answering SAP-related questions using knowledge retrieval.",
            url=f"http://{host}:{port}/",
            version="1.0.0",
            defaultInputModes=SAPAgent.SUPPORTED_CONTENT_TYPES,
            defaultOutputModes=SAPAgent.SUPPORTED_CONTENT_TYPES,
            capabilities=capabilities,
            skills=skills,
        )

        server = A2AServer(
            agent_card=agent_card,
            task_manager=AgentTaskManager(agent=SAPAgent()),
            host=host,
            port=port
        )

        logger.info(f"Starting SAP Retriever Agent on {host}:{port}")
        server.start()
        logger.info(f"Uvicorn should now be running at http://{host}:{port}")

    except MissingAPIKeyError as e:
        logger.error(f"API Key missing: {e}")
        exit(1)
    except Exception as e:
        logger.error(f"Startup error: {e}")
        exit(1)


if __name__ == "__main__":
    main()

Key Functions and Design:

  • Agent Card Declaration: The agent's public metadata is encapsulated in an AgentCard, including name, URL, supported modalities, skills (e.g., answering SAP-related queries), and version. This enables discovery by client agents via the /.well-known/agent.json endpoint.

  • Skill Definition: The agent declares a single skill (sap_agent) describing its domain expertise: SAP product knowledge retrieval.

  • Task Manager Binding: The A2AServer is configured with a custom AgentTaskManager that handles A2A task lifecycle and invokes the SAPAgent defined earlier.

  • Streaming Support: Capabilities are declared to indicate the agent supports streaming task updates (i.e., tasks/sendSubscribe), a core feature of A2A.

This entry point allows the agent to be deployed as a compliant A2A Server capable of securely receiving and responding to tasks over HTTP using the shared protocol, ensuring full interoperability with any A2A client.

 

Extending the Agent Mesh: Utility Agent and the Pattern for Domain-Specific Agents

Following the same structure as the SAP Agent, the Utility Agent demonstrates how to create another domain-specific A2A-compatible agent. In this case, one focused on answering weather and time-related queries using external tools.

Just like the SAP Agent, it includes:

  • agent.py – Encapsulates reasoning and tool-calling logic using GenAI Hub + MCP tools (get_weather, get_time_now).
  • task_manager.py – Manages the A2A task lifecycle (receiving, processing, streaming updates).
  • __main__.py – Boots up the agent with a proper Agent Card and A2A server configuration.

The main differences lie in:

  • The tool set (e.g., get_weather vs retriever).

        skills = [
            AgentSkill(
                id="utility_agent",
                name="Utility Agent",
                description="Answers weather and time-related questions using external tools.",
                tags=["weather", "time"],
                examples=[
                    "What time is it in Brazil?", "What is the weather like now?"
                    ]
                )
            ]
  • The instructions returned to the LLM for tool decisioning.

    async def _generate_instruction(self) -> str:
        """Generate dynamic system prompt based on registered tools."""
        description = json.dumps(await self.list_tools(), indent=2)
        return f"""
        You are an intelligent AI assistant capable of deciding whether to invoke tools based on the user's request.

        Available tools:
        {description}

        Instructions:
        - For each relevant tool, return a JSON entry with the function name and parameters.
        - If no tool is relevant, return an entry with decision = "no_tool".

        Return ONLY valid JSON like:
        {{
          "tool_calls": [
            {{
              "decision": "tool",
              "reason": "The user asked for weather.",
              "function": "get_weather",
              "parameters": {{
                "latitude": 48.8566,
                "longitude": 2.3522
              }}
            }},
            {{
              "decision": "tool",
              "reason": "The user asked for time.",
              "function": "get_time_now",
              "parameters": {{}}
            }}
          ]
        }}
        """
  • The skills metadata advertised in the Agent Card (weather, time vs SAP knowledge).

        agent_card = AgentCard(
            name="Utility Agent",
            description="Answers weather and time-related queries using tool results.",
            url=f"http://{host}:{port}/",
            version="1.0.0",
            defaultInputModes=UtilityAgent.SUPPORTED_CONTENT_TYPES,
            defaultOutputModes=UtilityAgent.SUPPORTED_CONTENT_TYPES,
            capabilities=capabilities,
            skills=skills,
        )

This pattern is reusable: to create any new agent for your own use case (be it finance, HR, manufacturing and others) you only need to adapt these same three files with your own domain tools, task logic, and descriptions. All remaining A2A and MCP mechanics remain consistent across agents.

 

Managing Dependencies and Workspaces with pyproject.toml

To standardize how A2A-compatible agents are built, run, and extended, each agent (e.g., SAP Agent, Utility Agent) is configured using a pyproject.toml file. This file handles dependency management, agent-specific metadata, and workspace definitions, making it easy to scale to additional agents later.

[project]
name = "a2a-agent"
version = "0.1.0"
description = "A2A-compatible Agent using MCP and SAP GenAI Hub"
requires-python = ">=3.12"

dependencies = [
  "ag2[mcp, openai]>=0.8.6",              # A2A framework, MCP client, OpenAI-compatible LLM support
  "generative-ai-hub-sdk[all]>=0.4.0",    # SAP GenAI Hub orchestration + LLM schemas
  "fastapi",                              # Required by A2AServer
  "uvicorn[standard]>=0.27.1",            # Local server runner
  "httpx",                                # HTTP + SSE client
  "click>=8.1.3"                          # For CLI option handling
]

[tool.hatch.build.targets.wheel]
packages = ["common"]

[tool.uv.workspace]
members = [
  "agents/sap",
  "agents/utils"
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[dependency-groups]
dev = [
  "pytest>=8.3.5",
  "pytest-mock>=3.14.0",
  "ruff>=0.11.2"
]
  • [project]

    • Defines agent-specific metadata: name, version, description, and Python version.

    • Lists core dependencies for running A2A + MCP + SAP GenAI Hub orchestration:

      • ag2[mcp, openai]: A2A protocol, MCP support, and LLM-compatible interfaces.

      • generative-ai-hub-sdk[all]: SAP’s SDK for orchestration, schema validation, and LLMs.

      • fastapi, uvicorn[standard]: Required for serving A2A HTTP endpoints.

      • httpx: Needed by SSE clients and GenAI Hub HTTP logic.

      • click: Optional but used to simplify CLI startup with __main__.py.

  • [tool.uv.sources]

    • Used by uv for local development. This helps you run each agent individually with uv run ., while still sharing dependencies.

  • [tool.hatch.build.targets.wheel]

    • Ensures shared logic (like the common folder) is included when packaging agents.

  • [tool.uv.workspace]

    • Declares a multi-agent workspace for tools like uv to resolve dependencies across agents (e.g., agents/sap, agents/utils).

  • [build-system]

    • Specifies hatchling as the build backend. This is required for workspace coordination and dependency isolation.

  • [dependency-groups]

    • Optional section to include development tools (e.g., pytest, ruff) if testing and linting are needed.

This setup is highly extensible. To create a new agent, simply:

  1. Copy one of the agent folders (e.g., agents/sap).
  2. Rename and adapt the pyproject.toml.
  3. Update only the agent.py, task_manager.py, and __main__.py.

With this modular approach, you’re ready to scale your agent mesh without reinventing your development setup each time.

 

Running the Agents and Launching the Demo UI

To experience A2A in action, you can either build your own host application or use the official Google A2A Demo UI which is the approach we'll follow in this walkthrough. This web app acts as a visual interface for interacting with your Host and Remote agents, showcasing task orchestration, dynamic responses, and streaming task updates.

Step 1: Start Your A2A-Compatible Agents

First, start your custom agents (like SAP Agent and Utility Agent) locally. For example:

cd agents/sap
uv run .

carlosbasto_1-1746450349979.png

cd agents/utils
uv run .

carlosbasto_2-1746450378629.png

Ensure both agents expose valid AgentCards via the /.well-known/agent.json endpoint, and that they are reachable from your browser or backend app.

carlosbasto_4-1746450485198.png

carlosbasto_3-1746450457810.png

 

Step 2: Launch Google’s A2A Demo UI

The official A2A Demo Web App provides a conversation-centric UI for testing A2A agent collaboration.

Key Features:
  • Add agents dynamically by pasting their agent.json URL.
  • Talk to one or more agents via the Host Agent.
  • View detailed message/task history and streaming updates.

To get started:

cd demo/ui

Then, set up your environment variables for authentication (in our case, with Google AI Studio):

echo "GOOGLE_API_KEY=your_api_key_here" >> .env

Finally, launch the UI:

uv run main.py

carlosbasto_5-1746451135243.png

 

Setting Up Agents in the A2A Demo Application

Once both agents are running locally (in our case SAP Agent on localhost:8001 and Utility Agent on localhost:8002), you can use the Google A2A Demo UI to interact with them visually.

Step-by-Step: Register Remote Agents

  1. Access the Agents Page
    Navigate to http://0.0.0.0:12000/agents — this is where you'll register the remote agents.
     0a57b1bc-9a8b-443f-97cd-b72763acbf0b.png

  2. Add a New Remote Agent
    Click the upload icon. A dialog will appear prompting you to enter the Agent Address.
     

    carlosbasto_8-1746451745575.png 
  3. Register SAP Agent
    Enter localhost:8001 and click Read. The AgentCard metadata will be fetched:

    carlosbasto_10-1746451813050.png

  4. Register Utility Agent
    Repeat the process for the second agent using localhost:8002.

    carlosbasto_11-1746451904109.png
  5. Confirm Agent Registration
    You should now see both agents listed:

    carlosbasto_12-1746451952934.png

     

Scenario-Based Evaluation of A2A Agents

Here are the results of running the same questions as in our previous two blogs, but now using the A2A-enabled multi-agent system with dynamic routing and streaming UI.

"What time is it?"

carlosbasto_0-1746452182365.png

Handled by: Utility Agent
Analysis:
Compared to Blog #1 (basic agentic orchestration via GenAI Hub) and Blog #2 (MCP + Kyma), this result is functionally the same but here, the user sees it rendered instantly in the UI, and the agent is dynamically plugged in, not hardcoded.

 

"How’s the weather in Paris?"

carlosbasto_1-1746452247085.png

Handled by: Utility Agent
Analysis:
Same weather tool usage as in the previous blogs but now you see rich streaming and the agent auto-resolved via A2A discovery. You didn’t manually route the query, A2A did.

 

"What is SAP Business Data Cloud?"

carlosbasto_2-1746452331688.png

Handled by: SAP Agent
Analysis:
As in the previous blogs, this content is LLM-driven with SAP domain context. But now, you see that the LLM inside the SAP Agent was independently maintained and registered, showing how easy it is to compose modular domain agents.

 

Composite Query:

This example captures the power of modular reasoning, where tool-using agents handle real-time or specialized queries while the LLM addresses conceptual prompts, all seamlessly synthesized into a unified response.

"Explain what a blockchain is and also tell me: how’s the weather in Brazil, what time it is now, and what is the role of SAP Datasphere in SAP Business Data Cloud?"

carlosbasto_0-1746455579101.png

This compound query is intelligently decomposed and delegated behind the scenes:

Tool-Using Agents

  • Weather Agent provides:
    Weather in Brazil: 30.4°C with a wind speed of 7.6 m/s.

  • Time Agent provides:
    Current Time: 14:32.

  • SAP Agent provides:
    SAP Datasphere’s role in SAP Business Data Cloud.

LLM-Only Response

  • Blockchain explanation is handled directly by the base LLM. This shows that not every subtask requires a tool or external call.

This is just a simplified example of how A2A can be implemented. While it doesn’t cover all the architectural considerations, edge cases, or production-level variables, it provides a solid starting point for understanding the core concepts and possibilities.

 

Wrapping Up and Next Steps

In this blog, we expanded our SAP-based agentic architecture by adopting the Agent2Agent (A2A) protocol, an open standard that enables secure, interoperable communication between intelligent agents across teams, platforms, and domains.

Building on our earlier work with SAP Generative AI Hub, Model Context Protocol (MCP), and SAP BTP Kyma, we showcased how to scale beyond a single assistant by composing a mesh of specialized, A2A-compliant agents: each with its own skills, responsibilities, and autonomy. From retrieving SAP knowledge via the SAP Agent, to answering time and weather queries via the Utility Agent, every agent is discoverable, modular, and orchestrated via a central Host Agent.

This A2A-powered setup unlocks true composability and reuse in enterprise AI: agents are no longer hardcoded or monolithic, but instead self-describing, interoperable services that can be reused across use cases and extended independently.

Now it’s your turn to take this further.

  • Connect your multi-agent system to real SAP business data and enterprise tools.
  • Showcase high-impact use cases such as document summarization, financial analysis, planning support etc.
  • Explore how to monitor, secure, and govern your agents in production using SAP BTP.

With A2A and MCP in place, you have the foundation for a scalable, modular AI architecture that is ready to grow with your enterprise needs.

Hope that helps!

 

Further References