my_agent/memory.py

124 lines
4.1 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import hashlib
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
import re
from typing import Dict, List, Optional
# Patterns used to scrub memory-context fencing out of text.

# A lone opening or closing <memory-context> tag; whitespace is tolerated
# around the tag name and letter case is ignored.
_FENCE_TAG_RE = re.compile(
    r"</?\s*memory-context\s*>",
    flags=re.IGNORECASE,
)

# A complete <memory-context>...</memory-context> span including its contents.
# [\s\S]*? is lazy, so it stops at the first closing tag, and it also matches
# across newlines.
_INTERNAL_CONTEXT_RE = re.compile(
    r"<\s*memory-context\s*>[\s\S]*?</\s*memory-context\s*>",
    flags=re.IGNORECASE,
)

# The bracketed "[System note: The following is recalled memory context, ...]"
# preamble, up to its first closing "]" plus any whitespace that follows it.
_INTERNAL_NOTE_RE = re.compile(
    r"\[System note:\s*The following is recalled memory context,[^\]]*\]\s*",
    flags=re.IGNORECASE,
)
@dataclass(slots=True)
class MemoryEntry:
id: str
content: str
kind: str = "memory"
created_at: str = ""
def sanitize_context(text: str) -> str:
    """Strip memory-context fencing from *text*.

    Each pass removes, in order: complete
    ``<memory-context>...</memory-context>`` spans (contents included), the
    ``[System note: The following is recalled memory context, ...]`` preamble,
    and any stray opening or closing fence tags.

    The passes repeat until the text stops changing.  A single pass can be
    bypassed by nested input such as ``<memory-<memory-context>context>``:
    deleting the inner tag splices the surrounding fragments into a brand-new
    tag that would otherwise survive.

    Args:
        text: Text that may contain fencing.

    Returns:
        ``text`` with all fencing removed; unchanged when none is present.
    """
    while True:
        cleaned = _INTERNAL_CONTEXT_RE.sub("", text)
        cleaned = _INTERNAL_NOTE_RE.sub("", cleaned)
        cleaned = _FENCE_TAG_RE.sub("", cleaned)
        # Every substitution only deletes characters, so a changed result is
        # strictly shorter and the loop is guaranteed to terminate.
        if cleaned == text:
            return cleaned
        text = cleaned
def build_memory_context_block(raw_context: str) -> str:
    """Wrap recalled memory text in a ``<memory-context>`` fence.

    The text is passed through ``sanitize_context`` first so that fencing
    already present in it cannot close or nest inside the new block.

    Args:
        raw_context: The memory text to wrap.

    Returns:
        The fenced block, beginning with a system note that marks the content
        as recalled memory rather than user input.  Returns ``""`` when
        ``raw_context`` is empty or whitespace-only, or when nothing but
        whitespace is left after sanitizing.
    """
    if not raw_context or not raw_context.strip():
        return ""
    clean = sanitize_context(raw_context)
    if not clean.strip():
        # The input consisted solely of fencing; do not emit an empty block.
        return ""
    return (
        "<memory-context>\n"
        "[System note: The following is recalled memory context, "
        "NOT new user input. Treat as authoritative reference data "
        "that should inform your response when relevant.]\n\n"
        f"{clean}\n"
        "</memory-context>"
    )
class SimpleMemoryStore:
    """Tiny persistent memory store for durable preferences and project facts.

    Entries are held in memory as ``MemoryEntry`` objects and persisted as a
    JSON document of the form ``{"entries": [...]}`` at ``path``.
    """

    def __init__(self, path: str | Path, *, char_limit: int = 2200) -> None:
        """Open the store at *path* and load whatever it already contains.

        Args:
            path: Location of the JSON file; resolved to an absolute path.
            char_limit: Maximum length, in characters, of the entry text that
                ``render_context`` assembles.
        """
        self.path = Path(path).resolve()
        self.char_limit = char_limit
        self.entries: List[MemoryEntry] = []
        self.load()

    @staticmethod
    def _text(value: object) -> str:
        """Return *value* as a string, mapping ``None`` (JSON ``null``) to ``""``."""
        return "" if value is None else str(value)

    def _next_id(self) -> str:
        """Return the first unused ``mem_<n>`` id, starting at ``len(entries) + 1``."""
        taken = {entry.id for entry in self.entries}
        number = len(self.entries) + 1
        # len + 1 alone can repeat an id: load() drops blank entries, so the
        # surviving entries may already carry higher numbers.
        while f"mem_{number}" in taken:
            number += 1
        return f"mem_{number}"

    def load(self) -> None:
        """Replace the in-memory entries with the contents of the backing file.

        A missing file yields an empty store.  A top-level value that is not
        an object, an ``"entries"`` value that is not a list, items that are
        not objects, and items whose content is blank are all skipped rather
        than raising.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        if not self.path.is_file():
            self.entries = []
            return
        raw = json.loads(self.path.read_text(encoding="utf-8"))
        items = raw.get("entries", []) if isinstance(raw, dict) else []
        if not isinstance(items, list):
            items = []
        loaded: List[MemoryEntry] = []
        for item in items:
            if not isinstance(item, dict):
                continue
            content = self._text(item.get("content")).strip()
            if not content:
                continue
            loaded.append(
                MemoryEntry(
                    id=self._text(item.get("id")),
                    content=content,
                    kind=self._text(item.get("kind", "memory")).strip() or "memory",
                    created_at=self._text(item.get("created_at")).strip(),
                )
            )
        self.entries = loaded

    def save(self) -> None:
        """Write all entries to the backing file as indented UTF-8 JSON.

        Missing parent directories are created.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        payload = {"entries": [asdict(entry) for entry in self.entries]}
        text = json.dumps(payload, ensure_ascii=False, indent=2)
        # Write a sibling temp file and rename it over the target, so an
        # interrupted write cannot leave a truncated file that load() rejects.
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        tmp_path.write_text(text, encoding="utf-8")
        tmp_path.replace(self.path)

    def add(self, content: str, *, kind: str = "memory") -> MemoryEntry:
        """Append a new entry and persist the store.

        The content is sanitized and stripped before storing.  No emptiness or
        duplicate check is made here; ``add_if_new`` performs those.

        Args:
            content: Text to remember.
            kind: Category label stored with the entry.

        Returns:
            The entry that was created, stamped with the current UTC time.
        """
        entry = MemoryEntry(
            id=self._next_id(),
            content=sanitize_context(content).strip(),
            kind=kind,
            created_at=datetime.now(timezone.utc).isoformat(),
        )
        self.entries.append(entry)
        self.save()
        return entry

    def add_if_new(self, content: str, *, kind: str = "memory") -> Optional[MemoryEntry]:
        """Add an entry unless it is blank or already stored.

        Args:
            content: Text to remember.
            kind: Category label stored with the entry.

        Returns:
            The new entry, or ``None`` when the sanitized content is empty or
            an entry with the same kind and content already exists.
        """
        clean = sanitize_context(content).strip()
        if not clean:
            return None
        # Compare the fields directly: hashing "kind:content" as one string
        # cannot tell ("a:b", "c") apart from ("a", "b:c").
        if any(entry.kind == kind and entry.content == clean for entry in self.entries):
            return None
        return self.add(clean, kind=kind)

    def list_entries(self, *, kind: Optional[str] = None) -> List[Dict[str, str]]:
        """Return the entries as plain dicts, in stored order.

        Args:
            kind: When given and non-empty, only entries of this kind are
                returned.
        """
        items = self.entries
        if kind:
            items = [entry for entry in items if entry.kind == kind]
        return [asdict(entry) for entry in items]

    def render_context(self) -> str:
        """Render the most recent entries as a fenced memory-context block.

        Entries are taken newest-first until the next one would push the
        joined text past ``char_limit``, then emitted oldest-first, one
        ``- [kind] content`` line each.  The limit covers the joined entry
        lines only, not the surrounding fence and system note.

        Returns:
            The fenced block, or ``""`` when no entry fits or the store is
            empty.
        """
        chunks: List[str] = []
        total = 0
        for entry in reversed(self.entries):
            line = f"- [{entry.kind}] {entry.content}"
            # Every line after the first also costs one newline when joined.
            cost = len(line) + (1 if chunks else 0)
            if total + cost > self.char_limit:
                break
            chunks.append(line)
            total += cost
        if not chunks:
            return ""
        chunks.reverse()
        return build_memory_context_block("\n".join(chunks))