homelab-brain/homelab-ai-bot/context.py

159 lines
5 KiB
Python

"""Intelligente Kontext-Sammlung für den Hausmeister-Bot.
Entscheidet anhand der Frage welche Datenquellen abgefragt werden."""
import sys
import os
import re
sys.path.insert(0, os.path.dirname(__file__))
from core import config, loki_client, proxmox_client
def _load_config():
return config.parse_config()
def _get_tokens(cfg):
tokens = {}
tn = cfg.raw.get("PVE_TOKEN_HETZNER_NAME", "")
tv = cfg.raw.get("PVE_TOKEN_HETZNER_VALUE", "")
if tn and tv:
tokens["pve-hetzner"] = {"name": tn, "value": tv}
return tokens
def _get_passwords(cfg):
return {
"pve-hetzner": cfg.passwords.get("hetzner", ""),
"pve1": cfg.passwords.get("default", ""),
"pve3": cfg.passwords.get("default", ""),
"default": cfg.passwords.get("default", ""),
}
def gather_status() -> str:
"""Komplett-Status aller Container für /status."""
cfg = _load_config()
containers = proxmox_client.get_all_containers(
_get_passwords(cfg), _get_tokens(cfg)
)
return proxmox_client.format_containers(containers)
def gather_errors(hours: float = 2) -> str:
"""Aktuelle Fehler aus Loki für /errors."""
entries = loki_client.get_errors(hours=hours, limit=30)
return loki_client.format_logs(entries)
def gather_container_status(query: str) -> str:
"""Status eines einzelnen Containers."""
cfg = _load_config()
vmid = None
name = None
m = re.search(r'\b(\d{3})\b', query)
if m:
vmid = int(m.group(1))
else:
name = query.strip()
ct = config.get_container(cfg, vmid=vmid, name=name)
if not ct:
return f"Container nicht gefunden: {query}"
host_ip = proxmox_client.PROXMOX_HOSTS.get(ct.host)
if not host_ip:
return f"Host nicht erreichbar: {ct.host}"
token = _get_tokens(cfg).get(ct.host, {})
pw = _get_passwords(cfg).get(ct.host, "")
try:
client = proxmox_client.ProxmoxClient(
host_ip, password=pw,
token_name=token.get("name", ""),
token_value=token.get("value", ""),
)
status = client.get_container_status(ct.vmid)
except Exception as e:
return f"Proxmox-Fehler: {e}"
mem_mb = status.get("mem", 0) // (1024 * 1024)
maxmem_mb = status.get("maxmem", 0) // (1024 * 1024)
uptime_h = status.get("uptime", 0) // 3600
return (
f"CT {ct.vmid}{ct.name}\n"
f"Host: {ct.host}\n"
f"Status: {status.get('status', '?')}\n"
f"RAM: {mem_mb}/{maxmem_mb} MB\n"
f"CPU: {status.get('cpus', '?')} Kerne\n"
f"Uptime: {uptime_h}h\n"
f"Tailscale: {ct.tailscale_ip or ''}\n"
f"Dienste: {ct.services}"
)
def gather_logs(container: str, hours: float = 1) -> str:
"""Logs eines Containers aus Loki."""
entries = loki_client.query_logs(
f'{{host="{container}"}}', hours=hours, limit=20
)
return loki_client.format_logs(entries)
def gather_health(container: str) -> str:
"""Health-Check eines Containers."""
health = loki_client.get_health(container, hours=24)
status_emoji = {"healthy": "", "warning": "⚠️", "critical": "🔴"}.get(
health.get("status", ""), ""
)
return (
f"{status_emoji} {health.get('host', container)}\n"
f"Status: {health.get('status', '?')}\n"
f"Fehler (24h): {health.get('errors_last_{hours}h', '?')}\n"
f"Sendet Logs: {'ja' if health.get('sending_logs') else 'nein'}"
)
def gather_silence() -> str:
"""Welche Hosts senden keine Logs?"""
silent = loki_client.check_silence(minutes=35)
if not silent:
return "✅ Alle Hosts senden Logs."
if silent and "error" in silent[0]:
return f"Fehler: {silent[0]['error']}"
lines = ["⚠️ Stille Hosts (keine Logs seit 35+ Min):\n"]
for s in silent:
lines.append(f"{s['host']}")
return "\n".join(lines)
def gather_context_for_question(question: str) -> str:
"""Sammelt relevanten Kontext für eine Freitext-Frage."""
q = question.lower()
parts = []
if any(w in q for w in ["fehler", "error", "problem", "kaputt", "down"]):
parts.append("=== Aktuelle Fehler ===\n" + gather_errors(hours=2))
if any(w in q for w in ["status", "läuft", "container", "übersicht", "alles"]):
parts.append("=== Container Status ===\n" + gather_status())
if any(w in q for w in ["still", "silence", "stumm", "logs"]):
parts.append("=== Stille Hosts ===\n" + gather_silence())
ct_match = re.search(r'\bct[- ]?(\d{3})\b', q)
if ct_match:
parts.append(f"=== CT {ct_match.group(1)} ===\n" + gather_container_status(ct_match.group(1)))
for name in ["wordpress", "rss", "seafile", "forgejo", "portainer",
"fuenfvoracht", "redax", "flugscanner", "edelmetall"]:
if name in q:
parts.append(f"=== {name} ===\n" + gather_container_status(name))
if not parts:
parts.append("=== Container Status ===\n" + gather_status())
parts.append("=== Aktuelle Fehler ===\n" + gather_errors(hours=1))
return "\n\n".join(parts)