Artificial Intelligence Blogs Posts
cancel
Showing results for 
Search instead for 
Did you mean: 
I310202
Product and Topic Expert
Product and Topic Expert
0 Likes
198

Who this is for: Python developers on SAP BTP who want to build a smart document Q&A system. No prior knowledge of knowledge graphs or SPARQL needed.

What you will build: A FastAPI service that reads PDFs, stores the data in two ways (a knowledge graph and a vector store) inside SAP HANA Cloud, and answers questions using both stores at the same time — deployed to Cloud Foundry.

Stack: Python 3.11 · FastAPI · LangChain · SAP HANA Cloud · SAP AI Core (Claude via Bedrock) · Cloud Foundry

Agentic Hybrid RAG Architecture

Figure 1 — Agentic Hybrid RAG Architecture


Step 1 — Understanding the Problem

Before building, let’s look at why existing approaches fall short when you need accurate answers from documents.

1.1 Why Pure Vector RAG Breaks on Exact Facts

What vector RAG actually does

Vector RAG works like this: it cuts your documents into pieces (chunks), converts each piece into a list of numbers (a vector), and when you ask a question, it finds the pieces whose numbers are closest to your question’s numbers.

The problem: it finds text that talks about the topic — not necessarily the exact fact you asked for.

The chunk boundary problem — a real example

Imagine your MSDS document contains this section:

Section 7 — Handling and Storage

The product must be stored below 25°C in a well-ventilated area.
Keep away from sources of ignition. Flash point: 42°C.
Containers must be grounded to prevent static discharge.

When you split this document into 500-token pieces, this text ends up mixed with the previous section about composition percentages. The flash point 42°C is in there somewhere, but buried in noise.

Now ask: “What is the flash point of this product?”

The top-5 retrieved chunks might look like this:

[score=0.91] "...The product must be stored below 25°C in a well-ventilated
             area. Keep away from sources of ignition. Flash point: 42°C.
             Containers must be grounded..."
[score=0.84] "...Flammability classification: Flam. Liq. 3. See Section 9
             for physical and chemical properties. Refer to SDS for complete
             hazard data..."
[score=0.79] "...Physical state: liquid. Colour: clear. Odour: mild petroleum.
             Boiling point: 155°C. Melting point: <-20°C. Vapour pressure..."
[score=0.74] "...Section 9 — Physical and Chemical Properties. All values
             measured at 20°C unless otherwise stated..."
[score=0.71] "...For fire-fighting: CO2, dry powder, foam. Do not use water
             jets directly on burning liquid. Flash point determined by..."

The top chunk contains the answer — but only because this test PDF happened to keep that section together. Change the PDF, change the page layout, or add one more sentence before the flash point line, and it shifts into a different chunk. The answer becomes:

“The product has flammability properties as classified under GHS. Refer to Section 9 for specific physical and chemical data.”

Useless. The LLM has nothing concrete to work with, so it makes up a reasonable-sounding answer — this is hallucination, and it’s the biggest risk with vector-only RAG.

When vector RAG works well

Vector RAG is genuinely good at: - Procedures and narratives — “What should I do if I inhale this product?” retrieves first-aid paragraphs correctly because the question and the answer share vocabulary. - Fuzzy questions — “What protective gear do I need?” matches chunks containing “gloves”, “goggles”, “respirator” even though those exact words weren’t in the query. - Multi-sentence explanations — questions that require a paragraph-length answer rather than a single value.

The failure mode is specific: exact values, codes, and structured facts that may not survive the chunking process.

1.2 Why Pure Knowledge Graph RAG Is Brittle

What KG RAG does

A knowledge graph stores facts as simple three-part statements: (Subject, Predicate, Object). For example, “Acetone causes skin irritation” becomes:

(Acetone) → CAUSES → (Skin_irritation)

When you ask a question, the LLM writes a SPARQL query to look up those stored facts. SPARQL is exact-match — it only finds facts that match the query pattern exactly.

The predicate vocabulary problem — a real example

Suppose the document says “Acetone requires the use of chemical-resistant gloves.” The KG extraction pipeline (with our MSDS ontology) stores:

<Acetone>  <REQUIRES>  <Chemical_Resistant_Gloves>

Now a user asks: “What PPE is needed for Acetone?”

The LLM generates this SPARQL:

SELECT ?s ?p ?o WHERE {
  GRAPH <MSDS_Graph/MAT001> {
    ?s ?p ?o .
    FILTER(REGEX(str(?p), "HAS_PPE|HAS_PROTECTION|PPE_REQUIREMENT", "i"))
  }
}

Result: 0 rows.

The predicate in the graph is REQUIRES — not HAS_PPE. The LLM guessed the wrong name. SPARQL returns nothing. No error. The system just silently gives you an empty answer.

This is the main weakness of knowledge graph RAG: the LLM has to guess the exact word that was used when the data was stored. Without help, it’s basically a guessing game.

The fix is simple: limit the vocabulary to a small list (13 predicates in our case) when storing data, and give the same list to the query generator. That way it can’t guess wrong. This is what the ontology does (Step 3.2).

When KG RAG works well

KG retrieval is genuinely excellent at: - Exact attribute values — GHS codes, concentration limits, flash points stored as triples are returned verbatim, not approximated. - Typed relationship traversal — “What hazards are classified as Flam. Liq.?” traces a typed relationship path. - Grounded answers — every fact in the response is a stored triple. The LLM cannot hallucinate values it did not retrieve.


Step 2 — The Solution: Hybrid RAG with ReAct Architecture

2.1 The Hybrid Concept — Run Both in Parallel

The two failure modes are complementary:

Failure mode Vector RAG KG RAG

Chunk boundary sensitivityYesNo
Predicate vocabulary mismatchNoYes
Hallucination on sparse contextYesNo
Misses narrative/procedural textNoYes

The solution: run both at the same time and combine their answers. This covers both failure modes:

  • When KG returns the right facts but misses the narrative context → vector fills the gap.
  • When vector returns vague prose but KG has the exact value → KG grounds the answer.
  • When KG fails silently (wrong predicate) → vector’s answer is used directly.
  • When both succeed → an LLM synthesis call merges them into one answer.

2.2 ReAct — Agentic Architecture

ReAct (Reason + Act) is what makes this system smart instead of just a chain of steps. The idea is simple: after each action, the system looks at what it got back, thinks about whether it’s good enough, and decides what to do next.

This guide uses LangChain with Python’s ThreadPoolExecutor. But the pattern works with any framework — LangGraph, CrewAI, n8n, AutoGen — anything that lets you build an observe → reason → act loop.

The three agentic behaviours in this codebase

This system has three places where it makes decisions based on what it observes:

# Where What it does File

1KG self-correctionDetects empty SPARQL result, reasons about why (vocabulary mismatch), generates a different query with broadened patternsagents/kg_chain.py
2Supervisor dispatchOrchestrator runs both chains, observes both results, routes the merge strategy based on which chains succeededagents/orchestrator.py
3Graceful degradationWhen one chain fails, the system does not return an error — it reasons that one result is better than none and serves it directlyagents/orchestrator.py

Let’s walk through each one.


Agentic Behaviour 1 — KG Self-Correction Loop

This is the most important one. It is what separates this system from a static hybrid pipeline.

The problem it solves: When the LLM writes a SPARQL query with the wrong word, the database returns zero results — silently. A basic system would just return “no answer found”. Our system detects this and tries again with a better query.

How it works:

Observe:   SPARQL returned 0 <result> elements
Reason:    Likely cause is predicate vocabulary mismatch — LLM guessed the
           wrong name. Retrying with the same prompt will produce the same wrong name.
           I need to change the prompt to force broader matching.
Act:       Re-invoke write_query() with an explicit instruction to use
           REGEX alternation (UNION of possible predicate names) to
           widen the search net.
Observe:   New result — if non-empty, continue. If still empty, give up
           and signal the orchestrator to fall back to vector.

Here is exactly where this loop lives in agents/kg_chain.py:

# KG Agent — the self-correction loop

# Step 1: Generate SPARQL and execute
sparql_query = kg_srv.write_query(llm, question, graph_name)
response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)

# Step 2: OBSERVE — did the query return anything?
if not response_xml or _is_empty_sparql_result(response_xml):

    # Step 3: REASON — the cause is almost always vocabulary mismatch.
    # Don't retry with the same prompt; change the instruction.
    logger.info("KG chain: first attempt returned empty — retrying with relaxed query")
    relaxed_question = (
        question
        + " (use broader REGEX patterns and UNION to maximise recall)"
        #   ↑ this instruction changes the LLM's behaviour on the retry
    )

    # Step 4: ACT — re-invoke with the broadened prompt
    retry_sparql = kg_srv.write_query(llm, relaxed_question, graph_name)
    if retry_sparql and not retry_sparql.startswith("Error"):
        sparql_query = retry_sparql
        response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)

# Step 5: OBSERVE again — if still empty, return None (signals orchestrator)

What the broadened prompt actually produces:

-- First attempt (LLM guessed one predicate name):
FILTER(REGEX(str(?p), "HAS_PPE", "i"))
-- Result: 0 rows

-- After retry instruction "use broader REGEX and UNION":
FILTER(
  REGEX(str(?p), "REQUIRES|HAS_PPE|HAS_PROTECTION|HAS_SAFETY|SAFETY", "i")
)
-- Result: matches REQUIRES → returns the PPE triples

The LLM changed its own output in response to observed failure. That is the definition of a self-correction loop.

Why only one retry: Each retry costs one LLM call. One retry fixes about 60% of wrong-word failures — the LLM knows multiple possible names and just needs to be told to try all of them. If one retry still returns nothing, the fact probably isn’t in the graph at all — more retries won’t help.


Agentic Behaviour 2 — Supervisor Agent Pattern

The orchestrator is a supervisor: it sends the question to both agents, waits for their answers, and decides what to do based on what came back.

A basic system would just blindly combine both results. The supervisor actually checks: did the KG agent find something? Did the vector agent find something? Then it picks the best strategy.

# Orchestrator — the supervisor's decision logic

def merge_results(llm, question, kg_result, vec_result) -> dict:
    kg_answer  = kg_result.get("answer")   # None = chain failed or returned nothing
    vec_answer = vec_result.get("answer")  # None = chain failed or returned nothing

    # OBSERVE: what did each chain return?
    # REASON + ACT: route based on observed state

    if kg_answer and vec_answer:
        # Both succeeded — synthesise into a single grounded answer
        # Reason: two independent sources agreeing strengthens confidence;
        # KG provides the exact fact, vector provides the context
        return {"answer": _synthesise(llm, question, kg_answer, vec_answer),
                "sources": ["kg", "vector"]}

    if kg_answer:
        # Only KG succeeded — serve directly without synthesis
        # Reason: no point calling _synthesise() with one empty source;
        # it would just add an LLM call for no benefit
        return {"answer": kg_answer, "sources": ["kg"]}

    if vec_answer:
        # Only vector succeeded — serve directly
        # Reason: KG failed (empty after retry) but vector has something;
        # a partial answer is better than no answer
        return {"answer": vec_answer, "sources": ["vector"]}

    # Both failed — return structured error so the caller can distinguish
    # "no answer" from "answer is actually empty" (different monitoring cases)
    return {"answer": None, "sources": [], "error": {
        "kg":     kg_result.get("error"),
        "vector": vec_result.get("error"),
    }}

The sources field in the response is not cosmetic. It is the supervisor’s audit trail — it records which agents contributed to the answer. When you see "sources": ["vector"] in your logs, that is the supervisor telling you: “KG failed on this query. You are getting a vector-only answer. Check the KG chain.”


Agentic Behaviour 3 — Graceful Degradation

When one agent fails, the system doesn’t crash — it just uses the other agent’s answer. This means your users always get something useful, and your logs tell you exactly which agent failed so you can fix it.

The degradation ladder:

Both chains succeed         → Full hybrid synthesis    (best quality)
       ↓  (KG fails)
KG fails, vector succeeds   → Vector-only answer       (lower precision, no exact facts)
       ↓  (vector also fails)
Both chains fail            → Structured error dict    ({"kg": "timeout", "vector": "..."})

The structured error dict in Case 4 is itself agentic behaviour: instead of raising an exception and losing context, the orchestrator captures the per-chain failure reason and returns it as data. Downstream systems can inspect error.kg and error.vector independently to understand what went wrong.


Step 3 — Implementation

3.1 Prerequisites and Environment Setup

Services you need

Service What it does in this system Where to create it

SAP HANA CloudStores both the RDF knowledge graph and the vector embeddingsBTP Cockpit → HANA Cloud
SAP AI CoreHosts the LLM (Claude via AWS Bedrock) and the embedding modelBTP Cockpit → AI Core

You do not need a separate vector database (no Pinecone, Chroma, or Weaviate). HANA Cloud handles both stores with its SPARQL_EXECUTE stored procedure and REAL_VECTOR column type.

Python dependencies

cd backend
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt

Key packages and why each is needed:

fastapi, uvicorn           # HTTP server
hdbcli==2.23.26            # SAP HANA Python driver
langchain==0.3.27          # LLM orchestration framework
langchain-experimental     # LLMGraphTransformer (KG extraction from text)
langchain-openai           # OpenAIEmbeddings (used via AI Core proxy)
langchain-aws              # ChatBedrock (Claude via AI Core / Bedrock)
generative-ai-hub-sdk      # SAP AI Core authentication + proxy client
rdflib==7.1.4              # Build and serialise RDF graphs in Python
pypdf                      # Load PDFs into LangChain Document objects

Environment variables

Create backend/.env:

# HANA Cloud
HANA_HOST=your-instance.hanacloud.ondemand.com
HANA_PORT=443
HANA_USER=DBADMIN
HANA_PASSWORD=your-password

# SAP AI Core — from the service key in BTP
AICORE_AUTH_URL=https://your-subaccount.authentication.eu10.hana.ondemand.com
AICORE_CLIENT_ID=sb-xxxxx
AICORE_CLIENT_SECRET=xxxxx
AICORE_BASE_URL=https://api.ai.xxx.aws.ml.hana.ondemand.com

# LLM deployment
AICORE_DEPLOYMENT_ID=dxxxxxxxxxxxxxxx
AICORE_BEDROCK_MODEL_ID=anthropic.claude-3-5-sonnet-20241022-v2:0
LLM_MODEL_NAME=claude-3-5-sonnet

# Embedding model (deployed via AI Core)
EMBEDDING_MODEL_NAME=text-embedding-3-small

Verify connectivity before touching any RAG code

# 1. Test HANA connection
python - <<'EOF'
from hdbcli import dbapi
conn = dbapi.connect(
    address="your-instance.hanacloud.ondemand.com",
    port=443, user="DBADMIN", password="your-password",
    encrypt=True, sslValidateCertificate=False
)
print("HANA OK:", conn.cursor().execute("SELECT 1 FROM DUMMY").fetchone())
EOF

# 2. Test AI Core LLM
python - <<'EOF'
from dotenv import load_dotenv; load_dotenv()
import srv.aic_srv as aic_srv
llm = aic_srv.get_llm()
print(llm.invoke("Say hello in one word").content)
EOF

# 3. Test SPARQL stored procedure exists
python - <<'EOF'
from dotenv import load_dotenv; load_dotenv()
import srv.hdb_srv as hdb_srv
conn = hdb_srv.get_hana_connection()
result, _ = hdb_srv.execute_sparql_query(conn, "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 1")
print("SPARQL OK, result:", result)
EOF

Thread Safety — HANA Connections

HANA database connections cannot be shared between threads — if two threads use the same connection at the same time, you get random errors. Since our system runs multiple threads (KG agent, vector agent, ingestion), each thread needs its own connection. Python’s threading.local() solves this:

# HANA connection service
_local = threading.local()  # one instance per thread, invisible to other threads

def get_hana_connection():
    conn = getattr(_local, 'conn', None)
    if conn is not None:
        try:
            conn.cursor().execute("SELECT 1 FROM DUMMY")  # liveness check
            return conn
        except Exception:
            _local.conn = None  # stale — reconnect

    user, password, address, port = _resolve_credentials()
    conn = dbapi.connect(address=address, port=int(port),
                         user=user, password=password,
                         encrypt=True, sslValidateCertificate=False)
    _local.conn = conn
    return conn

Every thread that calls get_hana_connection() gets its own connection, created the first time it’s needed. Rule: never pass a connection between threads. All code in this guide follows this rule.

3.2 Design Your Ontology

For this guide, we use MSDS (Material Safety Data Sheets) as our example. MSDS documents are one of the most common types of unstructured enterprise data — every company that handles chemicals has hundreds of these PDFs. They contain a mix of structured facts (flash points, GHS codes) and narrative text (handling procedures, first aid), which makes them a perfect test case for hybrid RAG.

You can apply the same approach to any document type — contracts, policies, technical manuals, compliance reports.

Why this step cannot be skipped

LLMGraphTransformer is the LangChain tool that pulls facts out of text. Without rules, it picks whatever words it likes — requires, is_required_for, necessitates, demands. The problem? The query generator will never guess these exact words.

The ontology is a shared word list. The data-writing side and the data-reading side both use the same list. That’s how they stay in sync.

Structure of the ontology

The ontology is a Turtle (.ttl) RDF file. It defines OWL classes (node types) and OWL object properties (predicate types):

@prefix owl:  <http://www.w3.org/2002/07/owl#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix msds: <http://msds.knowledge-graph.org/> .

# ── Node types (OWL Classes) ─────────────────────────────────────────────────

msds:Chemical
    a owl:Class ;
    rdfs:label "Chemical" .          # ← rdfs:label is what LLMGraphTransformer sees

msds:Hazard
    a owl:Class ;
    rdfs:label "Hazard" .

msds:HazardClassification
    a owl:Class ;
    rdfs:subClassOf msds:Hazard ;
    rdfs:label "Hazard Classification" .

msds:PersonalProtectiveEquipment
    a owl:Class ;
    rdfs:subClassOf msds:SafetyMeasure ;
    rdfs:label "Personal Protective Equipment" .

# ── Predicates (OWL Object Properties) ──────────────────────────────────────

msds:requires
    a owl:ObjectProperty ;
    rdfs:label "REQUIRES" .          # ← UPPER_SNAKE_CASE matches SPARQL REGEX patterns

msds:hasHazard
    a owl:ObjectProperty ;
    rdfs:label "HAS_HAZARD" .

msds:causes
    a owl:ObjectProperty ;
    rdfs:label "CAUSES" .

doc_srv.py loads this file at module import time using rdflib:

# Ontology loader — runs once at startup
def _load_ontology(ttl_path: str):
    g = RDFGraph()
    g.parse(ttl_path, format="turtle")

    # Extract rdfs:label of every OWL Class → allowed node types
    nodes = [
        str(g.value(c, RDFS.label))
        for c in g.subjects(RDF.type, OWL.Class)
        if g.value(c, RDFS.label)
    ]

    # Extract rdfs:label of every OWL ObjectProperty → allowed predicates
    # Labels are UPPER_SNAKE_CASE (e.g. "REQUIRES", "HAS_HAZARD")
    rels = [
        str(g.value(p, RDFS.label)).replace(" ", "_").upper()
        for p in g.subjects(RDF.type, OWL.ObjectProperty)
        if g.value(p, RDFS.label)
    ]
    return nodes, rels

_ONTOLOGY_ALLOWED_NODES, _ONTOLOGY_ALLOWED_RELATIONSHIPS = _load_ontology(_ontology_ttl)
# → _ONTOLOGY_ALLOWED_NODES = ["Chemical", "Hazard", "Hazard Classification", ...]
# → _ONTOLOGY_ALLOWED_RELATIONSHIPS = ["REQUIRES", "HAS_HAZARD", "CAUSES", ...]

These lists are passed to LLMGraphTransformer at ingestion time. The LLM is instructed: “Extract triples using only these node types: [Chemical, Hazard, …] and relationships: [REQUIRES, HAS_HAZARD, …]”. Without these constraints, LLMGraphTransformer generates arbitrary predicate names that SPARQL can never match — see Step 4.2, Pitfall 1. The same predicate list is injected into the SPARQL query generation prompt, closing the vocabulary loop.

Designing your own ontology

For a domain other than MSDS, follow this checklist:

  1. What are your entity types? (Product, Person, Regulation, Policy, Contract…)
  2. What relationships connect them? Keep predicates to 10–20. Fewer is better.
  3. Use UPPER_SNAKE_CASE for predicates. The SPARQL query generator is primed with this pattern.
  4. Every predicate the query generator might produce must exist in the ontology. If users will ask “what does X contain?”, add a CONTAINS or CONTAINED_IN predicate.
  5. Test by running LLMGraphTransformer on 3–5 sample documents and checking the generated predicates match your list.

Before running Step 2: The ontology file (01-MSDS_Ontology.ttl) must exist at the path configured in doc_srv.py. It is loaded once at module import time via _load_ontology(). Verify the file is in place before starting ingestion — a missing ontology file means LLMGraphTransformer will generate unconstrained predicates that SPARQL can never match (see Step 4.2, Pitfall 1).

3.3 Build the Ingestion Pipeline

The concept in 10 lines

Before reading the production code, here is the minimal concept:

# Concept (not the real file — see doc_srv.py)
def ingest(pdf_path, material_id, llm, hana_conn):
    chunks = chunk_pdf(pdf_path)          # Load PDF → split into text chunks

    # Thread A: KG extraction
    graph_transformer = LLMGraphTransformer(llm=llm,
        allowed_nodes=ALLOWED_NODES, allowed_relationships=ALLOWED_RELS)
    graph_docs = graph_transformer.convert_to_graph_documents(chunks)
    triples = build_rdf_graph(graph_docs)
    sparql_insert = build_sparql_insert(triples, f"MSDS_Graph/{material_id}")
    execute_sparql_update(hana_conn, sparql_insert)  # → HANA named graph

    # Thread B: Vector embedding
    embeddings = [embed(chunk) for chunk in chunks]
    for chunk, vec in zip(chunks, embeddings):
        hana_conn.cursor().execute(
            "INSERT INTO MSDS_VECTORS VALUES (?, ?, TO_REAL_VECTOR(?), ?)",
            (material_id, chunk.page_content, vec_str, idx)
        )

Both paths read the same chunks list. They write to different HANA stores. They run in parallel threads.

The real code: srv/doc_srv.py

Chunking the PDF

# Document service
def create_chunks(documents, chunk_size=500, chunk_overlap=50):
    text_splitter = TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=50)
    chunks = []
    for doc in documents:
        cleaned_text = clean_text(doc.page_content)  # strips quotes, newlines
        doc_chunks = text_splitter.split_text(cleaned_text)
        for chunk in doc_chunks:
            chunks.append(Document(page_content=chunk, metadata=doc.metadata))
    return chunks

TokenTextSplitter splits by token count (not characters). The chunk_overlap=50 means consecutive chunks share 50 tokens at the edges — so if a fact falls right at a split point, it appears in both chunks and won’t get lost.

Thread A — KG extraction: _extract_graph()

# Ingestion — Thread A (KG extraction)
def _extract_graph(file_path, llm, material_number, chunks) -> dict:
    named_graph = f"MSDS_Graph/{material_number}"
    conn = hdb_srv.get_hana_connection()  # ← thread-local; explained in Step 3.4

    # 1. Create the LLM-based graph transformer with ontology constraints
    llm_transformer = LLMGraphTransformer(
        llm=llm,
        allowed_nodes=_ONTOLOGY_ALLOWED_NODES,        # e.g. ["Chemical", "Hazard", ...]
        allowed_relationships=_ONTOLOGY_ALLOWED_RELATIONSHIPS,  # e.g. ["REQUIRES", ...]
    )

    # 2. Convert all chunks to GraphDocument objects in a thread pool
    #    Each chunk produces a list of Node and Relationship objects
    graph_document_list = _convert_chunks_to_graph_documents(llm_transformer, chunks)

    # 3. Build a single rdflib Graph from all nodes and relationships
    g = _build_rdf_graph(graph_document_list)

    # 4. Serialize to a SPARQL INSERT DATA query targeting the named graph
    rdf_triples, sparql_insert_query = _build_sparql_insert(g, named_graph)

    # 5. Write to HANA
    hdb_srv.execute_sparql_update(conn, sparql_insert_query)
    return {"pipeline": "graph", "triples_count": len(rdf_triples)}

The most important line is the LLMGraphTransformer constructor. Without allowed_nodes and allowed_relationships, it generates free-form predicates that SPARQL can never match. With them, every stored triple uses a predicate from your 13-item list.

Building a URI-safe node name

HANA SPARQL has a quirk: local names in URIs cannot start with a digit or underscore. The safe_uri() helper handles this:

# Document service
_NON_URI_SAFE = re.compile(r"[^A-Za-z0-9_\-]")

def safe_uri(string: str) -> URIRef:
    local = string.replace(" ", "_").replace("/", "_")
    local = _NON_URI_SAFE.sub("", local)     # strip all other special chars
    if local[0].isdigit() or local[0] == "_":
        local = "id_" + local                # prefix to avoid HANA parse error
    return URIRef(EX_NAMESPACE[local])       # → <http://msds.knowledge-graph.org/Acetone>

Thread B — vector embedding: _embed_and_store_background()

# Ingestion — Thread B (vector embedding)
def _embed_and_store_background(llm, chunks, material_number,
                                 file_hash=None, file_content=None):
    conn = hdb_srv.get_hana_connection()    # ← own thread-local connection

    # Dedup: skip if this exact file was already embedded for this material
    if file_hash and hdb_srv.check_hash_exists(conn, material_number, file_hash):
        return {"pipeline": "vector", "vectors_count": 0, "skipped": True}

    # Delegate to vector_srv (see Step 3.4)
    count = vector_srv.embed_and_store(llm, conn, chunks, material_number)
    return {"pipeline": "vector", "vectors_count": count}

The entry point: process_document()

# Entry point — called from POST /process-upload
def process_document(file_path, llm, conn, file_name=None,
                     attachment_id=None, file_hash=None) -> dict:
    material_number = file_name or os.path.basename(file_path)

    # Validate: must be a real PDF
    with open(file_path, "rb") as f:
        if f.read(5) != b"%PDF-":
            raise ValueError("Not a valid PDF")

    with open(file_path, "rb") as f:
        file_content = f.read()

    loader = PyPDFLoader(file_path)
    documents = loader.load()
    chunks = create_chunks(documents)     # shared between both threads

    # Thread A — KG extraction
    thread_graph = threading.Thread(
        target=_extract_graph,
        args=(file_path, llm, material_number, chunks),
        daemon=True, name=f"graph-{material_number}",
    )

    # Thread B — vector embedding
    thread_vector = threading.Thread(
        target=_embed_and_store_background,
        kwargs={"llm": llm, "chunks": chunks, "material_number": material_number,
                "file_hash": file_hash, "file_content": file_content},
        daemon=True, name=f"vector-{material_number}",
    )

    thread_graph.start()
    thread_vector.start()
    # Returns immediately — HTTP response does not wait for threads to finish
    return {"status": "processing", "materialNumber": material_number,
            "pipelines": ["graph", "vector"]}

The HTTP endpoint returns 202 Accepted immediately. Ingestion runs in the background. KG extraction typically takes 30–120 seconds (one LLM call per chunk). Vector embedding is faster (5–20 seconds). Because they run concurrently in separate threads, the total wall-clock time is max(t_KG, t_vector).

3.4 Build the KG Agent with Retry Loop

The concept in 8 lines

# Concept (not the real file — see agents/kg_chain.py)
def kg_agent(llm, question, graph_name):
    sparql = write_query(llm, question, graph_name)   # LLM generates SPARQL
    result = execute_sparql(conn, sparql)              # Run it in HANA

    if is_empty(result):                               # ← the agentic part
        sparql = write_query(llm, question + " (use UNION and broader REGEX)")
        result = execute_sparql(conn, sparql)

    return summarize(llm, question, result)            # LLM summarises triples

The “agentic” label is earned by the if is_empty branch. A static pipeline would return nothing. The agent detects the failure and re-invokes the LLM with a broadened instruction. It does not just retry the same call — it changes the prompt.

The SPARQL query generator: srv/kg_srv.py

The write_query() function sends the LLM a prompt that includes: 1. The named graph URI (so all triples are scoped to the right material) 2. The complete list of allowed predicates from the ontology 3. Several worked examples 4. REGEX usage rules (HANA SPARQL requires full URIs, no PREFIX declarations)

# SPARQL query generator
def write_query(llm, question: str, named_graph: str) -> str:
    _validate_graph_name(named_graph.split("/")[-1])  # block SPARQL injection

    template = '''You are a SPARQL query expert.
    Named graph: {graph}

    Known predicates: CAUSES, AFFECTS, REQUIRES, HAS_HAZARD, HAS_EFFECT,
      HAS_PRECAUTION, HAS_PROPERTY, HAS_CODE, CONTAINED_IN

    RULES:
    1. ALL triples inside GRAPH <{graph}> {{ ... }}
    2. Use REGEX(str(?var), "pattern", "i") for case-insensitive matching
    3. No PREFIX declarations — always full URIs or REGEX
    4. Output ONLY the SPARQL SELECT block

    Question: {input}
    SPARQL Query:'''

    prompt = PromptTemplate.from_template(template).invoke({
        "input": question, "graph": named_graph
    })
    raw = llm.invoke(prompt)
    return _extract_sparql(raw.content)   # regex extracts SELECT block

_validate_graph_name() rejects any materialNumber that contains characters outside [A-Za-z0-9_-]. This prevents a user from passing ../evil as a material name and producing a SPARQL query that operates on a different graph.

_extract_sparql() handles the common LLM output patterns: a bare SELECT block, or a SELECT block wrapped in a ```sparql ``` code fence.

HANA SPARQL execution: srv/hdb_srv.py

HANA Cloud has no native SPARQL endpoint. All SPARQL operations go through the SPARQL_EXECUTE stored procedure. The Content-Type header in parameter 2 tells HANA whether to interpret the payload as a SELECT or an INSERT:

# HANA connection service
def execute_sparql_query(conn, sparql_query):
    """READ — SELECT queries."""
    cursor = conn.cursor()
    resp = cursor.callproc("SPARQL_EXECUTE", (
        sparql_query,
        "Accept: application/sparql-results+xml "
        "Content-Type: application/sparql-query",   # ← SELECT header
        "?",
        None,
    ))
    return resp[2], resp[3]   # (xml_result_string, metadata)

def execute_sparql_update(conn, sparql_update):
    """WRITE — INSERT DATA / DROP GRAPH."""
    cursor = conn.cursor()
    resp = cursor.callproc("SPARQL_EXECUTE", (
        sparql_update,
        "Accept: application/sparql-results+xml "
        "Content-Type: application/sparql-update",  # ← UPDATE header
        "?",
        None,
    ))
    return resp[2], resp[3]

The result of a SELECT comes back as an XML string in SPARQL Results Format. The KG chain parses this XML to extract the fact rows.

The full KG chain with retry: agents/kg_chain.py

# kg_agent.py
def run(llm, question: str, graph_name: str, history: list[dict]) -> dict:
    conn = hdb_srv.get_hana_connection()
    try:
        # 1. Generate SPARQL from the question
        sparql_query = kg_srv.write_query(llm, question, graph_name)
        if not sparql_query or sparql_query.startswith("Error"):
            return {"answer": None, "error": sparql_query, ...}

        # 2. Execute against HANA
        response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)

        # 3. ── Agentic retry on empty result ─────────────────────────────
        if not response_xml or _is_empty_sparql_result(response_xml):
            logger.info("Empty result — retrying with relaxed query")
            relaxed_question = (
                question
                + " (use broader REGEX patterns and UNION to maximise recall)"
            )
            retry_sparql = kg_srv.write_query(llm, relaxed_question, graph_name)
            if retry_sparql and not retry_sparql.startswith("Error"):
                sparql_query = retry_sparql
                response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)
        # ─────────────────────────────────────────────────────────────────

        # 4. Summarise the XML fact rows into a natural language answer
        answer = kg_srv.summarize_info(llm, question, response_xml)
        facts = _parse_sparql_xml_to_list(response_xml) if response_xml else []

        # 5. Treat "no information found" as a null result (signals vector fallback)
        if answer and answer.lower().startswith("no information found"):
            answer = None

        return {"answer": answer, "sparql_query": sparql_query, "facts": facts,
                "source": "kg", "error": None}

    except Exception as exc:
        logger.exception("KG chain error")
        return {"answer": None, "error": str(exc), ...}

Why the retry works: The broadened prompt includes the phrase "use UNION and broader REGEX patterns". The LLM responds by generating a FILTER with REGEX alternation:

-- First attempt (narrow):
FILTER(REGEX(str(?p), "HAS_PPE", "i"))
-- Returns 0 rows because predicate is REQUIRES, not HAS_PPE

-- Retry (broadened):
FILTER(REGEX(str(?p), "REQUIRES|HAS_PPE|HAS_PROTECTION|SAFETY", "i"))
-- Matches REQUIRES → returns the PPE triples

The retry costs exactly one additional LLM call. It is bounded — there is no loop, no exponential backoff. If the retry also returns empty, answer = None and the orchestrator falls back to the vector result.

Parsing the XML result:

# kg_agent.py
def _is_empty_sparql_result(xml_text: str) -> bool:
    """True when the SPARQL XML result set contains zero <result> elements."""
    import xml.etree.ElementTree as ET
    _NS = "http://www.w3.org/2005/sparql-results#"
    root = ET.fromstring(xml_text)
    return len(root.findall(f".//{{{_NS}}}result")) == 0

def _parse_sparql_xml_to_list(xml_text: str) -> list[dict]:
    """Convert SPARQL XML rows into [{name: value, ...}, ...] dicts."""
    _NS = "http://www.w3.org/2005/sparql-results#"
    root = ET.fromstring(xml_text)
    rows = []
    for result in root.findall(f".//{{{_NS}}}result"):
        row = {}
        for binding in result.findall(f"{{{_NS}}}binding"):
            name = binding.get("name")
            uri = binding.find(f"{{{_NS}}}uri")
            lit = binding.find(f"{{{_NS}}}literal")
            node = uri if uri is not None else lit
            row[name] = node.text if node is not None else ""
        rows.append(row)
    return rows

3.5 Build the Vector Agent

Embedding the question: srv/vector_srv.py

# Vector service
def _get_embeddings() -> OpenAIEmbeddings:
    """Return an OpenAIEmbeddings instance pointing at the AI Core proxy."""
    import os
    model = os.getenv("EMBEDDING_MODEL_NAME", "text-embedding-3-small")
    return OpenAIEmbeddings(proxy_model_name=model)  # SAP AI Core proxy

def embed_question(question: str) -> list[float]:
    embeddings = _get_embeddings()
    return embeddings.embed_query(question)  # → list of 1536 floats

OpenAIEmbeddings(proxy_model_name=...) from generative-ai-hub-sdk routes through SAP AI Core’s OpenAI-compatible proxy. The return value is a plain Python list of floats — the same type that gets stored in HANA and used at query time.

Lazy vector table creation: srv/hdb_srv.py

The vector table does not need to exist before first use. ensure_vector_table() checks SYS.TABLES and creates it only if absent. The embedding dimension is baked into the REAL_VECTOR column at creation time:

# HANA connection service
def ensure_vector_table(conn, dim: int) -> None:
    cursor = conn.cursor()

    # HANA Cloud does not support CREATE TABLE IF NOT EXISTS — check SYS.TABLES first
    cursor.execute(
        "SELECT COUNT(*) FROM SYS.TABLES "
        "WHERE TABLE_NAME = 'MSDS_VECTORS' AND SCHEMA_NAME = CURRENT_SCHEMA"
    )
    if cursor.fetchone()[0] > 0:
        return  # already exists

    ddl = f"""CREATE TABLE MSDS_VECTORS (
        ID               NVARCHAR(36)    NOT NULL DEFAULT SYSUUID,
        MATERIAL_NUMBER  NVARCHAR(40)    NOT NULL,
        CHUNK_TEXT       NCLOB           NOT NULL,
        EMBEDDING        REAL_VECTOR({dim}),      -- ← dimension from first embed call
        CHUNK_INDEX      INTEGER         NOT NULL,
        PRIMARY KEY (ID)
    )"""
    cursor.execute(ddl)
    conn.commit()

text-embedding-3-small produces 1536-dimensional vectors. If you switch to text-embedding-3-large (3072 dimensions), the table is automatically created with the correct dimension on first use. No schema migration needed.

Storing embeddings: srv/vector_srv.py

# Vector service
def embed_and_store(llm, conn, chunks: list, material_number: str) -> int:
    embeddings = _get_embeddings()

    # Embed first chunk to determine dimension, then create table if needed
    first_vec = embeddings.embed_query(chunks[0].page_content)
    dim = len(first_vec)
    hdb_srv.ensure_vector_table(conn, dim)

    # Batch insert — serialize the float list as "[0.123, -0.456, ...]"
    insert_sql = (
        "INSERT INTO MSDS_VECTORS (MATERIAL_NUMBER, CHUNK_TEXT, EMBEDDING, CHUNK_INDEX) "
        "VALUES (?, ?, TO_REAL_VECTOR(?), ?)"
    )
    vectors = [first_vec] + [embeddings.embed_query(c.page_content) for c in chunks[1:]]
    cursor = conn.cursor()
    for idx, (chunk, vec) in enumerate(zip(chunks, vectors)):
        vec_str = "[" + ",".join(str(v) for v in vec) + "]"
        cursor.execute(insert_sql, (material_number, chunk.page_content, vec_str, idx))
    conn.commit()
    return len(chunks)

TO_REAL_VECTOR(?) is a HANA function that converts the string "[0.123, ...]" into a native REAL_VECTOR value for storage. At search time, the same conversion applies to the query vector.

Cosine similarity search: srv/vector_srv.py

# Vector service
def search_vectors(conn, embedding: list[float], material_number: str,
                   top_k: int = 5) -> list[dict]:
    vec_str = "[" + ",".join(str(v) for v in embedding) + "]"
    sql = (
        "SELECT TOP ? CHUNK_TEXT, "
        "COSINE_SIMILARITY(EMBEDDING, TO_REAL_VECTOR(?)) AS SIM "
        "FROM MSDS_VECTORS "
        "WHERE MATERIAL_NUMBER = ? "    # ← isolate by material
        "ORDER BY SIM DESC"
    )
    cursor = conn.cursor()
    cursor.execute(sql, (top_k, vec_str, material_number))
    return [{"chunk_text": row[0], "similarity_score": float(row[1])}
            for row in cursor.fetchall()]

COSINE_SIMILARITY(EMBEDDING, TO_REAL_VECTOR(?)) is a native HANA function. The WHERE MATERIAL_NUMBER = ? filter is critical — without it, a query for material MAT001 would retrieve chunks from all other ingested documents.

Thread safety — the HANA connection rule

hdbcli connections are not thread-safe for shared use. The KG chain, vector chain, and two ingestion threads run concurrently. Each must have its own connection.

The solution is threading.local():

# HANA connection service
_local = threading.local()  # one instance per thread, invisible to other threads

def get_hana_connection():
    conn = getattr(_local, 'conn', None)
    if conn is not None:
        try:
            conn.cursor().execute("SELECT 1 FROM DUMMY")  # liveness check
            return conn
        except Exception:
            _local.conn = None  # stale — reconnect

    user, password, address, port = _resolve_credentials()
    conn = dbapi.connect(address=address, port=int(port),
                         user=user, password=password,
                         encrypt=True, sslValidateCertificate=False)
    _local.conn = conn
    return conn

Every thread that calls get_hana_connection() gets its own connection, created lazily on first call. The liveness check (SELECT 1 FROM DUMMY) reconnects automatically if the connection was dropped by HANA’s idle timeout.

3.6 Build the Orchestrator

The real orchestrator: agents/orchestrator.py

# orchestrator.py
_TIMEOUT_SECONDS = 30

def run(llm, question: str, graph_name: str, history: list[dict]) -> dict:
    material_number = graph_name.split("/")[-1]  # "MSDS_Graph/MAT001" → "MAT001"
    start_time = time.monotonic()

    with ThreadPoolExecutor(max_workers=2) as executor:
        kg_future  = executor.submit(kg_chain.run,     llm, question, graph_name,     history)
        vec_future = executor.submit(vector_chain.run, llm, question, material_number, history)

        # Wait for KG chain first
        try:
            kg_result = kg_future.result(timeout=_TIMEOUT_SECONDS)
        except FuturesTimeoutError:
            kg_result = {"answer": None, "error": "timeout", ...}
        except Exception as exc:
            kg_result = {"answer": None, "error": str(exc), ...}

        # Give vector chain whatever budget is left in the 30s window
        elapsed = time.monotonic() - start_time
        try:
            vec_result = vec_future.result(timeout=max(1.0, _TIMEOUT_SECONDS - elapsed))
            #                                     ↑
            #   max(1.0, ...) ensures the vector chain always gets at least 1 second
            #   even if the KG chain used 29.5 seconds
        except FuturesTimeoutError:
            vec_result = {"answer": None, "error": "timeout", ...}

    return merge_results(llm, question, kg_result, vec_result)

The adaptive timeout max(1.0, 30 - elapsed) is subtle but important. If the KG chain takes 25 seconds, the vector chain still gets 5 seconds (not zero). If the KG chain takes 31 seconds (which triggers a timeout exception), the vector chain still gets max(1.0, 30 - 31) = 1 second minimum. Without the max(1.0, ...), a KG chain that exceeds the budget would also kill the vector chain’s remaining time.

The merge logic

# orchestrator.py
def merge_results(llm, question, kg_result, vec_result) -> dict:
    kg_answer  = kg_result.get("answer")
    vec_answer = vec_result.get("answer")

    # Case 1: both chains returned an answer → synthesise with LLM
    if kg_answer and vec_answer:
        return {"answer": _synthesise(llm, question, kg_answer, vec_answer),
                "sources": ["kg", "vector"], ...}

    # Case 2: only KG answered (vector failed or returned nothing)
    if kg_answer:
        return {"answer": kg_answer, "sources": ["kg"], ...}

    # Case 3: only vector answered (KG failed or retry also empty)
    if vec_answer:
        return {"answer": vec_answer, "sources": ["vector"], ...}

    # Case 4: both failed → structured error for observability
    return {"answer": None, "sources": [], "error": {
        "kg":     kg_result.get("error"),
        "vector": vec_result.get("error"),
    }}

The sources list in the response tells you — and your monitoring dashboard — which chains contributed. ["kg", "vector"] means full hybrid synthesis. ["vector"] means the KG chain failed and you are serving a pure vector answer. This distinction matters for debugging quality regressions.

The synthesis prompt

Note: _synthesise() is distinct from kg_srv.summarize_info(). The latter converts raw SPARQL XML rows into a natural language sentence. _synthesise() takes two already-summarised answers and merges them into one grounded response.

# orchestrator.py
def _synthesise(llm, question, kg_answer, vec_answer) -> str:
    prompt = (
        f"Two sources answered the question: {question}\n"
        f"Knowledge graph: {kg_answer}\n"
        f"Vector search: {vec_answer}\n"
        "Give one direct, concise answer (1-3 sentences, no markdown, no bullet points) "
        "combining only what is factually stated above."   # ← grounding constraint
    )
    response = llm.invoke(prompt)
    return _strip_markdown(response.content)

The phrase “combining only what is factually stated above” is important. It tells the LLM: just combine what you were given, don’t add anything from your own knowledge. Without this, the LLM will fill in details that sound right but aren’t from your documents.

What the response envelope looks like

Every /query response includes full retrieval provenance:

{
  "answer":         "Chemical-resistant gloves, safety goggles, and a respirator are required. Keep away from heat sources; flash point is 42°C.",
  "sources":        ["kg", "vector"],
  "kg_sparql":      "SELECT ?s ?p ?o WHERE { GRAPH <MSDS_Graph/MAT001> { ?s ?p ?o . FILTER(...) } }",
  "kg_facts":       [{"s": "Acetone", "p": "REQUIRES", "o": "Chemical_Resistant_Gloves"}, ...],
  "vector_chunks":  3,
  "error":          null
}

When debugging a wrong answer, check sources: if it says ["vector"] when you expected ["kg", "vector"], the KG chain failed — look at kg_sparql to see what query was generated and why it returned nothing.


Step 4 — Production Readiness

4.1 What to Instrument

Add these five metrics to your structured logs. The JSON log format from srv/log_config.py already includes correlation_id, module, and timestamp — add these fields to key events:

1. Source distribution — are both chains firing?

# After merge_results() in orchestrator.py
logger.info("query_complete", extra={
    "correlation_id": correlation_id,
    "sources": merged["sources"],          # ["kg", "vector"] or ["kg"] or ["vector"]
    "elapsed_ms": int((time.monotonic() - start_time) * 1000),
    "material": material_number,
})

Alert if sources == [] rate exceeds 2%. That means both chains failed on the same query.

2. KG retry rate — is vocabulary mismatch common?

# In kg_chain.py, before the retry call
logger.info("kg_retry", extra={
    "correlation_id": correlation_id,
    "material": material_number,
    "retry_reason": "empty_result",
})

Normal: retry rate 10–30%. Higher than 30% means your ontology predicates and your SPARQL prompt vocabulary have drifted apart.

3. Cosine similarity distribution — are retrieved chunks relevant?

# In vector_srv.py, after search_vectors()
top_score = chunks[0]["similarity_score"] if chunks else 0.0
logger.info("vector_search", extra={
    "material": material_number,
    "top_score": round(top_score, 4),
    "chunks_returned": len(chunks),
})

Alert if top_score < 0.50 on most queries. That means the embedded question does not match anything in the corpus — likely the document was not ingested yet, or was ingested with a different embedding model.

4. Chain latency — where is time being spent?

# In kg_chain.py and vector_chain.py
t0 = time.monotonic()
# ... chain logic ...
logger.info("chain_latency", extra={
    "chain": "kg",      # or "vector"
    "elapsed_ms": int((time.monotonic() - t0) * 1000),
})

Normal: KG chain p50 ≈ 1,500–2,500 ms (one LLM call for query generation + SPARQL execution + summarisation). Vector chain p50 ≈ 800–1,500 ms (one embed call + HANA search + summarisation). If KG p95 exceeds 8,000 ms, the SPARQL prompt or graph complexity is the bottleneck.

5. Grounding check — can the answer be traced to a source?

For production monitoring, a lightweight check: does the answer string contain any word from the retrieved kg_facts?

# In orchestrator.py, after merge_results()
kg_subjects = {f["s"].split("/")[-1].lower() for f in merged.get("kg_facts", [])}
answer_words = set(merged["answer"].lower().split()) if merged["answer"] else set()
is_grounded = bool(kg_subjects & answer_words) or "vector" in merged.get("sources", [])
logger.info("grounding_check", extra={"grounded": is_grounded, "sources": merged["sources"]})

This is a rough heuristic, not a guarantee — but it catches the most obvious cases where the LLM ignored the retrieved context entirely.

4.2 Common Pitfalls

Pitfall 1: Open-schema predicates break SPARQL permanently

Symptom: KG chain always returns empty results, retry does not help, kg_facts is always [].

Cause: LLMGraphTransformer was run without allowed_relationships, so the stored predicates are arbitrary LLM inventions (requires_ppe, is_classified_under, mandates_use_of) that no SPARQL pattern will ever match.

Fix: Drop the graph (DROP GRAPH <MSDS_Graph/MAT001>), re-ingest with ontology constraints enabled. There is no way to patch stored predicates — the data must be re-extracted.

# Check what predicates are actually in the graph:
sparql = """
SELECT DISTINCT ?p WHERE {
  GRAPH <MSDS_Graph/MAT001> { ?s ?p ?o }
}
"""
# If this returns things like "requires_ppe", "is_classified_under" → re-ingest needed

Pitfall 2: Shared HANA connections across threads

Symptom: Intermittent Connection is closed or cursor is closed errors. Always happens when load increases (two simultaneous queries, or a query while an ingestion is running).

Cause: A connection object was created on the main thread and passed to worker threads. hdbcli connections are not thread-safe.

Fix: Each thread must call hdb_srv.get_hana_connection() independently. Never pass a connection across thread boundaries. The threading.local() pattern in hdb_srv.py ensures this automatically — but only if all code goes through get_hana_connection() rather than holding a module-level conn variable.

# ❌ Wrong — module-level connection shared by all threads
conn = dbapi.connect(...)

def kg_chain_run(...):
    cursor = conn.cursor()  # another thread might be using conn right now

# ✅ Correct — each thread gets its own connection
def kg_chain_run(...):
    conn = hdb_srv.get_hana_connection()  # thread-local, always safe
    cursor = conn.cursor()

Pitfall 3 (Future): Growing history list will cause prompt bloat

Note: The history parameter is accepted by the chains but not yet integrated into query generation (see TODO in kg_chain.py). When you implement multi-turn context, watch for this pitfall.

Symptom: LLM latency increases with conversation length. Eventually the LLM starts ignoring the question and summarising the conversation history instead.

Cause: The history parameter is passed to the KG and vector chains but not truncated. After 20+ turns it can exceed the LLM context window or push the actual question out of the attention window.

Fix: Truncate history to the last N turns before passing it to the chains:

# In orchestrator.py before submitting to the thread pool
MAX_HISTORY_TURNS = 10
trimmed_history = history[-MAX_HISTORY_TURNS:] if history else []

kg_future  = executor.submit(kg_chain.run,     llm, question, graph_name,     trimmed_history)
vec_future = executor.submit(vector_chain.run, llm, question, material_number, trimmed_history)

In the current production code, history is accepted as a parameter but not yet threaded into the summarisation prompts — see the TODO comment in kg_chain.py. When you add multi-turn context, always add the truncation at the same time.


Summary

Here is the complete module map for the RAG pipeline:

The recommended project layout:

your-project/
├── ontology.ttl                ← vocabulary contract between ingestion and retrieval
├── main.py                     ← FastAPI: /process-upload, /query endpoints
│
├── agents/
│   ├── orchestrator.py         ← supervisor: parallel dispatch + merge results
│   ├── kg_agent.py             ← KG agent: generate SPARQL → execute → retry → summarize
│   └── vector_agent.py         ← vector agent: embed → cosine search → summarize
│
└── services/
    ├── document_srv.py         ← ingestion: PDF → chunks → Thread A + Thread B
    ├── kg_srv.py               ← SPARQL generation + summarization
    ├── vector_srv.py           ← embedding + vector search + summarization
    ├── hana_srv.py             ← HANA connections (threading.local), SPARQL_EXECUTE
    └── llm_srv.py              ← LLM initialization (cached, thread-safe)