"""KI-gestützte Systemvorhersage — analysiert Logs, Metriken und Container-Status.""" import json import requests from datetime import datetime, timezone, timedelta from core import prometheus_client, loki_client, config from core import proxmox_client OLLAMA_URL = "http://100.84.255.83:11434" FORECAST_MODEL = "qwen3:30b-a3b" TOOLS = [ { "type": "function", "function": { "name": "get_health_forecast", "description": ( "KI-gestützte Systemvorhersage für das Homelab. Analysiert Fehler-Logs, " "Disk-Trends, CPU/RAM-Auslastung und Container-Status. Gibt eine Prognose " "aus, ob sich Probleme anbahnen — z.B. voller Speicher, häufige Abstürze, " "steigende Fehlerquoten. Trigger: 'vorhersage', 'was bahnt sich an', " "'prognose', 'health forecast', 'system check', 'systemstatus'." ), "parameters": {"type": "object", "properties": {}, "required": []}, }, }, ] def _gather_prometheus() -> dict: result = {} try: result["warnings"] = prometheus_client.get_warnings() disk = prometheus_client.get_disk() result["disk_current"] = [ {"host": r["host"], "used_pct": round(r["value"], 1)} for r in disk ] trend = prometheus_client.range_query( 'max by (host) ((1 - node_filesystem_avail_bytes{mountpoint="/"} ' '/ node_filesystem_size_bytes{mountpoint="/"}) * 100)', hours=24, step="2h", ) trends = {} if trend.get("status") == "success": for r in trend.get("data", {}).get("result", []): h = r.get("metric", {}).get("host", "?") vals = [float(v[1]) for v in r.get("values", []) if v[1] != "NaN"] if len(vals) >= 2: delta = vals[-1] - vals[0] trends[h] = { "start_pct": round(vals[0], 1), "end_pct": round(vals[-1], 1), "delta_24h": round(delta, 2), } result["disk_trend_24h"] = trends mem = prometheus_client.get_memory() result["memory"] = [ {"host": r["host"], "used_pct": round(r["value"], 1)} for r in mem ] load = prometheus_client.get_load() result["load5"] = [ {"host": r["host"], "load5": round(r["value"], 2)} for r in load ] except Exception as e: result["prometheus_error"] = str(e) return result def _gather_loki() -> dict: result = {} try: hosts = loki_client.get_labels() error_counts = {} for host in hosts[:20]: errors = loki_client.get_errors(container=host, hours=24, limit=300) count = 0 if (len(errors) == 1 and "error" in errors[0]) else len(errors) if count > 0: error_counts[host] = count result["errors_24h"] = error_counts silent = loki_client.check_silence(minutes=60) result["silent_hosts"] = [s["host"] for s in silent if "host" in s] except Exception as e: result["loki_error"] = str(e) return result def _gather_proxmox() -> dict: result = {} try: cfg = config.parse_config() passwords = {} tokens = {} for pve_host in cfg.proxmox_hosts: name = pve_host.get("name", "") pw = pve_host.get("password", "") tok_name = pve_host.get("token_name", "") tok_val = pve_host.get("token_value", "") if pw: passwords[name] = pw if tok_name and tok_val: tokens[name] = {"name": tok_name, "value": tok_val} containers = proxmox_client.get_all_containers(passwords=passwords, tokens=tokens) stopped = [ {"id": c.get("vmid"), "name": c.get("name", "?")} for c in containers if c.get("status") == "stopped" and "error" not in c ] running = len([c for c in containers if c.get("status") == "running"]) result["total"] = len(containers) result["running"] = running result["stopped"] = stopped except Exception as e: result["proxmox_error"] = str(e) return result def _call_analysis_llm(data_summary: str) -> str: now_str = datetime.now().strftime("%d.%m.%Y %H:%M") prompt = ( f"Du bist ein Homelab-Monitoring-Experte. Heute ist der {now_str}.\n" "Analysiere die folgenden System-Rohdaten und erstelle eine kompakte Prognose.\n\n" "REGELN:\n" "- Nur echte Auffälligkeiten nennen (nicht jede normale Metrik)\n" "- Disk-Delta > 2% in 24h = Warnung\n" "- Disk > 80% = kritisch\n" "- RAM > 85% = Warnung\n" "- Fehler > 50 in 24h für einen Host = Warnung\n" "- Gestoppte Container = prüfen ob OK\n" "- Wenn alles normal: kurze Entwarnung genügt\n" "- Max 12 Zeilen, Emojis erlaubt, auf Deutsch\n" "- Klare Handlungsempfehlung wenn nötig\n\n" f"System-Daten:\n{data_summary}\n\n" "Prognose:" ) try: r = requests.post( f"{OLLAMA_URL}/api/chat", json={ "model": FORECAST_MODEL, "messages": [{"role": "user", "content": prompt + " /no_think"}], "stream": False, "options": {"num_predict": 700, "temperature": 0.3}, }, timeout=180, ) r.raise_for_status() content = r.json().get("message", {}).get("content", "") # Strip tags if present import re content = re.sub(r".*?", "", content, flags=re.DOTALL).strip() return content or "LLM-Analyse ergab kein Ergebnis." except Exception as e: return f"⚠️ LLM-Analyse nicht verfügbar: {e}" def handle_get_health_forecast(**kw) -> str: prom = _gather_prometheus() loki = _gather_loki() pve = _gather_proxmox() summary_parts = [] if prom.get("warnings"): summary_parts.append("AKTIVE WARNUNGEN: " + ", ".join(prom["warnings"])) else: summary_parts.append("Prometheus-Warnungen: keine") if prom.get("disk_current"): lines = [f"{d['host']}: {d['used_pct']}%" for d in prom["disk_current"]] summary_parts.append("Disk-Nutzung aktuell:\n " + "\n ".join(lines)) if prom.get("disk_trend_24h"): trend_lines = [] for h, t in prom["disk_trend_24h"].items(): if abs(t["delta_24h"]) > 0.5: trend_lines.append( f" {h}: {t['start_pct']}% → {t['end_pct']}% (Δ {t['delta_24h']:+.1f}% in 24h)" ) if trend_lines: summary_parts.append("Disk-Trends (letzte 24h):\n" + "\n".join(trend_lines)) if prom.get("memory"): high_mem = [m for m in prom["memory"] if m["used_pct"] > 70] if high_mem: mem_lines = [f"{m['host']}: {m['used_pct']}%" for m in high_mem] summary_parts.append("RAM > 70%:\n " + "\n ".join(mem_lines)) if prom.get("load5"): high_load = [l for l in prom["load5"] if l["load5"] > 2.0] if high_load: load_lines = [f"{l['host']}: load5={l['load5']}" for l in high_load] summary_parts.append("Hohe Last:\n " + "\n ".join(load_lines)) errors = loki.get("errors_24h", {}) if errors: err_lines = [f"{h}: {c} Fehler" for h, c in sorted(errors.items(), key=lambda x: -x[1])[:10]] summary_parts.append("Log-Fehler letzte 24h:\n " + "\n ".join(err_lines)) else: summary_parts.append("Log-Fehler letzte 24h: keine") if loki.get("silent_hosts"): summary_parts.append("Stille Hosts (>60 min kein Log): " + ", ".join(loki["silent_hosts"])) if pve.get("stopped"): stopped_names = [f"CT{c['id']} {c['name']}" for c in pve["stopped"]] summary_parts.append("Gestoppte Container: " + ", ".join(stopped_names)) elif "proxmox_error" not in pve: summary_parts.append(f"Proxmox: {pve.get('running', 0)}/{pve.get('total', 0)} Container laufen") data_summary = "\n\n".join(summary_parts) analysis = _call_analysis_llm(data_summary) header = f"🔭 *Systemvorhersage* ({datetime.now().strftime('%d.%m.%Y %H:%M')})\n\n" return header + analysis HANDLERS = { "get_health_forecast": handle_get_health_forecast, }