homelab-brain/homelab-ai-bot/memory_client.py

184 lines
5.8 KiB
Python

"""Client fuer den Memory-Service (CT 117).
Stellt Session-Management und Memory-Zugriff bereit.
Kein Import von Bot- oder LLM-Logik — reiner HTTP-Client.
"""
import logging
import time
import uuid
from typing import Optional
import requests
from core import config
# Module-level logger with the fixed name "memory_client".
log = logging.getLogger("memory_client")
# Lazily filled by _ensure_config(): the parsed config object, the Memory API
# base URL (trailing slashes stripped) and the bearer token.
_cfg = None
_base_url = None
_token = None
SESSION_TIMEOUT = 1800 # seconds; 30 minutes of inactivity = new session
_active_sessions: dict[str, dict] = {} # channel_key -> {id, last_activity}
def _ensure_config():
    """Load the Memory API settings once and cache them in module globals.

    Reads ``MEMORY_API_URL`` (trailing slashes stripped) and
    ``MEMORY_API_TOKEN`` from the parsed config's ``raw`` mapping. A warning
    is logged when either value is missing or empty.

    The result is cached even when the settings are missing, so an
    unconfigured installation parses the config and logs the warning only
    once instead of on every API call.
    """
    global _cfg, _base_url, _token
    # `_base_url` keeps the original short-circuit; `_cfg` additionally marks
    # "already parsed" for the case where no URL is configured.
    if _base_url or _cfg is not None:
        return
    parsed = config.parse_config()
    _base_url = parsed.raw.get("MEMORY_API_URL", "").rstrip("/")
    _token = parsed.raw.get("MEMORY_API_TOKEN", "")
    # Assign last so a failure while reading the values does not leave the
    # module marked as configured.
    _cfg = parsed
    if not _base_url or not _token:
        log.warning("MEMORY_API_URL oder MEMORY_API_TOKEN nicht in homelab.conf")
def _headers():
    """Return the HTTP headers sent with every Memory API request.

    Contains the bearer-token ``Authorization`` header built from the cached
    module-level token and a JSON ``Content-Type``.
    """
    bearer = f"Bearer {_token}"
    return {
        "Authorization": bearer,
        "Content-Type": "application/json",
    }
def _post(path: str, data: dict) -> Optional[dict]:
    """POST ``data`` as JSON to ``path`` on the Memory API.

    Args:
        path: URL path appended to the configured base URL.
        data: JSON-serialisable request body.

    Returns:
        The decoded JSON response on a successful status, otherwise ``None``
        (no base URL configured, error status, or any exception). Failures
        are logged as warnings and never raised to the caller.
    """
    _ensure_config()
    if not _base_url:
        return None
    try:
        response = requests.post(
            f"{_base_url}{path}",
            json=data,
            headers=_headers(),
            timeout=5,
        )
        if not response.ok:
            log.warning("Memory API %s: %s %s", path, response.status_code, response.text[:200])
            return None
        return response.json()
    except Exception as exc:
        # Best-effort client: network and decoding errors are only logged.
        log.warning("Memory API %s: %s", path, exc)
        return None
def _get(path: str, params: dict = None) -> Optional[dict]:
    """GET ``path`` from the Memory API with optional query parameters.

    Args:
        path: URL path appended to the configured base URL.
        params: Query-string parameters, or ``None`` for none.

    Returns:
        The decoded JSON response on a successful status, otherwise ``None``
        (no base URL configured, error status, or any exception). Failures
        are logged as warnings and never raised to the caller.
    """
    _ensure_config()
    if not _base_url:
        return None
    try:
        response = requests.get(
            f"{_base_url}{path}",
            params=params,
            headers=_headers(),
            timeout=5,
        )
        if not response.ok:
            log.warning("Memory API %s: %s %s", path, response.status_code, response.text[:200])
            return None
        return response.json()
    except Exception as exc:
        # Best-effort client: network and decoding errors are only logged.
        log.warning("Memory API %s: %s", path, exc)
        return None
def get_or_create_session(channel_key: str, source: str = "telegram") -> Optional[str]:
    """Return the active session ID for a channel, creating one if needed.

    A cached session is reused (and its activity timestamp refreshed) while
    less than ``SESSION_TIMEOUT`` seconds have passed since its last use.
    Otherwise a new session is requested via ``POST /sessions`` and cached.

    Args:
        channel_key: Key identifying the conversation channel.
        source: Origin label sent to the service when creating a session.

    Returns:
        The session ID, or ``None`` when the service did not return one.
    """
    now = time.time()
    entry = _active_sessions.get(channel_key)
    if entry:
        idle_seconds = now - entry["last_activity"]
        if idle_seconds < SESSION_TIMEOUT:
            entry["last_activity"] = now
            return entry["id"]

    payload = {"source": source, "channel_key": channel_key}
    created = _post("/sessions", payload)
    if not created or "id" not in created:
        return None
    session_id = created["id"]
    _active_sessions[channel_key] = {"id": session_id, "last_activity": now}
    return session_id
def log_message(session_id: str, role: str, content: str, source: str = None, meta: str = None):
    """Store one message in a session via ``POST /sessions/<id>/messages``.

    Does nothing when ``session_id`` or ``content`` is empty.

    Args:
        session_id: Target session.
        role: Message role sent to the service.
        content: Message text.
        source: Optional origin label; sent as ``source`` when truthy.
        meta: Optional metadata; sent as ``meta_json`` when truthy.
    """
    if not (session_id and content):
        return
    payload = {"role": role, "content": content}
    # Optional fields are only included when they carry a value.
    for field, value in (("source", source), ("meta_json", meta)):
        if value:
            payload[field] = value
    _post(f"/sessions/{session_id}/messages", payload)
def get_active_memory() -> list[dict]:
    """Fetch the active memory items intended for the system prompt.

    Queries ``GET /memory`` with ``status=active`` and ``limit=100``.

    Returns:
        The ``items`` list from the response, or an empty list when the
        request failed or the response has no ``items`` key.
    """
    query = {"status": "active", "limit": 100}
    payload = _get("/memory", query)
    if not payload or "items" not in payload:
        return []
    return payload["items"]
def format_memory_for_prompt(items: list[dict]) -> str:
    """Format memory items as a text block for the system prompt.

    Each item becomes one line of the form ``[scope/kind] content`` between a
    header and a footer marker line. Output starts with an empty line so the
    block is separated from preceding prompt text.

    Malformed items are tolerated instead of raising ``KeyError``: a missing
    ``scope`` or ``kind`` is rendered as ``?``, and items without ``content``
    are skipped.

    Args:
        items: Memory item dicts with ``scope``, ``kind`` and ``content``.

    Returns:
        The formatted block, or ``""`` when there is nothing to show.
    """
    if not items:
        return ""
    entries = []
    for item in items:
        content = item.get("content")
        if content is None:
            # Nothing useful to put into the prompt for this item.
            continue
        scope = item.get("scope", "?")
        kind = item.get("kind", "?")
        entries.append(f"[{scope}/{kind}] {content}")
    if not entries:
        return ""
    return "\n".join(
        [
            "",
            "=== GEDAECHTNIS (persistente Fakten) ===",
            *entries,
            "=== ENDE GEDAECHTNIS ===",
        ]
    )
def get_session_messages(session_id: str, limit: int = 10) -> list[dict]:
    """Fetch up to ``limit`` messages of a session for the LLM context.

    Queries ``GET /sessions/<id>/messages`` with the given ``limit``.

    Args:
        session_id: Session to read; an empty value short-circuits.
        limit: Maximum number of messages requested from the service.

    Returns:
        The ``messages`` list from the response, or an empty list when there
        is no session ID, the request failed, or the key is absent.
    """
    if session_id:
        payload = _get(f"/sessions/{session_id}/messages", {"limit": limit})
        if payload and "messages" in payload:
            return payload["messages"]
    return []
def _pair_exchanges(messages: list[dict]) -> list[tuple[str, Optional[str]]]:
    """Pair each user message with the assistant reply that follows it."""
    exchanges = []
    pending_question = None
    for msg in messages:
        content = (msg.get("content") or "").strip()
        if not content:
            continue
        role = msg.get("role", "")
        if role == "user":
            # Keep a previous question that never got an answer instead of
            # silently overwriting it with the new one.
            if pending_question:
                exchanges.append((pending_question, None))
            pending_question = content[:200]
        elif role == "assistant" and pending_question:
            exchanges.append((pending_question, content[:200]))
            pending_question = None
    if pending_question:
        exchanges.append((pending_question, None))
    return exchanges


def _format_exchanges(exchanges: list[tuple[str, Optional[str]]]) -> list[str]:
    """Render exchanges as numbered 'Frage'/'Antwort' entries."""
    rendered = []
    for number, (question, answer) in enumerate(exchanges, 1):
        entry = f"{number}. Frage: {question}"
        if answer:
            entry += f"\n Antwort: {answer}"
        rendered.append(entry)
    return rendered


def get_session_summary(session_id: str, limit: int = 20, topic: str = None) -> str:
    """Build a compact text summary of a session, optionally filtered by topic.

    Messages are grouped into question/answer exchanges (each side truncated
    to 200 characters). Questions without a following assistant reply are
    kept and listed without an answer.

    Args:
        session_id: Session to summarise.
        limit: Maximum number of messages fetched for the summary.
        topic: Optional case-insensitive substring. When given, only
            exchanges whose question or answer contains it are listed in
            full; the remaining questions are listed (truncated to 80
            characters) under "Sonstige Themen".

    Returns:
        The summary text, or a short notice when there is no session, no
        messages, or no usable exchanges.
    """
    if not session_id:
        return "Keine aktive Session."
    messages = get_session_messages(session_id, limit=limit)
    if not messages:
        return "Noch keine Nachrichten in dieser Session."

    exchanges = _pair_exchanges(messages)
    if not exchanges:
        return "Keine Themen in dieser Session."

    if not topic:
        header = f"Session ({len(exchanges)} Themen):"
        return "\n".join([header, *_format_exchanges(exchanges)])

    needle = topic.lower()
    matching = []
    other_topics = []
    for question, answer in exchanges:
        haystack = f"{question} {answer or ''}".lower()
        if needle in haystack:
            matching.append((question, answer))
        else:
            other_topics.append(question[:80])

    if matching:
        lines = [f"Zum Thema '{topic}' ({len(matching)} Punkte):"]
        lines.extend(_format_exchanges(matching))
    else:
        lines = [f"Zum Thema '{topic}' wurde nichts direkt besprochen."]
    if other_topics:
        lines.append("\nSonstige Themen: " + ", ".join(other_topics))
    return "\n".join(lines)