If you've worked with LLM-powered agents, whether they handle support tickets, automate procurement, or assist with SAP operations, you've hit this wall: every conversation starts from scratch.
A user tells the agent they're running SAP S/4HANA Cloud 2023. They mention their month-end close keeps timing out. They say they prefer getting troubleshooting steps over email. Next session? The agent has no idea. The user repeats themselves. The experience feels broken.
This isn't a prompt engineering problem. It's an architecture problem. LLMs don't have persistent memory. LangGraph gives us powerful orchestration, but out of the box, each graph invocation is stateless. Conversation history lives only within a single session.
For SAP enterprise workflows, this is a dealbreaker. Users interact with agents over days and weeks. They build context. They expect the agent to remember — just like a human colleague would.
agent-memory-layer is a reusable Python library that gives LangGraph agents persistent memory across sessions. It works like this:
The library plugs directly into any LangGraph application as two nodes: inject_memory at the start and record_memory at the end.
inject_memory → your_agent → record_memory → END
      │                            │
      ▼                            ▼
 Reads facts               Saves conversation
 from PostgreSQL           to PostgreSQL
Let me walk you through the complete lifecycle, from recording a conversation to injecting memories in the next session. This is where the engineering decisions matter.
┌─────────────────────────────────────────────┐
│ Stage 1: INJECT │
│ Load relevant facts into agent state │
├─────────────────────────────────────────────┤
│ Stage 2: RECORD │
│ Save raw conversation as an episode │
├─────────────────────────────────────────────┤
│ Stage 3: CONSOLIDATE │
│ LLM extracts reusable facts from episodes │
├─────────────────────────────────────────────┤
│ Stage 4: DECAY │
│ Gradually reduce importance of old facts │
└─────────────────────────────────────────────┘
Let's trace through each stage with the actual code flow.
When a LangGraph session starts, the inject_node runs first. It reads the scope (user_id, agent_id, workflow_id) from the state, queries PostgreSQL for the most important facts, and adds them to the state as a WorkingMemoryFrame.
async def inject_node(state):
    user_id = state.get("user_id")
    agent_id = state.get("agent_id")
    workflow_id = state.get("workflow_id")
    # fetch top facts by importance (default: min 0.1, max 20 facts)
    items = await memory_repo.fetch_by_scope(
        user_id=user_id,
        agent_id=agent_id,
        workflow_id=workflow_id,
        min_importance=0.1,
        limit=20,
    )
    frame = WorkingMemoryFrame(
        user_id=user_id,
        agent_id=agent_id,
        workflow_id=workflow_id,
        injected_facts=[item.content for item in items],
    )
    return {**state, "working_memory": frame.model_dump()}
The SQL behind fetch_by_scope orders facts by importance descending:
SELECT * FROM sap_agent_memory.memory_items
WHERE user_id IS NOT DISTINCT FROM $1
AND agent_id IS NOT DISTINCT FROM $2
AND workflow_id IS NOT DISTINCT FROM $3
AND importance >= $4
ORDER BY importance DESC
LIMIT $5
The IS NOT DISTINCT FROM operator is critical here: it handles NULL scope fields correctly, which the standard = operator does not (NULL = NULL evaluates to NULL, not TRUE, so the row is filtered out).
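You can see the difference without a PostgreSQL instance. The sketch below uses Python's built-in sqlite3 module, whose IS operator has the same null-safe semantics as PostgreSQL's IS NOT DISTINCT FROM. The two-column table and the count_in_scope helper are simplified, hypothetical stand-ins for sap_agent_memory.memory_items and the real repository.

```python
import sqlite3

# Simplified, hypothetical stand-in for sap_agent_memory.memory_items.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE memory_items (user_id TEXT, agent_id TEXT, content TEXT)")
conn.executemany(
    "INSERT INTO memory_items VALUES (?, ?, ?)",
    [
        ("alice", None, "Prefers troubleshooting steps over email."),  # per-user fact, no agent
        ("alice", "sap-agent", "Runs SAP S/4HANA Cloud 2023."),
    ],
)


def count_in_scope(op: str, user_id, agent_id) -> int:
    # op is one of two fixed strings used below, never user input.
    sql = f"SELECT COUNT(*) FROM memory_items WHERE user_id {op} ? AND agent_id {op} ?"
    return conn.execute(sql, (user_id, agent_id)).fetchone()[0]


# Look up the per-user scope (agent_id is NULL).
plain_eq = count_in_scope("=", "alice", None)    # agent_id = NULL is never true
null_safe = count_in_scope("IS", "alice", None)  # NULL IS NULL is true
print(plain_eq, null_safe)  # 0 1
```

With plain =, the lookup for the per-user scope silently returns nothing; the null-safe comparison finds the one fact stored with agent_id set to NULL.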
Your agent node can then use these injected facts in its system prompt:
async def your_agent_node(state):
    facts = state.get("working_memory", {}).get("injected_facts", [])
    system = "You are a helpful assistant."
    if facts:
        system += "\n\nYou know these facts about the user:\n"
        system += "\n".join(f"- {f}" for f in facts)
    # call LLM with enriched system prompt...
After the agent responds, the record_node saves the entire conversation as an EpisodeRecord:
async def record_node(state):
    messages = state.get("messages", [])
    if not messages:
        return state
    episode = EpisodeRecord(
        user_id=state.get("user_id"),
        agent_id=state.get("agent_id"),
        workflow_id=state.get("workflow_id"),
        messages=messages,    # raw conversation
        consolidated=False,   # not yet processed
    )
    await episode_repo.save(episode)
    return state
The episode is stored with consolidated=False. It sits in the database waiting for the consolidation pipeline to process it.
The database uses a partial index on unconsolidated episodes for fast lookups:
CREATE INDEX idx_episodes_unconsolidated
ON sap_agent_memory.episodes (consolidated)
WHERE consolidated = FALSE;
This index only covers rows where consolidated=FALSE, making it much smaller and faster than a full index.
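Here is a self-contained sketch of what recording and draining episodes could look like, again using sqlite3 (which also supports partial indexes) as a stand-in for PostgreSQL. The table is simplified, and save_episode, get_unconsolidated and mark_consolidated are hypothetical helpers named after the repository calls in this article, not the library's code.

```python
import json
import sqlite3

# Simplified, hypothetical stand-in for sap_agent_memory.episodes.
# SQLite has no JSONB, so messages are stored as JSON text.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE episodes (
           id INTEGER PRIMARY KEY,
           user_id TEXT,
           messages TEXT NOT NULL,
           consolidated INTEGER NOT NULL DEFAULT 0
       )"""
)
# Partial index: only rows still waiting for consolidation are indexed.
conn.execute(
    "CREATE INDEX idx_episodes_unconsolidated "
    "ON episodes (consolidated) WHERE consolidated = 0"
)


def save_episode(user_id, messages):
    conn.execute(
        "INSERT INTO episodes (user_id, messages) VALUES (?, ?)",
        (user_id, json.dumps(messages)),
    )


def get_unconsolidated(user_id):
    rows = conn.execute(
        "SELECT id, messages FROM episodes "
        "WHERE consolidated = 0 AND user_id IS ? ORDER BY id",
        (user_id,),
    ).fetchall()
    return [(row_id, json.loads(raw)) for row_id, raw in rows]


def mark_consolidated(ids):
    conn.executemany("UPDATE episodes SET consolidated = 1 WHERE id = ?", [(i,) for i in ids])


save_episode("alice", [{"type": "human", "content": "Month-end close keeps timing out."}])
save_episode("alice", [{"type": "human", "content": "Please send the steps by email."}])
pending = get_unconsolidated("alice")
mark_consolidated([row_id for row_id, _ in pending])
print(len(pending), len(get_unconsolidated("alice")))  # 2 0

# The backlog query can be answered from the partial index alone.
plan = conn.execute("EXPLAIN QUERY PLAN SELECT id FROM episodes WHERE consolidated = 0").fetchall()
uses_partial_index = any("idx_episodes_unconsolidated" in row[-1] for row in plan)
print(uses_partial_index)
```

After mark_consolidated, the rows drop out of the partial index, so its size tracks the pending backlog rather than the full conversation history.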
Stage 3, consolidation, is the core intelligence of the system. A background scheduler runs the consolidation pipeline periodically (default: every hour). Here's the step-by-step flow:
Step 1: Load unconsolidated episodes:
episodes = await episode_repo.get_unconsolidated(
    user_id=user_id,
    agent_id=agent_id,
    workflow_id=workflow_id,
)
Step 2: Idempotency check via SHA-256 checksum:
import hashlib
import json

checksum = hashlib.sha256(
    json.dumps([e.messages for e in episodes], sort_keys=True).encode()
).hexdigest()
last_job = await job_repo.get_last_for_scope(...)
if last_job and last_job.content_checksum == checksum:
    return last_job  # nothing new: skip the LLM call
This is a critical optimization. If the same set of episodes was already processed, the pipeline skips the LLM call entirely. No wasted tokens, no duplicate facts.
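The check can be isolated into two small functions for testing. episodes_checksum and should_call_llm are hypothetical names; the hashing is the same json.dumps(..., sort_keys=True) plus SHA-256 combination shown above. The example also surfaces one subtlety: sort_keys normalizes key order inside each message, but not the order of episodes in the list.

```python
import hashlib
import json


def episodes_checksum(episode_messages: list) -> str:
    # sort_keys makes dict key order irrelevant; list order still matters.
    payload = json.dumps(episode_messages, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


def should_call_llm(episode_messages: list, last_checksum) -> bool:
    return episodes_checksum(episode_messages) != last_checksum


batch = [
    [{"type": "human", "content": "We run S/4HANA Cloud 2023."}],
    [{"content": "Month-end close times out.", "type": "human"}],
]
same_batch_reordered_keys = [
    [{"content": "We run S/4HANA Cloud 2023.", "type": "human"}],
    [{"type": "human", "content": "Month-end close times out."}],
]
first = episodes_checksum(batch)
print(should_call_llm(same_batch_reordered_keys, first))  # False: identical content
print(should_call_llm(list(reversed(batch)), first))      # True: episode order changed
```

The second call returns True even though the content is identical, so get_unconsolidated should return episodes in a stable order (for example by created_at, then id) for the checksum to be reliable.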
Step 3: Format episodes and call the LLM:
The episodes are formatted into a readable text block:
------Episode 1 ---------
HUMAN: Hi! I'm a Python developer working on SAP integrations.
AI: That sounds great. PostgreSQL is a robust choice for your projects.
------Episode 2 ---------
HUMAN: What language should I use for my next REST API?
AI: Given your Python expertise, Flask or FastAPI would be excellent choices.
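A formatter that produces this kind of block might look like the sketch below. It assumes each stored message is a dict with "type" ("human" or "ai") and "content" keys; LangChain message objects expose the same information as .type and .content. The function name and the exact separator line are illustrative, not the library's.

```python
def format_episodes(episodes: list) -> str:
    """Render episodes as a plain-text transcript for the extraction prompt.

    Assumes each message is a dict with "type" ("human"/"ai") and "content".
    """
    blocks = []
    for n, messages in enumerate(episodes, start=1):
        lines = [f"--- Episode {n} ---"]
        for msg in messages:
            lines.append(f"{msg['type'].upper()}: {msg['content']}")
        blocks.append("\n".join(lines))
    return "\n".join(blocks)


text = format_episodes([
    [{"type": "human", "content": "Hi! I'm a Python developer working on SAP integrations."}],
    [{"type": "human", "content": "What language should I use for my next REST API?"}],
])
print(text)
```

Numbering episodes and labelling speakers gives the extraction model clear boundaries, so it can attribute a statement to the user rather than to the assistant.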
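This is sent to SAP AI Core with a carefully crafted system prompt that instructs the LLM to:
- return each fact as an object with content and importance (0.0–1.0) fields
- return [] if no useful facts exist
# LLM, OrchestrationConfig and OrchestrationService come from SAP's generative AI hub SDK;
# template (not shown) is assumed to carry the extraction prompt and the formatted episodes.
llm = LLM(name="gpt-4o", parameters={"temperature": 0.0, "max_tokens": 2048})
service = OrchestrationService(
    config=OrchestrationConfig(llm=llm, template=template)
)
response = service.run()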
Step 4: Parse and store facts:
The LLM returns something like:
[
{"content": "User is a Python developer working on SAP integrations.", "importance": 0.85},
{"content": "User prefers async code and PostgreSQL.", "importance": 0.75}
]
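Models sometimes wrap JSON in a Markdown code fence or return out-of-range scores, so parsing deserves some defensive checks. The parse_facts helper below is a hypothetical sketch, not the library's parser: it strips an optional fence, requires a JSON array, drops entries without content, and clamps importance into [0.0, 1.0]. Unparseable output raises instead of returning [], so the job fails and the episodes stay unconsolidated rather than being marked as processed with no facts.

```python
import json
import re

# Matches an optional leading Markdown fence (three backticks, optionally
# tagged json) and a trailing fence.
_FENCE = re.compile(r"^`{3}(?:json)?\s*|\s*`{3}$")


def parse_facts(raw: str) -> list:
    """Turn the model's reply into validated {"content", "importance"} dicts.

    Invalid JSON raises json.JSONDecodeError on purpose, so the job fails and
    the episodes stay unconsolidated for the next run.
    """
    data = json.loads(_FENCE.sub("", raw.strip()))
    if not isinstance(data, list):
        raise ValueError("expected a JSON array of facts")
    facts = []
    for entry in data:
        if not isinstance(entry, dict):
            continue
        content = str(entry.get("content", "")).strip()
        try:
            importance = float(entry.get("importance", 0.0))
        except (TypeError, ValueError):
            continue
        if content:
            facts.append({"content": content, "importance": min(max(importance, 0.0), 1.0)})
    return facts


fence = "`" * 3
reply = fence + "json\n" + json.dumps([
    {"content": "User is a Python developer.", "importance": 0.85},
    {"content": "", "importance": 0.5},
    {"content": "Prefers email.", "importance": 1.7},
]) + "\n" + fence
facts = parse_facts(reply)
print(facts)
```

The empty entry is dropped and the out-of-range score of 1.7 is clamped to 1.0.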
Each fact becomes a MemoryItem and is upserted into PostgreSQL:
for fact in facts:
    item = MemoryItem(
        user_id=user_id,
        agent_id=agent_id,
        workflow_id=workflow_id,
        content=fact["content"],
        importance=fact["importance"],
    )
    await memory_repo.upsert(item)
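The article doesn't say what upsert keys on. One plausible design, sketched here with a plain dict and hypothetical names, treats a fact as the same fact when its scope and normalized text match, and keeps the higher importance when it is extracted again, so a fact that keeps coming up is refreshed against decay.

```python
from dataclasses import dataclass


@dataclass
class StoredFact:
    content: str
    importance: float


def normalize(content: str) -> str:
    # Case- and whitespace-insensitive key (an assumption; real dedup may differ).
    return " ".join(content.lower().split())


def upsert(store: dict, scope: tuple, content: str, importance: float) -> None:
    """Insert a fact, or refresh it if the same scope already holds it."""
    key = (scope, normalize(content))
    existing = store.get(key)
    if existing is None:
        store[key] = StoredFact(content, importance)
    else:
        existing.importance = max(existing.importance, importance)


store = {}
alice = ("alice", "sap-agent", None)  # (user_id, agent_id, workflow_id)
upsert(store, alice, "User prefers email.", 0.6)
upsert(store, alice, "user prefers  EMAIL.", 0.8)  # same fact, re-extracted
upsert(store, ("bob", "sap-agent", None), "User prefers email.", 0.5)
print(len(store), store[(alice, "user prefers email.")].importance)  # 2 0.8
```

Bob's identical sentence stays a separate fact because the scope is part of the key. Exact-text matching misses paraphrases; catching those would need something like embedding similarity, which this sketch does not attempt.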
Step 5: Mark episodes as consolidated and update the job record:
await episode_repo.mark_consolidated([e.id for e in episodes])
await job_repo.update_status(
    job.id,
    status="completed",
    facts_created=len(facts),
    content_checksum=checksum,
)
The entire pipeline is wrapped in error handling. If anything fails, the job is marked as failed with the error message, and the episodes remain unconsolidated for the next run.
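The failure path is easy to get wrong, so here is a minimal in-memory sketch of it. Episode, Job and consolidate are hypothetical stand-ins for the library's records and pipeline, and extract_facts represents the LLM call plus parsing. In a real database, storing the facts and marking the episodes should happen in one transaction, so a crash between the two cannot leave facts stored for episodes that still look unprocessed.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Episode:
    id: int
    messages: list
    consolidated: bool = False


@dataclass
class Job:
    status: str = "running"
    facts_created: int = 0
    error: Optional[str] = None


def consolidate(episodes: List[Episode],
                extract_facts: Callable[[List[Episode]], List[dict]],
                fact_store: List[dict]) -> Job:
    """Record failures on the job; only mark episodes once facts are stored."""
    job = Job()
    pending = [e for e in episodes if not e.consolidated]
    try:
        facts = extract_facts(pending)  # LLM call + parsing; may raise
        # In PostgreSQL, these two writes belong in one transaction.
        fact_store.extend(facts)
        for e in pending:
            e.consolidated = True
        job.status, job.facts_created = "completed", len(facts)
    except Exception as exc:
        job.status, job.error = "failed", str(exc)  # episodes stay pending
    return job


def flaky_llm(_episodes):
    raise TimeoutError("AI Core request timed out")


episodes = [Episode(1, [{"type": "human", "content": "Close keeps timing out."}])]
store: List[dict] = []

failed = consolidate(episodes, flaky_llm, store)
print(failed.status, episodes[0].consolidated)  # failed False

ok = consolidate(episodes, lambda eps: [{"content": "Month-end close times out.", "importance": 0.8}], store)
print(ok.status, ok.facts_created, episodes[0].consolidated)  # completed 1 True
```

The first run fails without touching the episode, so the second, successful run picks it up again.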
Not all memories should live forever. A daily background job decays importance scores:
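-- phase 1: reduce importance of every fact in the scope
UPDATE sap_agent_memory.memory_items
SET importance = importance * 0.95
WHERE user_id IS NOT DISTINCT FROM $1
  AND agent_id IS NOT DISTINCT FROM $2
  AND workflow_id IS NOT DISTINCT FROM $3;
-- phase 2: delete memories that fell below the threshold
DELETE FROM sap_agent_memory.memory_items
WHERE importance < 0.05
  AND user_id IS NOT DISTINCT FROM $1
  AND agent_id IS NOT DISTINCT FROM $2
  AND workflow_id IS NOT DISTINCT FROM $3;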
With a decay factor of 0.95 per day:
| Day | Importance |
|---|---|
| 0 | 0.85 |
| 10 | 0.51 |
| 30 | 0.18 |
| 50 | 0.065 |
| 55 | 0.051 |
| 56 | 0.048 → deleted |
This means a fact with importance 0.85 survives roughly 55 days before being cleaned up. High-importance facts (0.9+) live longer. Low-importance facts fade quickly. The system self-cleans without manual intervention.
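The table can be reproduced with a short simulation. days_until_deleted is a hypothetical helper that assumes every in-scope fact is multiplied by the decay factor once per day and deleted as soon as it drops below 0.05.

```python
def days_until_deleted(importance: float, factor: float = 0.95, threshold: float = 0.05) -> int:
    """Count daily decay runs until a fact drops below the delete threshold.

    Assumes each run multiplies importance by `factor` and then deletes
    anything below `threshold`.
    """
    if not 0.0 < factor < 1.0:
        raise ValueError("factor must be in (0, 1), or the loop never ends")
    days = 0
    while importance >= threshold:
        importance *= factor
        days += 1
    return days


for start in (0.9, 0.85, 0.5, 0.3):
    print(start, days_until_deleted(start))
```

It prints 57, 56, 45 and 35 days for starting scores of 0.9, 0.85, 0.5 and 0.3. A fact at 0.85 is last seen on day 55 and removed by the run on day 56.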
Memory is isolated by scope: a combination of user_id, agent_id, and workflow_id. All three are optional. This gives you flexible isolation:
| Use Case | Scope |
|---|---|
| Per-user memory | user_id only |
| Per-agent memory | agent_id only |
| Per user+agent pair | user_id + agent_id |
| Fully isolated runs | All three fields |
Different users talking to the same agent get their own memory. Different agents for the same user can share or isolate — you decide.
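Because the fetch query matches every scope column with IS NOT DISTINCT FROM, scopes are exact: a None field only matches facts stored with that field as NULL. The in-memory fetch_by_scope below is a hypothetical mirror of that SQL (exact scope match, importance threshold, ranking, limit) that makes the consequence visible.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Fact:
    user_id: Optional[str]
    agent_id: Optional[str]
    workflow_id: Optional[str]
    content: str
    importance: float


def fetch_by_scope(facts, user_id=None, agent_id=None, workflow_id=None,
                   min_importance=0.1, limit=20):
    """Exact scope match (None only matches None), ranked by importance."""
    scope = (user_id, agent_id, workflow_id)
    hits = [f for f in facts
            if (f.user_id, f.agent_id, f.workflow_id) == scope
            and f.importance >= min_importance]
    return sorted(hits, key=lambda f: f.importance, reverse=True)[:limit]


facts = [
    Fact("alice", None, None, "Prefers email for troubleshooting steps.", 0.7),
    Fact("alice", "sap-agent", None, "Runs SAP S/4HANA Cloud 2023.", 0.9),
    Fact("bob", "sap-agent", None, "Closes the books on working day 3.", 0.8),
]
print([f.content for f in fetch_by_scope(facts, user_id="alice", agent_id="sap-agent")])
print([f.content for f in fetch_by_scope(facts, user_id="alice")])
```

A user+agent lookup does not see facts stored at the user-only scope, and vice versa. Assuming the library queries only the exact scope, as the SQL above suggests, facts meant to follow a user across agents need to be written at, and read from, the user-only scope.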
pip install "agent-memory-layer[langgraph]"
MemoryManager is the single entry point that handles everything: database connection, schema migration, node creation, and background scheduling:
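import asyncio
from typing import Optional, TypedDict
from langgraph.graph import END, StateGraph
from agent_memory_layer import MemoryManager

# state keys read and written by the memory nodes
class AgentState(TypedDict, total=False):
    messages: list
    user_id: Optional[str]
    agent_id: Optional[str]
    workflow_id: Optional[str]
    working_memory: dict

async def main():
    manager = await MemoryManager.create(
        db_url="postgresql://user:pass@host/db",
        aicore_client_id="...",
        aicore_client_secret="...",
        aicore_auth_url="...",
        aicore_base_url="...",
    )
    # wire into your LangGraph (your_agent_node as defined earlier)
    graph = StateGraph(AgentState)
    graph.add_node("inject_memory", manager.inject_node)
    graph.add_node("agent", your_agent_node)
    graph.add_node("record_memory", manager.record_node)
    graph.set_entry_point("inject_memory")
    graph.add_edge("inject_memory", "agent")
    graph.add_edge("agent", "record_memory")
    graph.add_edge("record_memory", END)
    app = graph.compile()  # run sessions with await app.ainvoke({...})
    # when done
    await manager.close()

asyncio.run(main())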
That's it. Three nodes. Persistent memory across sessions.
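When you call MemoryManager.create(), it:
- opens the PostgreSQL connection pool
- runs the schema migration
- starts the background scheduler for consolidation and decay
- creates the inject_node and record_node functions
When you call manager.close(), it stops the scheduler and closes the pool.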
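The article doesn't show how the background scheduler is built. A common asyncio pattern is sketched below; PeriodicJob and its start/stop methods are hypothetical, and the article's defaults would correspond to intervals of 3600 s for consolidation and 86400 s for decay rather than the tiny interval used in the demo.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class PeriodicJob:
    """Hypothetical scheduler: run `job` every `interval_s` seconds until stop()."""

    def __init__(self, interval_s: float, job: Callable[[], Awaitable[None]]):
        self.interval_s = interval_s
        self.job = job
        self._task: Optional[asyncio.Task] = None

    async def _loop(self) -> None:
        while True:
            await asyncio.sleep(self.interval_s)
            try:
                await self.job()
            except Exception as exc:
                # A failed run must not kill the loop; the next tick retries.
                print(f"scheduled job failed: {exc!r}")

    def start(self) -> None:
        if self._task is None:
            self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo():
    calls = []

    async def consolidate_all() -> None:
        calls.append("consolidate")

    scheduler = PeriodicJob(interval_s=0.01, job=consolidate_all)
    scheduler.start()
    await asyncio.sleep(0.1)
    await scheduler.stop()
    after_stop = len(calls)
    await asyncio.sleep(0.05)  # no further runs once stopped
    return after_stop, len(calls)


before, later = asyncio.run(demo())
print(before >= 1, later == before)
```

stop() cancels the task and awaits it, which is what a close() method needs: once it returns, no scheduled job can touch a connection pool that is about to be closed.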
Three tables power the system:
┌──────────────────────────────────────────────────────┐
│ episodes │
│ id, user_id, agent_id, workflow_id, │
│ messages (JSONB), created_at, consolidated (bool) │
├──────────────────────────────────────────────────────┤
│ memory_items │
│ id, user_id, agent_id, workflow_id, │
│ content, importance, memory_types, │
│ created_at, updated_at │
├──────────────────────────────────────────────────────┤
│ consolidation_jobs │
│ id, scope_*, status, episodes_processed, │
│ facts_created, content_checksum, │
│ started_at, finished_at, error │
└──────────────────────────────────────────────────────┘
Key design decisions:
- JSONB for messages: flexible schema, no rigid column structure for conversation data
- Partial index on unconsolidated episodes: only indexes consolidated=FALSE rows for fast consolidation queries
- Importance index on memory_items: enables efficient decay operations
- Content checksum on jobs: SHA-256 idempotency prevents duplicate LLM calls
The library includes a self-contained E2E test that builds a mini LangGraph, runs two conversation turns, and proves memories carry across:
=== agent-memory-layer E2E Test ===
[1/8] Starting MemoryManager... ✓ OK
[2/8] Building mini LangGraph... ✓ OK
[3/8] Turn 1: "I'm a Python developer..."
→ Injected facts: 0 (first turn, empty)
→ AI responds ✓ OK
[4/8] Consolidating episodes...
→ Facts created: 5 ✓ OK
[5/8] Turn 2: "What language for my project?"
→ Injected facts: 5 (memories from Turn 1!)
→ AI responds with context ✓ OK
[6/8] Verifying memory injection... ✓ OK
[7/8] Testing decay...
→ Before: [0.90, 0.80, 0.70, 0.70, 0.60]
→ After: [0.72, 0.64, 0.56, 0.56, 0.48] ✓ OK
[8/8] Closing MemoryManager... ✓ OK
Turn 1 has zero injected facts: the agent starts with a blank slate. After consolidation extracts facts, Turn 2 receives all five facts from Turn 1, so the agent now has context. The decay step confirms that importance scores decrease as expected (in this run each score is multiplied by 0.8).
Enterprise AI agents aren't chatbots. They're tools people use daily for complex, ongoing tasks.
Without persistent memory, every interaction resets. With agent-memory-layer, agents build understanding over time, just like the human experts they assist.
The library is open source, available on PyPI (pip install agent-memory-layer), and designed to plug into any LangGraph application with minimal setup. It uses SAP AI Core for LLM calls and PostgreSQL for storage, infrastructure most SAP teams already have.
pip install "agent-memory-layer[langgraph]"
Check out the GitHub repository for full documentation, configuration reference, and the E2E test script.