"""Client fuer den Memory-Service (CT 117). Stellt Session-Management und Memory-Zugriff bereit. Kein Import von Bot- oder LLM-Logik — reiner HTTP-Client. """ import logging import time import uuid from typing import Optional import requests from core import config log = logging.getLogger("memory_client") _cfg = None _base_url = None _token = None SESSION_TIMEOUT = 1800 # 30 Minuten Inaktivitaet = neue Session _active_sessions: dict[str, dict] = {} # channel_key -> {id, last_activity} def _ensure_config(): global _cfg, _base_url, _token if _base_url: return _cfg = config.parse_config() _base_url = _cfg.raw.get("MEMORY_API_URL", "").rstrip("/") _token = _cfg.raw.get("MEMORY_API_TOKEN", "") if not _base_url or not _token: log.warning("MEMORY_API_URL oder MEMORY_API_TOKEN nicht in homelab.conf") def _headers(): return {"Authorization": f"Bearer {_token}", "Content-Type": "application/json"} def _post(path: str, data: dict) -> Optional[dict]: _ensure_config() if not _base_url: return None try: r = requests.post(f"{_base_url}{path}", json=data, headers=_headers(), timeout=5) if r.ok: return r.json() log.warning("Memory API %s: %s %s", path, r.status_code, r.text[:200]) except Exception as e: log.warning("Memory API %s: %s", path, e) return None def _get(path: str, params: dict = None) -> Optional[dict]: _ensure_config() if not _base_url: return None try: r = requests.get(f"{_base_url}{path}", params=params, headers=_headers(), timeout=5) if r.ok: return r.json() log.warning("Memory API %s: %s %s", path, r.status_code, r.text[:200]) except Exception as e: log.warning("Memory API %s: %s", path, e) return None def get_or_create_session(channel_key: str, source: str = "telegram") -> Optional[str]: """Gibt eine aktive Session-ID zurueck oder erstellt eine neue.""" now = time.time() cached = _active_sessions.get(channel_key) if cached and (now - cached["last_activity"]) < SESSION_TIMEOUT: cached["last_activity"] = now return cached["id"] result = _post("/sessions", {"source": source, "channel_key": channel_key}) if result and "id" in result: _active_sessions[channel_key] = {"id": result["id"], "last_activity": now} return result["id"] return None def log_message(session_id: str, role: str, content: str, source: str = None, meta: str = None): """Speichert eine Nachricht in der Session.""" if not session_id or not content: return data = {"role": role, "content": content} if source: data["source"] = source if meta: data["meta_json"] = meta _post(f"/sessions/{session_id}/messages", data) def get_active_memory() -> list[dict]: """Holt alle aktiven Memory-Items fuer den System-Prompt.""" result = _get("/memory", {"status": "active", "limit": 100}) if result and "items" in result: return result["items"] return [] def format_memory_for_prompt(items: list[dict]) -> str: """Formatiert Memory-Items als Text-Block fuer den System-Prompt.""" if not items: return "" lines = ["", "=== GEDAECHTNIS (persistente Fakten) ==="] for item in items: prefix = f"[{item['scope']}/{item['kind']}]" lines.append(f"{prefix} {item['content']}") lines.append("=== ENDE GEDAECHTNIS ===") return "\n".join(lines)