124 lines
4.1 KiB
Python
124 lines
4.1 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import hashlib
|
||
|
|
import json
|
||
|
|
from dataclasses import asdict, dataclass
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from pathlib import Path
|
||
|
|
import re
|
||
|
|
from typing import Dict, List, Optional
|
||
|
|
|
||
|
|
|
||
|
|
# Matches a single opening or closing <memory-context> tag, tolerating
# whitespace inside the angle brackets and any letter case.
_FENCE_TAG_RE = re.compile(r"</?\s*memory-context\s*>", flags=re.IGNORECASE)

# Matches a complete <memory-context>...</memory-context> section, including
# its body. The body match is non-greedy and spans newlines ([\s\S]), so
# adjacent sections are matched one at a time.
_INTERNAL_CONTEXT_RE = re.compile(
    r"<\s*memory-context\s*>[\s\S]*?</\s*memory-context\s*>",
    flags=re.IGNORECASE,
)

# Matches the bracketed "[System note: The following is recalled memory
# context, ...]" banner up to its closing bracket, plus trailing whitespace.
_INTERNAL_NOTE_RE = re.compile(
    r"\[System note:\s*The following is recalled memory context,[^\]]*\]\s*",
    flags=re.IGNORECASE,
)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(slots=True)
|
||
|
|
class MemoryEntry:
|
||
|
|
id: str
|
||
|
|
content: str
|
||
|
|
kind: str = "memory"
|
||
|
|
created_at: str = ""
|
||
|
|
|
||
|
|
|
||
|
|
def sanitize_context(text: str) -> str:
    """Strip internal memory-context markup from *text*.

    Three kinds of markup are removed, in this order:

    1. complete ``<memory-context>...</memory-context>`` sections,
    2. the bracketed ``[System note: The following is recalled memory
       context, ...]`` banner,
    3. any remaining lone ``<memory-context>`` / ``</memory-context>`` tags.

    The passes are repeated until the text stops changing. A single pass is
    not enough: removing an inner tag can splice the surrounding characters
    into a new tag (``<memory-<memory-context>context>`` collapses to
    ``<memory-context>``), which would otherwise survive sanitization.

    Args:
        text: Arbitrary text that may contain memory-context markup.

    Returns:
        The text with all such markup removed. Surrounding whitespace is not
        trimmed.
    """
    # Every substitution only deletes characters, so each changed pass makes
    # the string strictly shorter and the loop is guaranteed to terminate.
    while True:
        cleaned = _INTERNAL_CONTEXT_RE.sub("", text)
        cleaned = _INTERNAL_NOTE_RE.sub("", cleaned)
        cleaned = _FENCE_TAG_RE.sub("", cleaned)
        if cleaned == text:
            return cleaned
        text = cleaned
|
||
|
|
|
||
|
|
|
||
|
|
def build_memory_context_block(raw_context: str) -> str:
    """Wrap recalled memory text in a ``<memory-context>`` fence.

    The text is passed through ``sanitize_context`` first so that it cannot
    carry its own fence tags or system-note banner into the block.

    Args:
        raw_context: Recalled memory text to embed.

    Returns:
        The fenced block (opening tag, system note, sanitized text, closing
        tag), or ``""`` when there is nothing to embed, either because the
        input is empty/blank or because it is blank once sanitized.
    """
    if not raw_context or not raw_context.strip():
        return ""
    clean = sanitize_context(raw_context)
    # Input made up solely of internal markup sanitizes to nothing; emitting
    # a fence around an empty body would be pure noise for the reader.
    if not clean.strip():
        return ""
    return (
        "<memory-context>\n"
        "[System note: The following is recalled memory context, "
        "NOT new user input. Treat as authoritative reference data "
        "that should inform your response when relevant.]\n\n"
        f"{clean}\n"
        "</memory-context>"
    )
|
||
|
|
|
||
|
|
|
||
|
|
class SimpleMemoryStore:
    """Tiny persistent memory store for durable preferences and project facts.

    Entries are held in memory as a list of ``MemoryEntry`` objects and
    persisted to ``path`` as a JSON document shaped like
    ``{"entries": [{"id": ..., "content": ..., "kind": ..., "created_at": ...}]}``.
    """

    # Prefix of the ids generated by ``add``.
    _ID_PREFIX = "mem_"

    def __init__(self, path: str | Path, *, char_limit: int = 2200) -> None:
        """Open the store at *path* and load any existing entries.

        Args:
            path: Location of the JSON file; resolved to an absolute path.
            char_limit: Maximum number of characters of entry text that
                ``render_context`` will emit.
        """
        self.path = Path(path).resolve()
        self.char_limit = char_limit
        self.entries: List[MemoryEntry] = []
        self.load()

    @staticmethod
    def _text(item: Dict[str, object], key: str, default: str = "") -> str:
        """Return ``item[key]`` as a string, or *default* if missing or null."""
        value = item.get(key)
        # A JSON null must not turn into the literal string "None".
        return default if value is None else str(value)

    def _next_id(self) -> str:
        """Return an id of the form ``mem_<n>`` not used by any current entry.

        ``n`` is one more than the larger of the entry count and the highest
        numeric ``mem_<n>`` suffix present. With contiguous ids this is simply
        ``len(entries) + 1``; when the list has gaps (``load`` drops blank
        entries) it still avoids reusing an existing id.
        """
        highest = len(self.entries)
        prefix = self._ID_PREFIX
        for entry in self.entries:
            if not entry.id.startswith(prefix):
                continue
            suffix = entry.id[len(prefix):]
            if suffix.isascii() and suffix.isdigit():
                highest = max(highest, int(suffix))
        return f"{prefix}{highest + 1}"

    def load(self) -> None:
        """Replace ``self.entries`` with the contents of the backing file.

        A missing file yields an empty store. Records whose content is blank
        are dropped. A top-level value that is not an object, an ``entries``
        value that is not a list, and list items that are not objects are all
        ignored rather than raising.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        if not self.path.is_file():
            self.entries = []
            return
        raw = json.loads(self.path.read_text(encoding="utf-8"))
        items = raw.get("entries") if isinstance(raw, dict) else None
        if not isinstance(items, list):
            items = []

        loaded: List[MemoryEntry] = []
        for item in items:
            if not isinstance(item, dict):
                continue
            content = self._text(item, "content").strip()
            if not content:
                continue
            loaded.append(
                MemoryEntry(
                    id=self._text(item, "id"),
                    content=content,
                    kind=self._text(item, "kind", "memory").strip() or "memory",
                    created_at=self._text(item, "created_at").strip(),
                )
            )
        self.entries = loaded

    def save(self) -> None:
        """Write all entries to the backing file as indented UTF-8 JSON.

        Parent directories are created as needed. The document is serialized
        first and written to a sibling ``.tmp`` file that is then renamed over
        the target, so a failure part-way through never leaves a truncated
        store behind.
        """
        payload = {"entries": [asdict(entry) for entry in self.entries]}
        serialized = json.dumps(payload, ensure_ascii=False, indent=2)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        tmp_path.write_text(serialized, encoding="utf-8")
        tmp_path.replace(self.path)

    def add(self, content: str, *, kind: str = "memory") -> MemoryEntry:
        """Append a new entry and persist the store.

        The content is sanitized and stripped before being stored; no check
        is made for blank content or duplicates (see ``add_if_new``).

        Args:
            content: Text to remember.
            kind: Category label stored with the entry.

        Returns:
            The newly created entry, stamped with the current UTC time.
        """
        entry = MemoryEntry(
            id=self._next_id(),
            content=sanitize_context(content).strip(),
            kind=kind,
            created_at=datetime.now(timezone.utc).isoformat(),
        )
        self.entries.append(entry)
        self.save()
        return entry

    def add_if_new(self, content: str, *, kind: str = "memory") -> Optional[MemoryEntry]:
        """Add an entry unless it is blank or already stored.

        Args:
            content: Text to remember.
            kind: Category label stored with the entry.

        Returns:
            The new entry, or ``None`` if the sanitized content is empty or an
            entry with the same kind and content already exists.
        """
        clean = sanitize_context(content).strip()
        if not clean:
            return None
        # Compare the (kind, content) pair directly. Joining the two with ":"
        # is ambiguous: kind "a" + content "b:c" and kind "a:b" + content "c"
        # would be treated as the same entry.
        if any(entry.kind == kind and entry.content == clean for entry in self.entries):
            return None
        return self.add(clean, kind=kind)

    def list_entries(self, *, kind: Optional[str] = None) -> List[Dict[str, str]]:
        """Return entries as plain dicts, optionally filtered by *kind*.

        Args:
            kind: When truthy, only entries whose kind equals it are returned.

        Returns:
            One dict per entry, in storage order.
        """
        items = self.entries
        if kind:
            items = [entry for entry in items if entry.kind == kind]
        return [asdict(entry) for entry in items]

    def render_context(self) -> str:
        """Render the most recent entries as a fenced memory-context block.

        Entries are taken newest-first until adding another line (including
        the newline that joins it to the others) would exceed ``char_limit``,
        then emitted in their original order. The limit applies to the joined
        entry lines only, not to the surrounding fence and system note.

        Returns:
            The fenced block, or ``""`` if no entry fits or the store is empty.
        """
        selected: List[str] = []
        used = 0
        for entry in reversed(self.entries):
            line = f"- [{entry.kind}] {entry.content}"
            # Every line after the first costs one extra character for the
            # "\n" separator added by the join below.
            cost = len(line) + (1 if selected else 0)
            if used + cost > self.char_limit:
                break
            selected.append(line)
            used += cost
        if not selected:
            return ""
        selected.reverse()
        return build_memory_context_block("\n".join(selected))
|