"""Client fuer den Memory-Service (CT 117). Stellt Session-Management und Memory-Zugriff bereit. Kein Import von Bot- oder LLM-Logik — reiner HTTP-Client. """ import logging import re import time import uuid from datetime import datetime, timedelta from typing import Optional import requests from core import config log = logging.getLogger("memory_client") _cfg = None _base_url = None _token = None SESSION_TIMEOUT = 1800 # 30 Minuten Inaktivitaet = neue Session _active_sessions: dict[str, dict] = {} # channel_key -> {id, last_activity} TOPIC_ALIASES = { "container": ["containers", "lxc", "lxc-container", "lxc container", "ct"], "server": ["servers", "proxmox", "pve", "host", "hosts"], "backup": ["backups", "pbs", "snapshot", "snapshots"], "projekt": ["projekte", "project", "projects"], "mail": ["mails", "email", "e-mail"], } def _normalize(text: str) -> str: """Normalisiert Text fuer robustes Matching.""" t = text.lower().strip() for old, new in [("ä", "ae"), ("ö", "oe"), ("ü", "ue"), ("ß", "ss")]: t = t.replace(old, new) t = t.replace("-", " ").replace("_", " ") return t def _topic_matches(text: str, topic: str) -> bool: """Prueft ob ein Topic im normalisierten Text vorkommt (inkl. Aliase).""" norm_text = _normalize(text) norm_topic = _normalize(topic) if norm_topic in norm_text: return True aliases = TOPIC_ALIASES.get(norm_topic, []) for alias in aliases: if _normalize(alias) in norm_text: return True for base, alias_list in TOPIC_ALIASES.items(): if norm_topic in [_normalize(a) for a in alias_list] or norm_topic == base: if base in norm_text: return True for a in alias_list: if _normalize(a) in norm_text: return True return False def _ensure_config(): global _cfg, _base_url, _token if _base_url: return _cfg = config.parse_config() _base_url = _cfg.raw.get("MEMORY_API_URL", "").rstrip("/") _token = _cfg.raw.get("MEMORY_API_TOKEN", "") if not _base_url or not _token: log.warning("MEMORY_API_URL oder MEMORY_API_TOKEN nicht in homelab.conf") def _headers(): return {"Authorization": f"Bearer {_token}", "Content-Type": "application/json"} def _post(path: str, data: dict) -> Optional[dict]: _ensure_config() if not _base_url: return None try: r = requests.post(f"{_base_url}{path}", json=data, headers=_headers(), timeout=5) if r.ok: return r.json() log.warning("Memory API %s: %s %s", path, r.status_code, r.text[:200]) except Exception as e: log.warning("Memory API %s: %s", path, e) return None def _get(path: str, params: dict = None) -> Optional[dict]: _ensure_config() if not _base_url: return None try: r = requests.get(f"{_base_url}{path}", params=params, headers=_headers(), timeout=5) if r.ok: return r.json() log.warning("Memory API %s: %s %s", path, r.status_code, r.text[:200]) except Exception as e: log.warning("Memory API %s: %s", path, e) return None def _patch(path: str, data: dict) -> Optional[dict]: _ensure_config() if not _base_url: return None try: r = requests.patch(f"{_base_url}{path}", json=data, headers=_headers(), timeout=5) if r.ok: return r.json() log.warning("Memory API PATCH %s: %s %s", path, r.status_code, r.text[:200]) except Exception as e: log.warning("Memory API PATCH %s: %s", path, e) return None def get_or_create_session(channel_key: str, source: str = "telegram") -> Optional[str]: """Gibt eine aktive Session-ID zurueck oder erstellt eine neue. Ueberlebt Bot-Restarts durch API-Lookup der letzten Session.""" now = time.time() cached = _active_sessions.get(channel_key) if cached and (now - cached["last_activity"]) < SESSION_TIMEOUT: cached["last_activity"] = now return cached["id"] # Nach Restart: letzte Session vom API holen latest = _get("/sessions/latest", {"channel_key": channel_key}) if latest and latest.get("id"): last_at = latest.get("last_activity_at", 0) if (now - last_at) < SESSION_TIMEOUT: _active_sessions[channel_key] = {"id": latest["id"], "last_activity": now} _patch(f"/sessions/{latest['id']}", {}) log.info("Session wiederhergestellt nach Restart: %s", latest["id"][:12]) return latest["id"] result = _post("/sessions", {"source": source, "channel_key": channel_key}) if result and "id" in result: _active_sessions[channel_key] = {"id": result["id"], "last_activity": now} return result["id"] return None def log_message(session_id: str, role: str, content: str, source: str = None, meta: str = None): """Speichert eine Nachricht in der Session.""" if not session_id or not content: return data = {"role": role, "content": content} if source: data["source"] = source if meta: data["meta_json"] = meta _post(f"/sessions/{session_id}/messages", data) DEFAULT_TEMP_DAYS = 14 _WEEKDAYS_DE = { "montag": 0, "dienstag": 1, "mittwoch": 2, "donnerstag": 3, "freitag": 4, "samstag": 5, "sonntag": 6, } def parse_expires_from_text(text: str) -> Optional[int]: """Versucht aus deutschem Text ein absolutes Ablaufdatum (epoch) zu extrahieren. Gibt None zurueck wenn keine belastbare Zeit erkannt wird.""" t = text.lower().strip() now = datetime.now() if "morgen" in t and "uebermorgen" not in t and "übermorgen" not in t: dt = now + timedelta(days=1) return int(dt.replace(hour=23, minute=59, second=59).timestamp()) if "uebermorgen" in t or "übermorgen" in t: dt = now + timedelta(days=2) return int(dt.replace(hour=23, minute=59, second=59).timestamp()) for day_name, day_num in _WEEKDAYS_DE.items(): if day_name in t: days_ahead = (day_num - now.weekday()) % 7 if days_ahead == 0: days_ahead = 7 dt = now + timedelta(days=days_ahead) return int(dt.replace(hour=23, minute=59, second=59).timestamp()) if "naechste woche" in t or "nächste woche" in t or "naechster woche" in t or "nächster woche" in t: dt = now + timedelta(days=7) return int(dt.replace(hour=23, minute=59, second=59).timestamp()) if "naechsten monat" in t or "nächsten monat" in t: dt = now + timedelta(days=30) return int(dt.replace(hour=23, minute=59, second=59).timestamp()) if "heute" in t: return int(now.replace(hour=23, minute=59, second=59).timestamp()) m = re.search(r"(\d{1,2})\.(\d{1,2})\.(\d{4})", t) if m: try: dt = datetime(int(m.group(3)), int(m.group(2)), int(m.group(1)), 23, 59, 59) return int(dt.timestamp()) except ValueError: pass m = re.search(r"(\d{4})-(\d{2})-(\d{2})", t) if m: try: dt = datetime(int(m.group(1)), int(m.group(2)), int(m.group(3)), 23, 59, 59) return int(dt.timestamp()) except ValueError: pass return None def default_expires() -> int: """Konservatives Default-Ablaufdatum fuer temporaere Items ohne erkannte Zeitangabe.""" return int((datetime.now() + timedelta(days=DEFAULT_TEMP_DAYS)).timestamp()) def get_candidates() -> list[dict]: """Holt alle offenen Memory-Kandidaten.""" result = _get("/memory", {"status": "candidate", "limit": 20}) if result and "items" in result: return result["items"] return [] def activate_candidate(item_id: int, memory_type: str = "permanent", expires_at: int = None) -> bool: """Setzt einen Kandidaten auf aktiv mit Typ.""" data = {"status": "active", "memory_type": memory_type} if expires_at: data["expires_at"] = expires_at result = _patch(f"/memory/{item_id}", data) return bool(result and result.get("ok")) def delete_candidate(item_id: int) -> bool: """Loescht einen Kandidaten.""" _ensure_config() if not _base_url: return False try: r = requests.delete(f"{_base_url}/memory/{item_id}", headers=_headers(), timeout=5) return r.ok except Exception: return False def get_active_memory() -> list[dict]: """Holt alle aktiven Memory-Items (ohne abgelaufene). Fuer /memory-Befehl.""" result = _get("/memory", {"status": "active", "limit": 100}) if not result or "items" not in result: return [] now_ts = int(time.time()) items = [] for item in result["items"]: exp = item.get("expires_at") if exp and exp < now_ts: _patch(f"/memory/{item['id']}", {"status": "archived"}) continue items.append(item) return items def get_relevant_memory(query: str, top_k: int = 10) -> list[dict]: """RAG-Suche: Holt die relevantesten Memory-Items per Vektor-Aehnlichkeit.""" result = _get("/memory/search", {"q": query, "top_k": top_k, "status": "active"}) if not result or "items" not in result: return get_active_memory()[:top_k] return result["items"] def format_memory_for_prompt(items: list[dict]) -> str: """Formatiert Memory-Items als Text-Block fuer den System-Prompt.""" if not items: return "" TYPE_ICON = {"fact": "📌", "preference": "⭐", "relationship": "👤", "plan": "📅", "temporary": "🕒", "uncertain": "❓"} lines = ["", "=== GEDAECHTNIS (relevante Fakten) ==="] for item in items: mtype = item.get("memory_type", "fact") icon = TYPE_ICON.get(mtype, "•") conf = item.get("confidence", "high") exp = item.get("expires_at") parts = [f"{icon} [{mtype}]"] if conf != "high": parts.append(f"({conf})") parts.append(item["content"]) if exp: parts.append(f"(bis {datetime.fromtimestamp(exp).strftime('%d.%m.%Y')})") lines.append(" ".join(parts)) lines.append("=== ENDE GEDAECHTNIS ===") return "\n".join(lines) def get_session_messages(session_id: str, limit: int = 10) -> list[dict]: """Holt die letzten N Messages einer Session fuer den LLM-Kontext.""" if not session_id: return [] result = _get(f"/sessions/{session_id}/messages", {"limit": limit}) if result and "messages" in result: return result["messages"] return [] def get_session_summary(session_id: str, limit: int = 20, topic: str = None) -> str: """Kompakte Zusammenfassung der aktuellen Session, optional nach Thema gefiltert.""" if not session_id: return "Keine aktive Session." messages = get_session_messages(session_id, limit=limit) if not messages: return "Noch keine Nachrichten in dieser Session." exchanges = [] current_q = None for msg in messages: role = msg.get("role", "") content = (msg.get("content") or "").strip() if not content: continue if role == "user": current_q = content[:200] elif role == "assistant" and current_q: exchanges.append((current_q, content[:200])) current_q = None if current_q: exchanges.append((current_q, None)) if not exchanges: return "Keine Themen in dieser Session." if topic: matching_pairs = [] matching_singles = [] other_topics = [] for q, a in exchanges: combined = q + " " + (a or "") if _topic_matches(combined, topic): matching_pairs.append((q, a)) else: other_topics.append(q[:80]) # Fallback: einzelne Messages pruefen (falls Pairing unvollstaendig) if not matching_pairs: for msg in messages: content = (msg.get("content") or "").strip() if content and _topic_matches(content, topic): role = msg.get("role", "?") matching_singles.append((role, content[:200])) lines = [] if matching_pairs: lines.append("Zum Thema '" + topic + "' (" + str(len(matching_pairs)) + " Treffer):") for i, (q, a) in enumerate(matching_pairs, 1): line = str(i) + ". Frage: " + q if a: line += "\n Antwort: " + a lines.append(line) elif matching_singles: lines.append("Zum Thema '" + topic + "' (" + str(len(matching_singles)) + " relevante Messages):") for role, content in matching_singles: lines.append(" [" + role + "] " + content) else: lines.append("Zum Thema '" + topic + "' wurde in dieser Session nichts besprochen.") if other_topics: lines.append("\nSonstige Themen der Session: " + ", ".join(other_topics)) return "\n".join(lines) lines = ["Session (" + str(len(exchanges)) + " Themen):"] for i, (q, a) in enumerate(exchanges, 1): line = str(i) + ". Frage: " + q if a: line += "\n Antwort: " + a lines.append(line) return "\n".join(lines)