Who this is for: Python developers on SAP BTP who want to build a smart document Q&A system. No prior knowledge of knowledge graphs or SPARQL needed.
What you will build: A FastAPI service that reads PDFs, stores the data in two ways (a knowledge graph and a vector store) inside SAP HANA Cloud, and answers questions using both stores at the same time — deployed to Cloud Foundry.
Stack: Python 3.11 · FastAPI · LangChain · SAP HANA Cloud · SAP AI Core (Claude via Bedrock) · Cloud Foundry
Figure 1 — Agentic Hybrid RAG Architecture
Before building, let’s look at why existing approaches fall short when you need accurate answers from documents.
Vector RAG works like this: it cuts your documents into pieces (chunks), converts each piece into a list of numbers (a vector), and when you ask a question, it finds the pieces whose numbers are closest to your question’s numbers.
The problem: it finds text that talks about the topic — not necessarily the exact fact you asked for.
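The mechanism fits in a few lines of plain Python. This sketch uses toy 3-dimensional vectors in place of a real embedding model's ~1536 dimensions; the chunk texts and numbers are illustrative, not real scores:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))

def top_k(query_vec: list[float], chunk_vecs: dict[str, list[float]], k: int = 5):
    """Rank chunks by similarity to the query vector, best first."""
    scored = [(cosine(query_vec, v), text) for text, v in chunk_vecs.items()]
    return sorted(scored, reverse=True)[:k]

# Toy "embeddings" — a real model produces far higher-dimensional vectors
chunks = {
    "Flash point: 42°C. Keep away from ignition.": [0.9, 0.1, 0.0],
    "First aid: move to fresh air.":               [0.1, 0.9, 0.1],
}
print(top_k([0.8, 0.2, 0.1], chunks, k=1))  # the flash-point chunk ranks first
```

Retrieval returns the *nearest* chunks, not necessarily the *correct* fact, which is exactly the weakness discussed next.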
Imagine your MSDS document contains this section:
Section 7 — Handling and Storage
The product must be stored below 25°C in a well-ventilated area.
Keep away from sources of ignition. Flash point: 42°C.
Containers must be grounded to prevent static discharge.

When you split this document into 500-token pieces, this text ends up mixed with the previous section about composition percentages. The flash point of 42°C is in there somewhere, but buried in noise.
Now ask: “What is the flash point of this product?”
The top-5 retrieved chunks might look like this:
[score=0.91] "...The product must be stored below 25°C in a well-ventilated
area. Keep away from sources of ignition. Flash point: 42°C.
Containers must be grounded..."
[score=0.84] "...Flammability classification: Flam. Liq. 3. See Section 9
for physical and chemical properties. Refer to SDS for complete
hazard data..."
[score=0.79] "...Physical state: liquid. Colour: clear. Odour: mild petroleum.
Boiling point: 155°C. Melting point: <-20°C. Vapour pressure..."
[score=0.74] "...Section 9 — Physical and Chemical Properties. All values
measured at 20°C unless otherwise stated..."
[score=0.71] "...For fire-fighting: CO2, dry powder, foam. Do not use water
jets directly on burning liquid. Flash point determined by..."

The top chunk contains the answer — but only because this test PDF happened to keep that section together. Change the PDF, change the page layout, or add one more sentence before the flash point line, and it shifts into a different chunk. The answer becomes:
“The product has flammability properties as classified under GHS. Refer to Section 9 for specific physical and chemical data.”
Useless. The LLM has nothing concrete to work with, so it makes up a reasonable-sounding answer — this is hallucination, and it’s the biggest risk with vector-only RAG.
Vector RAG is genuinely good at:

- Procedures and narratives — “What should I do if I inhale this product?” retrieves first-aid paragraphs correctly because the question and the answer share vocabulary.
- Fuzzy questions — “What protective gear do I need?” matches chunks containing “gloves”, “goggles”, “respirator” even though those exact words weren’t in the query.
- Multi-sentence explanations — questions that require a paragraph-length answer rather than a single value.
The failure mode is specific: exact values, codes, and structured facts that may not survive the chunking process.
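The boundary problem is easy to demonstrate. This toy splitter works on words instead of tokens, with sizes chosen to force a split in the middle of the fact:

```python
def split_fixed(words: list[str], size: int, overlap: int = 0) -> list[list[str]]:
    """Split a word list into fixed-size windows with optional overlap."""
    step = size - overlap
    return [words[i:i + size] for i in range(0, len(words), step)]

text = "Keep away from sources of ignition . Flash point : 42 C .".split()

# Without overlap, "Flash" lands in chunk 0 and "point : 42 C" in chunk 1 —
# neither chunk contains the whole fact:
for w in split_fixed(text, size=8):
    print(w)

# With overlap=3, boundary words repeat in adjacent windows, so at least one
# chunk keeps the complete fact:
for w in split_fixed(text, size=8, overlap=3):
    print(w)
```

This is why the ingestion code later uses a 50-token overlap: overlap does not eliminate boundary sensitivity, it just makes a clean cut through a fact less likely.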
A knowledge graph stores facts as simple three-part statements: (Subject, Predicate, Object). For example, “Acetone causes skin irritation” becomes:
(Acetone) → CAUSES → (Skin_irritation)

When you ask a question, the LLM writes a SPARQL query to look up those stored facts. SPARQL is exact-match — it only finds facts that match the query pattern exactly.
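A triple store reduced to its essence is just a set of tuples with exact-match lookup. This pure-Python sketch (not HANA's implementation) shows both the strength and the weakness:

```python
# A triple store is a set of (subject, predicate, object) statements
triples = {
    ("Acetone", "CAUSES", "Skin_irritation"),
    ("Acetone", "REQUIRES", "Chemical_Resistant_Gloves"),
}

def match(s=None, p=None, o=None):
    """Exact-match lookup, the way SPARQL matches basic graph patterns."""
    return [t for t in triples
            if (s is None or t[0] == s)
            and (p is None or t[1] == p)
            and (o is None or t[2] == o)]

print(match(s="Acetone", p="CAUSES"))    # finds the stored fact, verbatim
print(match(s="Acetone", p="LEADS_TO"))  # wrong predicate name → empty list
```

The second lookup returns nothing, silently. That silent emptiness is the KG failure mode examined below.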
Suppose the document says “Acetone requires the use of chemical-resistant gloves.” The KG extraction pipeline (with our MSDS ontology) stores:
<Acetone> <REQUIRES> <Chemical_Resistant_Gloves>

Now a user asks: “What PPE is needed for Acetone?”
The LLM generates this SPARQL:
SELECT ?s ?p ?o WHERE {
GRAPH <MSDS_Graph/MAT001> {
?s ?p ?o .
FILTER(REGEX(str(?p), "HAS_PPE|HAS_PROTECTION|PPE_REQUIREMENT", "i"))
}
}

Result: 0 rows.
The predicate in the graph is REQUIRES — not HAS_PPE. The LLM guessed the wrong name. SPARQL returns nothing. No error. The system just silently gives you an empty answer.
This is the main weakness of knowledge graph RAG: the LLM has to guess the exact word that was used when the data was stored. Without help, it’s basically a guessing game.
The fix is simple: limit the vocabulary to a small list (13 predicates in our case) when storing data, and give the same list to the query generator. That way it can’t guess wrong. This is what the ontology does (Step 3.2).
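In code, "same list on both sides" can be as simple as one module-level constant. The helper names here are illustrative, not from the repo; the nine predicates shown are the ones that appear later in the query-generation prompt:

```python
# One closed predicate list, defined once and used by BOTH sides
ALLOWED_PREDICATES = [
    "CAUSES", "AFFECTS", "REQUIRES", "HAS_HAZARD", "HAS_EFFECT",
    "HAS_PRECAUTION", "HAS_PROPERTY", "HAS_CODE", "CONTAINED_IN",
]

def extraction_instruction() -> str:
    """Constraint injected into the extractor prompt at write time."""
    return ("Extract triples using ONLY these relationships: "
            + ", ".join(ALLOWED_PREDICATES))

def query_instruction() -> str:
    """The same list injected into the SPARQL-generation prompt at read time."""
    return ("Known predicates: " + ", ".join(ALLOWED_PREDICATES)
            + ". Never invent other predicate names.")
```

Because both prompts are built from the same constant, a predicate that can be written can always be guessed by the reader.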
KG retrieval is genuinely excellent at:

- Exact attribute values — GHS codes, concentration limits, flash points stored as triples are returned verbatim, not approximated.
- Typed relationship traversal — “What hazards are classified as Flam. Liq.?” traces a typed relationship path.
- Grounded answers — every fact in the response is a stored triple. The LLM cannot hallucinate values it did not retrieve.
The two failure modes are complementary:
| Failure mode | Vector RAG | KG RAG |
| --- | --- | --- |
| Chunk boundary sensitivity | Yes | No |
| Predicate vocabulary mismatch | No | Yes |
| Hallucination on sparse context | Yes | No |
| Misses narrative/procedural text | No | Yes |
The solution: run both at the same time and combine their answers. Each store covers the other’s failure modes.
ReAct (Reason + Act) is what makes this system smart instead of just a chain of steps. The idea is simple: after each action, the system looks at what it got back, thinks about whether it’s good enough, and decides what to do next.
This guide uses LangChain with Python’s ThreadPoolExecutor. But the pattern works with any framework — LangGraph, CrewAI, n8n, AutoGen — anything that lets you build an observe → reason → act loop.
This system has three places where it makes decisions based on what it observes:
| # | Where | What it does | File |
| --- | --- | --- | --- |
| 1 | KG self-correction | Detects an empty SPARQL result, reasons about why (vocabulary mismatch), generates a different query with broadened patterns | agents/kg_chain.py |
| 2 | Supervisor dispatch | Orchestrator runs both chains, observes both results, routes the merge strategy based on which chains succeeded | agents/orchestrator.py |
| 3 | Graceful degradation | When one chain fails, the system does not return an error — it reasons that one result is better than none and serves it directly | agents/orchestrator.py |
Let’s walk through each one.
This is the most important one. It is what separates this system from a static hybrid pipeline.
The problem it solves: When the LLM writes a SPARQL query with the wrong word, the database returns zero results — silently. A basic system would just return “no answer found”. Our system detects this and tries again with a better query.
How it works:
Observe: SPARQL returned 0 <result> elements
Reason: Likely cause is predicate vocabulary mismatch — LLM guessed the
wrong name. Retrying with the same prompt will produce the same wrong name.
I need to change the prompt to force broader matching.
Act: Re-invoke write_query() with an explicit instruction to use
REGEX alternation (UNION of possible predicate names) to
widen the search net.
Observe: New result — if non-empty, continue. If still empty, give up
and signal the orchestrator to fall back to vector.

Here is exactly where this loop lives in agents/kg_chain.py:
# KG Agent — the self-correction loop
# Step 1: Generate SPARQL and execute
sparql_query = kg_srv.write_query(llm, question, graph_name)
response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)
# Step 2: OBSERVE — did the query return anything?
if not response_xml or _is_empty_sparql_result(response_xml):
# Step 3: REASON — the cause is almost always vocabulary mismatch.
# Don't retry with the same prompt; change the instruction.
logger.info("KG chain: first attempt returned empty — retrying with relaxed query")
relaxed_question = (
question
+ " (use broader REGEX patterns and UNION to maximise recall)"
# ↑ this instruction changes the LLM's behaviour on the retry
)
# Step 4: ACT — re-invoke with the broadened prompt
retry_sparql = kg_srv.write_query(llm, relaxed_question, graph_name)
if retry_sparql and not retry_sparql.startswith("Error"):
sparql_query = retry_sparql
response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)
    # Step 5: OBSERVE again — if still empty, return None (signals orchestrator)

What the broadened prompt actually produces:
-- First attempt (LLM guessed one predicate name):
FILTER(REGEX(str(?p), "HAS_PPE", "i"))
-- Result: 0 rows
-- After retry instruction "use broader REGEX and UNION":
FILTER(
REGEX(str(?p), "REQUIRES|HAS_PPE|HAS_PROTECTION|HAS_SAFETY|SAFETY", "i")
)
-- Result: matches REQUIRES → returns the PPE triples

The LLM changed its own output in response to observed failure. That is the definition of a self-correction loop.
Why only one retry: Each retry costs one LLM call. One retry fixes about 60% of wrong-word failures — the LLM knows multiple possible names and just needs to be told to try all of them. If one retry still returns nothing, the fact probably isn’t in the graph at all — more retries won’t help.
The orchestrator is a supervisor: it sends the question to both agents, waits for their answers, and decides what to do based on what came back.
A basic system would just blindly combine both results. The supervisor actually checks: did the KG agent find something? Did the vector agent find something? Then it picks the best strategy.
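The dispatch half of the supervisor, running both agents concurrently before any merge decision, can be sketched with ThreadPoolExecutor. The function names kg_fn and vector_fn are stand-ins for the real chain entry points, and the failure simulation is illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

def run_both(question: str, kg_fn, vector_fn, timeout: float = 60.0) -> dict:
    """Fan out to both agents in parallel; a failing chain never kills the request."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = {"kg": pool.submit(kg_fn, question),
                   "vector": pool.submit(vector_fn, question)}
        results = {}
        for name, fut in futures.items():
            try:
                results[name] = {"answer": fut.result(timeout=timeout), "error": None}
            except Exception as exc:  # timeout, DB error, LLM error, ...
                results[name] = {"answer": None, "error": str(exc)}
    return results

# Stand-in agents: KG succeeds, vector simulates a failure
def fake_kg(question):
    return "Flash point: 42°C (from KG triples)"

def fake_vector(question):
    raise RuntimeError("embedding service timeout")

res = run_both("What is the flash point?", fake_kg, fake_vector)
```

The per-chain result dicts produced here are exactly the shape the merge logic below inspects.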
# Orchestrator — the supervisor's decision logic
def merge_results(llm, question, kg_result, vec_result) -> dict:
kg_answer = kg_result.get("answer") # None = chain failed or returned nothing
vec_answer = vec_result.get("answer") # None = chain failed or returned nothing
# OBSERVE: what did each chain return?
# REASON + ACT: route based on observed state
if kg_answer and vec_answer:
# Both succeeded — synthesise into a single grounded answer
# Reason: two independent sources agreeing strengthens confidence;
# KG provides the exact fact, vector provides the context
return {"answer": _synthesise(llm, question, kg_answer, vec_answer),
"sources": ["kg", "vector"]}
if kg_answer:
# Only KG succeeded — serve directly without synthesis
# Reason: no point calling _synthesise() with one empty source;
# it would just add an LLM call for no benefit
return {"answer": kg_answer, "sources": ["kg"]}
if vec_answer:
# Only vector succeeded — serve directly
# Reason: KG failed (empty after retry) but vector has something;
# a partial answer is better than no answer
return {"answer": vec_answer, "sources": ["vector"]}
# Both failed — return structured error so the caller can distinguish
# "no answer" from "answer is actually empty" (different monitoring cases)
return {"answer": None, "sources": [], "error": {
"kg": kg_result.get("error"),
"vector": vec_result.get("error"),
    }}

The sources field in the response is not cosmetic. It is the supervisor’s audit trail — it records which agents contributed to the answer. When you see "sources": ["vector"] in your logs, that is the supervisor telling you: “KG failed on this query. You are getting a vector-only answer. Check the KG chain.”
When one agent fails, the system doesn’t crash — it just uses the other agent’s answer. This means your users always get something useful, and your logs tell you exactly which agent failed so you can fix it.
The degradation ladder:
Both chains succeed        → Full hybrid synthesis (best quality)
  ↓ (vector fails)
KG succeeds, vector fails  → KG-only answer (exact facts, less narrative context)
  ↓ (KG fails instead)
KG fails, vector succeeds  → Vector-only answer (lower precision, no exact facts)
  ↓ (vector also fails)
Both chains fail           → Structured error dict ({"kg": "timeout", "vector": "..."})

The structured error dict in Case 4 is itself agentic behaviour: instead of raising an exception and losing context, the orchestrator captures the per-chain failure reason and returns it as data. Downstream systems can inspect error.kg and error.vector independently to understand what went wrong.
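A hypothetical monitoring hook shows what "error as data" buys a caller. The dict shapes match the orchestrator's output above; the classification categories themselves are illustrative:

```python
def classify(result: dict) -> str:
    """Map an orchestrator response onto a monitoring category."""
    sources = result.get("sources", [])
    if sources == ["kg", "vector"]:
        return "healthy"
    if sources == ["vector"]:
        return "kg-degraded"       # answer served, but the KG chain needs attention
    if sources == ["kg"]:
        return "vector-degraded"
    # Both failed: per-chain reasons are available as data, not a lost stack trace
    err = result.get("error") or {}
    return f"outage (kg={err.get('kg')}, vector={err.get('vector')})"

print(classify({"answer": "42°C", "sources": ["vector"]}))
# → kg-degraded
print(classify({"answer": None, "sources": [],
                "error": {"kg": "timeout", "vector": "embedding 503"}}))
```

Because the failure reason survives as structured data, a dashboard can alert on "kg-degraded" long before a full outage.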
| Service | What it does in this system | Where to create it |
| --- | --- | --- |
| SAP HANA Cloud | Stores both the RDF knowledge graph and the vector embeddings | BTP Cockpit → HANA Cloud |
| SAP AI Core | Hosts the LLM (Claude via AWS Bedrock) and the embedding model | BTP Cockpit → AI Core |
You do not need a separate vector database (no Pinecone, Chroma, or Weaviate). HANA Cloud handles both stores with its SPARQL_EXECUTE stored procedure and REAL_VECTOR column type.
cd backend
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt

Key packages and why each is needed:
fastapi, uvicorn # HTTP server
hdbcli==2.23.26 # SAP HANA Python driver
langchain==0.3.27 # LLM orchestration framework
langchain-experimental # LLMGraphTransformer (KG extraction from text)
langchain-openai # OpenAIEmbeddings (used via AI Core proxy)
langchain-aws # ChatBedrock (Claude via AI Core / Bedrock)
generative-ai-hub-sdk # SAP AI Core authentication + proxy client
rdflib==7.1.4 # Build and serialise RDF graphs in Python
pypdf # Load PDFs into LangChain Document objects

Create backend/.env:
# HANA Cloud
HANA_HOST=your-instance.hanacloud.ondemand.com
HANA_PORT=443
HANA_USER=DBADMIN
HANA_PASSWORD=your-password
# SAP AI Core — from the service key in BTP
AICORE_AUTH_URL=https://your-subaccount.authentication.eu10.hana.ondemand.com
AICORE_CLIENT_ID=sb-xxxxx
AICORE_CLIENT_SECRET=xxxxx
AICORE_BASE_URL=https://api.ai.xxx.aws.ml.hana.ondemand.com
# LLM deployment
AICORE_DEPLOYMENT_ID=dxxxxxxxxxxxxxxx
AICORE_BEDROCK_MODEL_ID=anthropic.claude-3-5-sonnet-20241022-v2:0
LLM_MODEL_NAME=claude-3-5-sonnet
# Embedding model (deployed via AI Core)
EMBEDDING_MODEL_NAME=text-embedding-3-small

# 1. Test HANA connection
python - <<'EOF'
from hdbcli import dbapi
conn = dbapi.connect(
address="your-instance.hanacloud.ondemand.com",
port=443, user="DBADMIN", password="your-password",
encrypt=True, sslValidateCertificate=False
)
cur = conn.cursor()
cur.execute("SELECT 1 FROM DUMMY")  # hdbcli's execute() returns True, not the cursor
print("HANA OK:", cur.fetchone())
EOF
# 2. Test AI Core LLM
python - <<'EOF'
from dotenv import load_dotenv; load_dotenv()
import srv.aic_srv as aic_srv
llm = aic_srv.get_llm()
print(llm.invoke("Say hello in one word").content)
EOF
# 3. Test SPARQL stored procedure exists
python - <<'EOF'
from dotenv import load_dotenv; load_dotenv()
import srv.hdb_srv as hdb_srv
conn = hdb_srv.get_hana_connection()
result, _ = hdb_srv.execute_sparql_query(conn, "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 1")
print("SPARQL OK, result:", result)
EOF

HANA database connections cannot be shared between threads: if two threads use the same connection at the same time, you get intermittent, hard-to-reproduce errors. Since our system runs multiple threads (KG agent, vector agent, ingestion), each thread needs its own connection. Python’s threading.local() solves this:
# HANA connection service
_local = threading.local() # one instance per thread, invisible to other threads
def get_hana_connection():
conn = getattr(_local, 'conn', None)
if conn is not None:
try:
conn.cursor().execute("SELECT 1 FROM DUMMY") # liveness check
return conn
except Exception:
_local.conn = None # stale — reconnect
user, password, address, port = _resolve_credentials()
conn = dbapi.connect(address=address, port=int(port),
user=user, password=password,
encrypt=True, sslValidateCertificate=False)
_local.conn = conn
    return conn

Every thread that calls get_hana_connection() gets its own connection, created the first time it’s needed. Rule: never pass a connection between threads. All code in this guide follows this rule.
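The isolation property is easy to verify in a standalone sketch, with a string standing in for a real connection object:

```python
import threading

_local = threading.local()  # each thread sees its own attribute namespace

def get_connection() -> str:
    """Lazily create and cache a per-thread 'connection' (stand-in string)."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = f"conn-for-{threading.current_thread().name}"
        _local.conn = conn
    return conn

seen = {}

def worker():
    # Each worker records the connection its own thread received
    seen[threading.current_thread().name] = get_connection()

threads = [threading.Thread(target=worker, name=f"t{i}") for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(seen)  # two distinct per-thread connections
```

Repeated calls from the same thread return the cached object; calls from a different thread never see it.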
For this guide, we use MSDS (Material Safety Data Sheets) as our example. MSDS documents are one of the most common types of unstructured enterprise data — every company that handles chemicals has hundreds of these PDFs. They contain a mix of structured facts (flash points, GHS codes) and narrative text (handling procedures, first aid), which makes them a perfect test case for hybrid RAG.
You can apply the same approach to any document type — contracts, policies, technical manuals, compliance reports.
LLMGraphTransformer is the LangChain tool that pulls facts out of text. Without rules, it picks whatever words it likes — requires, is_required_for, necessitates, demands. The problem? The query generator will never guess these exact words.
The ontology is a shared word list. The data-writing side and the data-reading side both use the same list. That’s how they stay in sync.
The ontology is a Turtle (.ttl) RDF file. It defines OWL classes (node types) and OWL object properties (predicate types):
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix msds: <http://msds.knowledge-graph.org/> .
# ── Node types (OWL Classes) ─────────────────────────────────────────────────
msds:Chemical
a owl:Class ;
rdfs:label "Chemical" . # ← rdfs:label is what LLMGraphTransformer sees
msds:Hazard
a owl:Class ;
rdfs:label "Hazard" .
msds:HazardClassification
a owl:Class ;
rdfs:subClassOf msds:Hazard ;
rdfs:label "Hazard Classification" .
msds:PersonalProtectiveEquipment
a owl:Class ;
rdfs:subClassOf msds:SafetyMeasure ;
rdfs:label "Personal Protective Equipment" .
# ── Predicates (OWL Object Properties) ──────────────────────────────────────
msds:requires
a owl:ObjectProperty ;
rdfs:label "REQUIRES" . # ← UPPER_SNAKE_CASE matches SPARQL REGEX patterns
msds:hasHazard
a owl:ObjectProperty ;
rdfs:label "HAS_HAZARD" .
msds:causes
a owl:ObjectProperty ;
rdfs:label "CAUSES" .

doc_srv.py loads this file at module import time using rdflib:
# Ontology loader — runs once at startup
def _load_ontology(ttl_path: str):
g = RDFGraph()
g.parse(ttl_path, format="turtle")
# Extract rdfs:label of every OWL Class → allowed node types
nodes = [
str(g.value(c, RDFS.label))
for c in g.subjects(RDF.type, OWL.Class)
if g.value(c, RDFS.label)
]
# Extract rdfs:label of every OWL ObjectProperty → allowed predicates
# Labels are UPPER_SNAKE_CASE (e.g. "REQUIRES", "HAS_HAZARD")
rels = [
str(g.value(p, RDFS.label)).replace(" ", "_").upper()
for p in g.subjects(RDF.type, OWL.ObjectProperty)
if g.value(p, RDFS.label)
]
return nodes, rels
_ONTOLOGY_ALLOWED_NODES, _ONTOLOGY_ALLOWED_RELATIONSHIPS = _load_ontology(_ontology_ttl)
# → _ONTOLOGY_ALLOWED_NODES = ["Chemical", "Hazard", "Hazard Classification", ...]
# → _ONTOLOGY_ALLOWED_RELATIONSHIPS = ["REQUIRES", "HAS_HAZARD", "CAUSES", ...]

These lists are passed to LLMGraphTransformer at ingestion time. The LLM is instructed: “Extract triples using only these node types: [Chemical, Hazard, …] and relationships: [REQUIRES, HAS_HAZARD, …]”. Without these constraints, LLMGraphTransformer generates arbitrary predicate names that SPARQL can never match — see Step 4.2, Pitfall 1. The same predicate list is injected into the SPARQL query generation prompt, closing the vocabulary loop.
For a domain other than MSDS, follow this checklist:
- Pick one direction for containment (a CONTAINS or CONTAINED_IN predicate) and use it consistently.
- Validate by running LLMGraphTransformer on 3–5 sample documents and checking that the generated predicates match your list.

Before running Step 2: The ontology file (01-MSDS_Ontology.ttl) must exist at the path configured in doc_srv.py. It is loaded once at module import time via _load_ontology(). Verify the file is in place before starting ingestion — a missing ontology file means LLMGraphTransformer will generate unconstrained predicates that SPARQL can never match (see Step 4.2, Pitfall 1).
Before reading the production code, here is the minimal concept:
# Concept (not the real file — see doc_srv.py)
def ingest(pdf_path, material_id, llm, hana_conn):
chunks = chunk_pdf(pdf_path) # Load PDF → split into text chunks
# Thread A: KG extraction
graph_transformer = LLMGraphTransformer(llm=llm,
allowed_nodes=ALLOWED_NODES, allowed_relationships=ALLOWED_RELS)
graph_docs = graph_transformer.convert_to_graph_documents(chunks)
triples = build_rdf_graph(graph_docs)
sparql_insert = build_sparql_insert(triples, f"MSDS_Graph/{material_id}")
execute_sparql_update(hana_conn, sparql_insert) # → HANA named graph
# Thread B: Vector embedding
embeddings = [embed(chunk) for chunk in chunks]
for chunk, vec in zip(chunks, embeddings):
hana_conn.cursor().execute(
"INSERT INTO MSDS_VECTORS VALUES (?, ?, TO_REAL_VECTOR(?), ?)",
(material_id, chunk.page_content, vec_str, idx)
        )

Both paths read the same chunks list. They write to different HANA stores. They run in parallel threads.
srv/doc_srv.py

# Document service
def create_chunks(documents, chunk_size=500, chunk_overlap=50):
    text_splitter = TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
chunks = []
for doc in documents:
cleaned_text = clean_text(doc.page_content) # strips quotes, newlines
doc_chunks = text_splitter.split_text(cleaned_text)
for chunk in doc_chunks:
chunks.append(Document(page_content=chunk, metadata=doc.metadata))
    return chunks

TokenTextSplitter splits by token count (not characters). The chunk_overlap=50 means consecutive chunks share 50 tokens at the edges — so if a fact falls right at a split point, it appears in both chunks and won’t get lost.
_extract_graph()

# Ingestion — Thread A (KG extraction)
def _extract_graph(file_path, llm, material_number, chunks) -> dict:
named_graph = f"MSDS_Graph/{material_number}"
conn = hdb_srv.get_hana_connection() # ← thread-local; explained in Step 3.4
# 1. Create the LLM-based graph transformer with ontology constraints
llm_transformer = LLMGraphTransformer(
llm=llm,
allowed_nodes=_ONTOLOGY_ALLOWED_NODES, # e.g. ["Chemical", "Hazard", ...]
allowed_relationships=_ONTOLOGY_ALLOWED_RELATIONSHIPS, # e.g. ["REQUIRES", ...]
)
# 2. Convert all chunks to GraphDocument objects in a thread pool
# Each chunk produces a list of Node and Relationship objects
graph_document_list = _convert_chunks_to_graph_documents(llm_transformer, chunks)
# 3. Build a single rdflib Graph from all nodes and relationships
g = _build_rdf_graph(graph_document_list)
# 4. Serialize to a SPARQL INSERT DATA query targeting the named graph
rdf_triples, sparql_insert_query = _build_sparql_insert(g, named_graph)
# 5. Write to HANA
hdb_srv.execute_sparql_update(conn, sparql_insert_query)
    return {"pipeline": "graph", "triples_count": len(rdf_triples)}

The most important line is the LLMGraphTransformer constructor. Without allowed_nodes and allowed_relationships, it generates free-form predicates that SPARQL can never match. With them, every stored triple uses a predicate from your 13-item list.
HANA SPARQL has a quirk: local names in URIs cannot start with a digit or underscore. The safe_uri() helper handles this:
# Document service
_NON_URI_SAFE = re.compile(r"[^A-Za-z0-9_\-]")
def safe_uri(string: str) -> URIRef:
local = string.replace(" ", "_").replace("/", "_")
local = _NON_URI_SAFE.sub("", local) # strip all other special chars
    if not local or local[0].isdigit() or local[0] == "_":
local = "id_" + local # prefix to avoid HANA parse error
    return URIRef(EX_NAMESPACE[local])  # → <http://msds.knowledge-graph.org/Acetone>

_embed_and_store_background()

# Ingestion — Thread B (vector embedding)
def _embed_and_store_background(llm, chunks, material_number,
file_hash=None, file_content=None):
conn = hdb_srv.get_hana_connection() # ← own thread-local connection
# Dedup: skip if this exact file was already embedded for this material
if file_hash and hdb_srv.check_hash_exists(conn, material_number, file_hash):
return {"pipeline": "vector", "vectors_count": 0, "skipped": True}
# Delegate to vector_srv (see Step 3.4)
count = vector_srv.embed_and_store(llm, conn, chunks, material_number)
    return {"pipeline": "vector", "vectors_count": count}

process_document()

# Entry point — called from POST /process-upload
def process_document(file_path, llm, conn, file_name=None,
attachment_id=None, file_hash=None) -> dict:
material_number = file_name or os.path.basename(file_path)
# Validate: must be a real PDF
with open(file_path, "rb") as f:
if f.read(5) != b"%PDF-":
raise ValueError("Not a valid PDF")
with open(file_path, "rb") as f:
file_content = f.read()
loader = PyPDFLoader(file_path)
documents = loader.load()
chunks = create_chunks(documents) # shared between both threads
# Thread A — KG extraction
thread_graph = threading.Thread(
target=_extract_graph,
args=(file_path, llm, material_number, chunks),
daemon=True, name=f"graph-{material_number}",
)
# Thread B — vector embedding
thread_vector = threading.Thread(
target=_embed_and_store_background,
kwargs={"llm": llm, "chunks": chunks, "material_number": material_number,
"file_hash": file_hash, "file_content": file_content},
daemon=True, name=f"vector-{material_number}",
)
thread_graph.start()
thread_vector.start()
# Returns immediately — HTTP response does not wait for threads to finish
return {"status": "processing", "materialNumber": material_number,
            "pipelines": ["graph", "vector"]}

The HTTP endpoint returns 202 Accepted immediately. Ingestion runs in the background. KG extraction typically takes 30–120 seconds (one LLM call per chunk). Vector embedding is faster (5–20 seconds). Because they run concurrently in separate threads, the total wall-clock time is max(t_KG, t_vector).
# Concept (not the real file — see agents/kg_chain.py)
def kg_agent(llm, question, graph_name):
sparql = write_query(llm, question, graph_name) # LLM generates SPARQL
result = execute_sparql(conn, sparql) # Run it in HANA
if is_empty(result): # ← the agentic part
sparql = write_query(llm, question + " (use UNION and broader REGEX)")
result = execute_sparql(conn, sparql)
    return summarize(llm, question, result)  # LLM summarises triples

The “agentic” label is earned by the if is_empty branch. A static pipeline would return nothing. The agent detects the failure and re-invokes the LLM with a broadened instruction. It does not just retry the same call — it changes the prompt.
srv/kg_srv.py

The write_query() function sends the LLM a prompt that includes:

1. The named graph URI (so all triples are scoped to the right material)
2. The complete list of allowed predicates from the ontology
3. Several worked examples
4. REGEX usage rules (HANA SPARQL requires full URIs, no PREFIX declarations)
# SPARQL query generator
def write_query(llm, question: str, named_graph: str) -> str:
_validate_graph_name(named_graph.split("/")[-1]) # block SPARQL injection
template = '''You are a SPARQL query expert.
Named graph: {graph}
Known predicates: CAUSES, AFFECTS, REQUIRES, HAS_HAZARD, HAS_EFFECT,
HAS_PRECAUTION, HAS_PROPERTY, HAS_CODE, CONTAINED_IN
RULES:
1. ALL triples inside GRAPH <{graph}> {{ ... }}
2. Use REGEX(str(?var), "pattern", "i") for case-insensitive matching
3. No PREFIX declarations — always full URIs or REGEX
4. Output ONLY the SPARQL SELECT block
Question: {input}
SPARQL Query:'''
prompt = PromptTemplate.from_template(template).invoke({
"input": question, "graph": named_graph
})
raw = llm.invoke(prompt)
    return _extract_sparql(raw.content)  # regex extracts SELECT block

_validate_graph_name() rejects any materialNumber that contains characters outside [A-Za-z0-9_-]. This prevents a user from passing ../evil as a material name and producing a SPARQL query that operates on a different graph.
_extract_sparql() handles the common LLM output patterns: a bare SELECT block, or a SELECT block wrapped in a ```sparql ``` code fence.
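A minimal reimplementation sketch of that extraction step (not the repo's exact regex) shows how both output patterns reduce to one SELECT block:

```python
import re

# Matches an optional ```sparql ... ``` code fence around the query
_FENCE = re.compile(r"```(?:sparql)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)

def extract_sparql(raw: str) -> str:
    """Return the SELECT block from raw LLM output (fenced or bare)."""
    m = _FENCE.search(raw)
    text = m.group(1) if m else raw
    # Keep only from the first SELECT onward, dropping any leading chatter
    idx = text.upper().find("SELECT")
    return text[idx:].strip() if idx >= 0 else text.strip()

fenced = "Here is the query:\n```sparql\nSELECT ?s WHERE { ?s ?p ?o }\n```"
bare = "SELECT ?s WHERE { ?s ?p ?o }"
print(extract_sparql(fenced))  # → SELECT ?s WHERE { ?s ?p ?o }
print(extract_sparql(bare))    # → SELECT ?s WHERE { ?s ?p ?o }
```

Normalising at this boundary means the rest of the pipeline never has to care which style the LLM chose.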
srv/hdb_srv.py

HANA Cloud has no native SPARQL endpoint. All SPARQL operations go through the SPARQL_EXECUTE stored procedure. The Content-Type header in parameter 2 tells HANA whether to interpret the payload as a SELECT or an INSERT:
# HANA connection service
def execute_sparql_query(conn, sparql_query):
"""READ — SELECT queries."""
cursor = conn.cursor()
resp = cursor.callproc("SPARQL_EXECUTE", (
sparql_query,
"Accept: application/sparql-results+xml "
"Content-Type: application/sparql-query", # ← SELECT header
"?",
None,
))
return resp[2], resp[3] # (xml_result_string, metadata)
def execute_sparql_update(conn, sparql_update):
"""WRITE — INSERT DATA / DROP GRAPH."""
cursor = conn.cursor()
resp = cursor.callproc("SPARQL_EXECUTE", (
sparql_update,
"Accept: application/sparql-results+xml "
"Content-Type: application/sparql-update", # ← UPDATE header
"?",
None,
))
    return resp[2], resp[3]

The result of a SELECT comes back as an XML string in the SPARQL Query Results XML Format. The KG chain parses this XML to extract the fact rows.
agents/kg_chain.py

# kg_agent.py
def run(llm, question: str, graph_name: str, history: list[dict]) -> dict:
conn = hdb_srv.get_hana_connection()
try:
# 1. Generate SPARQL from the question
sparql_query = kg_srv.write_query(llm, question, graph_name)
if not sparql_query or sparql_query.startswith("Error"):
return {"answer": None, "error": sparql_query, ...}
# 2. Execute against HANA
response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)
# 3. ── Agentic retry on empty result ─────────────────────────────
if not response_xml or _is_empty_sparql_result(response_xml):
logger.info("Empty result — retrying with relaxed query")
relaxed_question = (
question
+ " (use broader REGEX patterns and UNION to maximise recall)"
)
retry_sparql = kg_srv.write_query(llm, relaxed_question, graph_name)
if retry_sparql and not retry_sparql.startswith("Error"):
sparql_query = retry_sparql
response_xml, _ = hdb_srv.execute_sparql_query(conn, sparql_query)
# ─────────────────────────────────────────────────────────────────
# 4. Summarise the XML fact rows into a natural language answer
answer = kg_srv.summarize_info(llm, question, response_xml)
facts = _parse_sparql_xml_to_list(response_xml) if response_xml else []
# 5. Treat "no information found" as a null result (signals vector fallback)
if answer and answer.lower().startswith("no information found"):
answer = None
return {"answer": answer, "sparql_query": sparql_query, "facts": facts,
"source": "kg", "error": None}
except Exception as exc:
logger.exception("KG chain error")
        return {"answer": None, "error": str(exc), ...}

Why the retry works: The broadened prompt includes the phrase "use UNION and broader REGEX patterns". The LLM responds by generating a FILTER with REGEX alternation:
-- First attempt (narrow):
FILTER(REGEX(str(?p), "HAS_PPE", "i"))
-- Returns 0 rows because predicate is REQUIRES, not HAS_PPE
-- Retry (broadened):
FILTER(REGEX(str(?p), "REQUIRES|HAS_PPE|HAS_PROTECTION|SAFETY", "i"))
-- Matches REQUIRES → returns the PPE triples

The retry costs exactly one additional LLM call. It is bounded — there is no loop, no exponential backoff. If the retry also returns empty, answer = None and the orchestrator falls back to the vector result.
Parsing the XML result:
# kg_agent.py
import xml.etree.ElementTree as ET

_NS = "http://www.w3.org/2005/sparql-results#"

def _is_empty_sparql_result(xml_text: str) -> bool:
    """True when the SPARQL XML result set contains zero <result> elements."""
    root = ET.fromstring(xml_text)
    return len(root.findall(f".//{{{_NS}}}result")) == 0

def _parse_sparql_xml_to_list(xml_text: str) -> list[dict]:
    """Convert SPARQL XML rows into [{name: value, ...}, ...] dicts."""
root = ET.fromstring(xml_text)
rows = []
for result in root.findall(f".//{{{_NS}}}result"):
row = {}
for binding in result.findall(f"{{{_NS}}}binding"):
name = binding.get("name")
uri = binding.find(f"{{{_NS}}}uri")
lit = binding.find(f"{{{_NS}}}literal")
node = uri if uri is not None else lit
row[name] = node.text if node is not None else ""
rows.append(row)
    return rows

srv/vector_srv.py

# Vector service
def _get_embeddings() -> OpenAIEmbeddings:
"""Return an OpenAIEmbeddings instance pointing at the AI Core proxy."""
import os
model = os.getenv("EMBEDDING_MODEL_NAME", "text-embedding-3-small")
return OpenAIEmbeddings(proxy_model_name=model) # SAP AI Core proxy
def embed_question(question: str) -> list[float]:
embeddings = _get_embeddings()
    return embeddings.embed_query(question)  # → list of 1536 floats

OpenAIEmbeddings(proxy_model_name=...) from generative-ai-hub-sdk routes through SAP AI Core’s OpenAI-compatible proxy. The return value is a plain Python list of floats — the same type that gets stored in HANA and used at query time.
srv/hdb_srv.py

The vector table does not need to exist before first use. ensure_vector_table() checks SYS.TABLES and creates it only if absent. The embedding dimension is baked into the REAL_VECTOR column at creation time:
# HANA connection service
def ensure_vector_table(conn, dim: int) -> None:
    cursor = conn.cursor()
    # HANA Cloud does not support CREATE TABLE IF NOT EXISTS — check SYS.TABLES first
    cursor.execute(
        "SELECT COUNT(*) FROM SYS.TABLES "
        "WHERE TABLE_NAME = 'MSDS_VECTORS' AND SCHEMA_NAME = CURRENT_SCHEMA"
    )
    if cursor.fetchone()[0] > 0:
        return  # already exists
    ddl = f"""CREATE TABLE MSDS_VECTORS (
        ID NVARCHAR(36) NOT NULL DEFAULT SYSUUID,
        MATERIAL_NUMBER NVARCHAR(40) NOT NULL,
        CHUNK_TEXT NCLOB NOT NULL,
        EMBEDDING REAL_VECTOR({dim}),  -- ← dimension from first embed call
        CHUNK_INDEX INTEGER NOT NULL,
        PRIMARY KEY (ID)
    )"""
    cursor.execute(ddl)
    conn.commit()

text-embedding-3-small produces 1536-dimensional vectors. If you switch to text-embedding-3-large (3072 dimensions), the table is automatically created with the correct dimension on first use. No schema migration needed.
srv/vector_srv.py

# Vector service
def embed_and_store(llm, conn, chunks: list, material_number: str) -> int:
    embeddings = _get_embeddings()
    # Embed first chunk to determine dimension, then create table if needed
    first_vec = embeddings.embed_query(chunks[0].page_content)
    dim = len(first_vec)
    hdb_srv.ensure_vector_table(conn, dim)
    # Batch insert — serialize the float list as "[0.123, -0.456, ...]"
    insert_sql = (
        "INSERT INTO MSDS_VECTORS (MATERIAL_NUMBER, CHUNK_TEXT, EMBEDDING, CHUNK_INDEX) "
        "VALUES (?, ?, TO_REAL_VECTOR(?), ?)"
    )
    vectors = [first_vec] + [embeddings.embed_query(c.page_content) for c in chunks[1:]]
    cursor = conn.cursor()
    for idx, (chunk, vec) in enumerate(zip(chunks, vectors)):
        vec_str = "[" + ",".join(str(v) for v in vec) + "]"
        cursor.execute(insert_sql, (material_number, chunk.page_content, vec_str, idx))
    conn.commit()
    return len(chunks)

TO_REAL_VECTOR(?) is a HANA function that converts the string "[0.123, ...]" into a native REAL_VECTOR value for storage. At search time, the same conversion applies to the query vector.
srv/vector_srv.py

# Vector service
def search_vectors(conn, embedding: list[float], material_number: str,
                   top_k: int = 5) -> list[dict]:
    vec_str = "[" + ",".join(str(v) for v in embedding) + "]"
    sql = (
        "SELECT TOP ? CHUNK_TEXT, "
        "COSINE_SIMILARITY(EMBEDDING, TO_REAL_VECTOR(?)) AS SIM "
        "FROM MSDS_VECTORS "
        "WHERE MATERIAL_NUMBER = ? "  # ← isolate by material
        "ORDER BY SIM DESC"
    )
    cursor = conn.cursor()
    cursor.execute(sql, (top_k, vec_str, material_number))
    return [{"chunk_text": row[0], "similarity_score": float(row[1])}
            for row in cursor.fetchall()]

COSINE_SIMILARITY(EMBEDDING, TO_REAL_VECTOR(?)) is a native HANA function. The WHERE MATERIAL_NUMBER = ? filter is critical — without it, a query for material MAT001 would retrieve chunks from all other ingested documents.
hdbcli connections are not thread-safe for shared use. The KG chain, vector chain, and two ingestion threads run concurrently. Each must have its own connection.
The solution is threading.local():
# HANA connection service
import threading
from hdbcli import dbapi

_local = threading.local()  # one instance per thread, invisible to other threads

def get_hana_connection():
    conn = getattr(_local, 'conn', None)
    if conn is not None:
        try:
            conn.cursor().execute("SELECT 1 FROM DUMMY")  # liveness check
            return conn
        except Exception:
            _local.conn = None  # stale — reconnect
    user, password, address, port = _resolve_credentials()
    conn = dbapi.connect(address=address, port=int(port),
                         user=user, password=password,
                         encrypt=True, sslValidateCertificate=False)
    _local.conn = conn
    return conn

Every thread that calls get_hana_connection() gets its own connection, created lazily on first call. The liveness check (SELECT 1 FROM DUMMY) reconnects automatically if the connection was dropped by HANA’s idle timeout.
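To see why threading.local() gives per-thread isolation, here is a self-contained sketch in which a plain object() stands in for the real dbapi connection (the tracking list and lock exist only for the demo):

```python
import threading

_local = threading.local()
created = []                 # records which threads opened a "connection"
_lock = threading.Lock()

def get_connection():
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = object()      # stand-in for dbapi.connect(...)
        with _lock:
            created.append(threading.current_thread().name)
        _local.conn = conn
    return conn

def worker():
    first = get_connection()
    second = get_connection()
    assert first is second   # same thread → same cached connection

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(created))  # 3 — exactly one connection per thread
```

Each worker thread triggers exactly one connection creation on its first call; repeated calls within the same thread return the cached object, and no thread ever sees another thread's connection.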
agents/orchestrator.py

# orchestrator.py
_TIMEOUT_SECONDS = 30

def run(llm, question: str, graph_name: str, history: list[dict]) -> dict:
    material_number = graph_name.split("/")[-1]  # "MSDS_Graph/MAT001" → "MAT001"
    start_time = time.monotonic()
    with ThreadPoolExecutor(max_workers=2) as executor:
        kg_future = executor.submit(kg_chain.run, llm, question, graph_name, history)
        vec_future = executor.submit(vector_chain.run, llm, question, material_number, history)
        # Wait for KG chain first
        try:
            kg_result = kg_future.result(timeout=_TIMEOUT_SECONDS)
        except FuturesTimeoutError:
            kg_result = {"answer": None, "error": "timeout", ...}
        except Exception as exc:
            kg_result = {"answer": None, "error": str(exc), ...}
        # Give vector chain whatever budget is left in the 30s window
        elapsed = time.monotonic() - start_time
        try:
            # max(1.0, ...) ensures the vector chain always gets at least
            # 1 second, even if the KG chain used 29.5 seconds
            vec_result = vec_future.result(timeout=max(1.0, _TIMEOUT_SECONDS - elapsed))
        except FuturesTimeoutError:
            vec_result = {"answer": None, "error": "timeout", ...}
    return merge_results(llm, question, kg_result, vec_result)

The adaptive timeout max(1.0, 30 - elapsed) is subtle but important. If the KG chain takes 25 seconds, the vector chain still gets 5 seconds (not zero). If the KG chain takes 31 seconds (which triggers a timeout exception), the vector chain still gets max(1.0, 30 - 31) = 1 second minimum. Without the max(1.0, ...), a KG chain that exceeds the budget would also kill the vector chain’s remaining time.
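The timeout arithmetic can be verified with a scaled-down, self-contained sketch. BUDGET and FLOOR here are demo values standing in for the production 30 s budget and 1 s floor, and the two sleep functions stand in for the real chains:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError

BUDGET = 0.5   # scaled-down stand-in for the 30 s production budget
FLOOR = 0.2    # stand-in for the 1.0 s minimum

def slow_kg():            # deliberately exceeds the whole budget
    time.sleep(0.8)
    return "kg"

def fast_vector():
    time.sleep(0.05)
    return "vector"

start = time.monotonic()
with ThreadPoolExecutor(max_workers=2) as pool:
    kg_f = pool.submit(slow_kg)
    vec_f = pool.submit(fast_vector)
    try:
        kg = kg_f.result(timeout=BUDGET)
    except FuturesTimeoutError:
        kg = None             # KG burned the entire budget...
    elapsed = time.monotonic() - start
    # ...but the vector future still gets a guaranteed minimum window
    vec = vec_f.result(timeout=max(FLOOR, BUDGET - elapsed))

print(kg, vec)  # None vector
```

Even though waiting on the KG future consumed the full budget, the floor in max(FLOOR, BUDGET - elapsed) keeps the second result() call from being given a zero or negative timeout.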
# orchestrator.py
def merge_results(llm, question, kg_result, vec_result) -> dict:
    kg_answer = kg_result.get("answer")
    vec_answer = vec_result.get("answer")
    # Case 1: both chains returned an answer → synthesise with LLM
    if kg_answer and vec_answer:
        return {"answer": _synthesise(llm, question, kg_answer, vec_answer),
                "sources": ["kg", "vector"], ...}
    # Case 2: only KG answered (vector failed or returned nothing)
    if kg_answer:
        return {"answer": kg_answer, "sources": ["kg"], ...}
    # Case 3: only vector answered (KG failed or retry also empty)
    if vec_answer:
        return {"answer": vec_answer, "sources": ["vector"], ...}
    # Case 4: both failed → structured error for observability
    return {"answer": None, "sources": [], "error": {
        "kg": kg_result.get("error"),
        "vector": vec_result.get("error"),
    }}

The sources list in the response tells you — and your monitoring dashboard — which chains contributed. ["kg", "vector"] means full hybrid synthesis. ["vector"] means the KG chain failed and you are serving a pure vector answer. This distinction matters for debugging quality regressions.
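The four-way merge reduces to a small decision table. A simplified, self-contained sketch (the LLM synthesis step is replaced by plain string concatenation purely for illustration):

```python
def merge(kg_answer, vec_answer):
    if kg_answer and vec_answer:
        # Production calls _synthesise(llm, ...) here; concatenation stands in
        return {"answer": f"{kg_answer} {vec_answer}", "sources": ["kg", "vector"]}
    if kg_answer:
        return {"answer": kg_answer, "sources": ["kg"]}
    if vec_answer:
        return {"answer": vec_answer, "sources": ["vector"]}
    return {"answer": None, "sources": []}

print(merge("A", "B")["sources"])    # ['kg', 'vector']
print(merge("A", None)["sources"])   # ['kg']
print(merge(None, None)["sources"])  # []
```

Because the branches are checked in order, a truthy KG answer always wins over a vector-only answer, and the empty-sources case is reachable only when both chains failed.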
Note: _synthesise() is distinct from kg_srv.summarize_info(). The latter converts raw SPARQL XML rows into a natural language sentence. _synthesise() takes two already-summarised answers and merges them into one grounded response.
# orchestrator.py
def _synthesise(llm, question, kg_answer, vec_answer) -> str:
    prompt = (
        f"Two sources answered the question: {question}\n"
        f"Knowledge graph: {kg_answer}\n"
        f"Vector search: {vec_answer}\n"
        "Give one direct, concise answer (1-3 sentences, no markdown, no bullet points) "
        "combining only what is factually stated above."  # ← grounding constraint
    )
    response = llm.invoke(prompt)
    return _strip_markdown(response.content)

The phrase “combining only what is factually stated above” is important. It tells the LLM: just combine what you were given, don’t add anything from your own knowledge. Without this, the LLM will fill in details that sound right but aren’t from your documents.
Every /query response includes full retrieval provenance:
{
  "answer": "Chemical-resistant gloves, safety goggles, and a respirator are required. Keep away from heat sources; flash point is 42°C.",
  "sources": ["kg", "vector"],
  "kg_sparql": "SELECT ?s ?p ?o WHERE { GRAPH <MSDS_Graph/MAT001> { ?s ?p ?o . FILTER(...) } }",
  "kg_facts": [{"s": "Acetone", "p": "REQUIRES", "o": "Chemical_Resistant_Gloves"}, ...],
  "vector_chunks": 3,
  "error": null
}

When debugging a wrong answer, check sources: if it says ["vector"] when you expected ["kg", "vector"], the KG chain failed — look at kg_sparql to see what query was generated and why it returned nothing.
Add these five metrics to your structured logs. The JSON log format from srv/log_config.py already includes correlation_id, module, and timestamp — add these fields to key events:
# After merge_results() in orchestrator.py
logger.info("query_complete", extra={
    "correlation_id": correlation_id,
    "sources": merged["sources"],  # ["kg", "vector"] or ["kg"] or ["vector"]
    "elapsed_ms": int((time.monotonic() - start_time) * 1000),
    "material": material_number,
})

Alert if the sources == [] rate exceeds 2%. That means both chains failed on the same query.
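As a sketch of how that alert condition might be computed over a window of query_complete events (the sample events are hypothetical):

```python
# Hypothetical window of recent query_complete events
events = [
    {"sources": ["kg", "vector"]},
    {"sources": ["vector"]},
    {"sources": []},             # both chains failed
    {"sources": ["kg"]},
]

failure_rate = sum(1 for e in events if not e["sources"]) / len(events)
print(failure_rate)         # 0.25
print(failure_rate > 0.02)  # True → alert would fire
```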
# In kg_chain.py, before the retry call
logger.info("kg_retry", extra={
    "correlation_id": correlation_id,
    "material": material_number,
    "retry_reason": "empty_result",
})

Normal: retry rate 10–30%. Higher than 30% means your ontology predicates and your SPARQL prompt vocabulary have drifted apart.
# In vector_srv.py, after search_vectors()
top_score = chunks[0]["similarity_score"] if chunks else 0.0
logger.info("vector_search", extra={
    "material": material_number,
    "top_score": round(top_score, 4),
    "chunks_returned": len(chunks),
})

Alert if top_score < 0.50 on most queries. That means the embedded question does not match anything in the corpus — likely the document was not ingested yet, or was ingested with a different embedding model.
# In kg_chain.py and vector_chain.py
t0 = time.monotonic()
# ... chain logic ...
logger.info("chain_latency", extra={
    "chain": "kg",  # or "vector"
    "elapsed_ms": int((time.monotonic() - t0) * 1000),
})

Normal: KG chain p50 ≈ 1,500–2,500 ms (one LLM call for query generation + SPARQL execution + summarisation). Vector chain p50 ≈ 800–1,500 ms (one embed call + HANA search + summarisation). If KG p95 exceeds 8,000 ms, the SPARQL prompt or graph complexity is the bottleneck.
For production monitoring, a lightweight check: does the answer string contain any word from the retrieved kg_facts?
# In orchestrator.py, after merge_results()
kg_subjects = {f["s"].split("/")[-1].lower() for f in merged.get("kg_facts", [])}
answer_words = set(merged["answer"].lower().split()) if merged["answer"] else set()
is_grounded = bool(kg_subjects & answer_words) or "vector" in merged.get("sources", [])
logger.info("grounding_check", extra={"grounded": is_grounded, "sources": merged["sources"]})

This is a rough heuristic, not a guarantee — but it catches the most obvious cases where the LLM ignored the retrieved context entirely.
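A self-contained run of the heuristic on two made-up answers shows both outcomes (the fact row and answers are illustrative, not from a real query):

```python
facts = [{"s": "MSDS_Graph/Acetone", "p": "REQUIRES", "o": "Gloves"}]
kg_subjects = {f["s"].split("/")[-1].lower() for f in facts}  # {'acetone'}

def is_grounded(answer: str, sources: list) -> bool:
    words = set(answer.lower().split())
    return bool(kg_subjects & words) or "vector" in sources

print(is_grounded("Acetone requires chemical-resistant gloves", ["kg"]))  # True
print(is_grounded("Always wear a lab coat", ["kg"]))                      # False
```

The second answer shares no word with any KG subject and no vector chain contributed, so it is flagged as potentially ungrounded.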
Symptom: KG chain always returns empty results, retry does not help, kg_facts is always [].
Cause: LLMGraphTransformer was run without allowed_relationships, so the stored predicates are arbitrary LLM inventions (requires_ppe, is_classified_under, mandates_use_of) that no SPARQL pattern will ever match.
Fix: Drop the graph (DROP GRAPH <MSDS_Graph/MAT001>), re-ingest with ontology constraints enabled. There is no way to patch stored predicates — the data must be re-extracted.
# Check what predicates are actually in the graph:
sparql = """
SELECT DISTINCT ?p WHERE {
  GRAPH <MSDS_Graph/MAT001> { ?s ?p ?o }
}
"""
# If this returns things like "requires_ppe", "is_classified_under" → re-ingest needed

Symptom: Intermittent Connection is closed or cursor is closed errors. Always happens when load increases (two simultaneous queries, or a query while an ingestion is running).
Cause: A connection object was created on the main thread and passed to worker threads. hdbcli connections are not thread-safe.
Fix: Each thread must call hdb_srv.get_hana_connection() independently. Never pass a connection across thread boundaries. The threading.local() pattern in hdb_srv.py ensures this automatically — but only if all code goes through get_hana_connection() rather than holding a module-level conn variable.
# ❌ Wrong — module-level connection shared by all threads
conn = dbapi.connect(...)

def kg_chain_run(...):
    cursor = conn.cursor()  # another thread might be using conn right now

# ✅ Correct — each thread gets its own connection
def kg_chain_run(...):
    conn = hdb_srv.get_hana_connection()  # thread-local, always safe
    cursor = conn.cursor()

Note: The history parameter is accepted by the chains but not yet integrated into query generation (see TODO in kg_chain.py). When you implement multi-turn context, watch for this pitfall.
Symptom: LLM latency increases with conversation length. Eventually the LLM starts ignoring the question and summarising the conversation history instead.
Cause: The history parameter is passed to the KG and vector chains but not truncated. After 20+ turns it can exceed the LLM context window or push the actual question out of the attention window.
Fix: Truncate history to the last N turns before passing it to the chains:
# In orchestrator.py before submitting to the thread pool
MAX_HISTORY_TURNS = 10
trimmed_history = history[-MAX_HISTORY_TURNS:] if history else []
kg_future = executor.submit(kg_chain.run, llm, question, graph_name, trimmed_history)
vec_future = executor.submit(vector_chain.run, llm, question, material_number, trimmed_history)

In the current production code, history is accepted as a parameter but not yet threaded into the summarisation prompts — see the TODO comment in kg_chain.py. When you add multi-turn context, always add the truncation at the same time.
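One reason the negative slice is the right tool: it never raises, even when the history is shorter than the window. A quick sanity sketch with made-up turns:

```python
MAX_HISTORY_TURNS = 10
long_history = [{"role": "user", "content": f"turn {i}"} for i in range(25)]
short_history = [{"role": "user", "content": "hi"}]

# Negative slicing is safe for any length — no bounds checking needed
print(len(long_history[-MAX_HISTORY_TURNS:]))           # 10
print(long_history[-MAX_HISTORY_TURNS:][0]["content"])  # turn 15
print(len(short_history[-MAX_HISTORY_TURNS:]))          # 1
```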
Here is the complete module map for the RAG pipeline:
The recommended project layout:
your-project/
├── ontology.ttl          ← vocabulary contract between ingestion and retrieval
├── main.py               ← FastAPI: /process-upload, /query endpoints
│
├── agents/
│   ├── orchestrator.py   ← supervisor: parallel dispatch + merge results
│   ├── kg_agent.py       ← KG agent: generate SPARQL → execute → retry → summarize
│   └── vector_agent.py   ← vector agent: embed → cosine search → summarize
│
└── services/
    ├── document_srv.py   ← ingestion: PDF → chunks → Thread A + Thread B
    ├── kg_srv.py         ← SPARQL generation + summarization
    ├── vector_srv.py     ← embedding + vector search + summarization
    ├── hana_srv.py       ← HANA connections (threading.local), SPARQL_EXECUTE
    └── llm_srv.py        ← LLM initialization (cached, thread-safe)