homelab-brain/homelab-ai-bot/core/prometheus_client.py
root 586bedc0eb prometheus_client: alle Filesysteme anzeigen (ZFS, LVM, NVMe)
Root-Partition + Extra-Datastores (ZFS-Pools, /var/lib/vz etc.)
werden jetzt korrekt in Übersicht und Detail angezeigt.

Made-with: Cursor
2026-03-09 14:04:10 +07:00

281 lines
9.4 KiB
Python

"""Prometheus API client — fragt Host-Metriken ab für den Hausmeister-Bot."""
import requests
from datetime import datetime, timezone, timedelta
# Base URL of the Prometheus server; API endpoint paths are appended to it.
PROMETHEUS_URL = "http://10.10.10.1:9090"
# Warning thresholds in percent: a value at or above the limit is flagged.
WARN_CPU = 80
WARN_MEM = 85
WARN_DISK = 85
def _query(endpoint: str, params: dict) -> dict:
    """Send a GET request to the Prometheus HTTP API and return the decoded JSON.

    Args:
        endpoint: API path appended to ``PROMETHEUS_URL``
            (e.g. ``/api/v1/query``).
        params: Query-string parameters for the request.

    Returns:
        The parsed JSON body on success. On a transport error, a non-2xx
        status or an undecodable body, a dict of the form
        ``{"error": <message>, "status": "unavailable"}`` so callers can
        inspect ``status`` instead of handling exceptions.
    """
    try:
        r = requests.get(f"{PROMETHEUS_URL}{endpoint}", params=params, timeout=10)
        r.raise_for_status()
        return r.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body (e.g. an HTML page from a proxy):
        # older ``requests`` releases raise a plain JSON decode error there
        # that is not a RequestException subclass.
        return {"error": str(e), "status": "unavailable"}
def instant_query(query: str) -> dict:
    """Evaluate a PromQL expression via the instant-query endpoint.

    Returns whatever ``_query`` returns for ``/api/v1/query``.
    """
    params = {"query": query}
    return _query("/api/v1/query", params)
def range_query(query: str, hours: float = 1, step: str = "5m") -> dict:
    """Evaluate a PromQL expression over a window that ends now (UTC).

    Args:
        query: PromQL expression.
        hours: Length of the look-back window in hours.
        step: Resolution step passed through to Prometheus unchanged.

    Returns:
        Whatever ``_query`` returns for ``/api/v1/query_range``.
    """
    end = datetime.now(timezone.utc)
    window = timedelta(hours=hours)
    params = {
        "query": query,
        "start": (end - window).isoformat(),
        "end": end.isoformat(),
        "step": step,
    }
    return _query("/api/v1/query_range", params)
def is_available() -> bool:
    """Report whether Prometheus answers a trivial ``up`` query successfully."""
    response = _query("/api/v1/query", {"query": "up"})
    status = response.get("status")
    return status == "success"
def get_targets() -> list[dict]:
    """List the active scrape targets reported by Prometheus.

    Returns:
        One dict per active target with the keys ``host``, ``location``,
        ``health`` and ``job``. If the request failed, a single-element list
        holding ``{"error": <message>}``.
    """
    payload = _query("/api/v1/targets", {})
    if "error" in payload:
        return [{"error": payload["error"]}]

    def _describe(target: dict) -> dict:
        """Reduce one raw target entry to the fields the bot displays."""
        labels = target.get("labels", {})
        return {
            # Prefer the "host" label; fall back to "instance", then "?".
            "host": labels.get("host", labels.get("instance", "?")),
            "location": labels.get("location", ""),
            "health": target.get("health", "unknown"),
            "job": labels.get("job", ""),
        }

    active = payload.get("data", {}).get("activeTargets", [])
    return [_describe(target) for target in active]
def _by_host(data: dict, metric: str) -> list[dict]:
if data.get("status") != "success":
return []
results = []
for r in data.get("data", {}).get("result", []):
m = r.get("metric", {})
host = m.get("host", m.get("instance", "?"))
val = float(r.get("value", [0, 0])[1])
results.append({"host": host, "value": val, "metric": metric})
return sorted(results, key=lambda x: x["host"])
def get_cpu(host: str = None) -> list[dict]:
    """Return CPU utilisation in percent per host.

    The value is 100 minus the averaged 5-minute idle rate times 100.

    Args:
        host: If given, restrict the query to series with this ``host`` label.

    Returns:
        Rows as produced by ``_by_host`` with ``metric`` set to ``"cpu"``.
    """
    selector = 'mode="idle"'
    if host:
        selector += f', host="{host}"'
    idle_rate = f"rate(node_cpu_seconds_total{{{selector}}}[5m])"
    expr = f"100 - (avg by (host) ({idle_rate}) * 100)"
    return _by_host(instant_query(expr), "cpu")
def get_memory(host: str = None) -> list[dict]:
    """Return memory utilisation in percent per host.

    Computed as ``(1 - MemAvailable / MemTotal) * 100``.

    Args:
        host: If given, restrict the query to series with this ``host`` label.

    Returns:
        Rows as produced by ``_by_host`` with ``metric`` set to ``"mem"``.
    """
    matcher = 'host!=""'
    if host:
        matcher += f', host="{host}"'
    available = f"node_memory_MemAvailable_bytes{{{matcher}}}"
    total = f"node_memory_MemTotal_bytes{{{matcher}}}"
    expr = f"(1 - {available} / {total}) * 100"
    return _by_host(instant_query(expr), "mem")
def get_disk(host: str = None) -> list[dict]:
    """Return root-filesystem (``/``) utilisation in percent per host.

    Computed as ``(1 - avail / size) * 100``.

    Args:
        host: If given, restrict the query to series with this ``host`` label.

    Returns:
        Rows as produced by ``_by_host`` with ``metric`` set to ``"disk"``.
    """
    matcher = 'mountpoint="/", host!=""'
    if host:
        matcher += f', host="{host}"'
    available = f"node_filesystem_avail_bytes{{{matcher}}}"
    size = f"node_filesystem_size_bytes{{{matcher}}}"
    expr = f"(1 - {available} / {size}) * 100"
    return _by_host(instant_query(expr), "disk")
def get_disk_bytes(host: str = None) -> list[dict]:
    """Return available and total size of the root partition per host.

    Args:
        host: If given, restrict the queries to series with this ``host`` label.

    Returns:
        One dict per row of the "available" query with the keys ``host``,
        ``avail_gb`` and ``total_gb`` (bytes divided by 1024**3). A host
        without a matching size sample gets ``total_gb`` 0.
    """
    matcher = 'mountpoint="/", host!=""' + (f', host="{host}"' if host else "")
    avail_rows = _by_host(
        instant_query(f"node_filesystem_avail_bytes{{{matcher}}}"),
        "avail_bytes",
    )
    size_rows = _by_host(
        instant_query(f"node_filesystem_size_bytes{{{matcher}}}"),
        "total_bytes",
    )
    gib = 1024**3
    size_by_host = {row["host"]: row["value"] for row in size_rows}
    return [
        {
            "host": row["host"],
            "avail_gb": row["value"] / gib,
            "total_gb": size_by_host.get(row["host"], 0) / gib,
        }
        for row in avail_rows
    ]
def get_all_filesystems(host: str = None) -> list[dict]:
    """Return usage data for every non-trivial filesystem.

    Series with fstype ``tmpfs``, ``devtmpfs`` or ``efivarfs`` are excluded
    in the query itself. Mountpoints ``/boot`` and ``/boot/efi`` and
    filesystems smaller than 500 MiB are dropped afterwards.

    Args:
        host: If given, restrict the queries to series with this ``host`` label.

    Returns:
        Dicts with the keys ``host``, ``mountpoint``, ``total_gb``,
        ``avail_gb``, ``used_pct`` and ``device``, sorted by host and
        mountpoint. Empty when the "available" query did not succeed.
    """
    excluded_fstypes = "tmpfs|devtmpfs|efivarfs"
    matcher = f'fstype!~"{excluded_fstypes}", host!=""'
    if host:
        matcher += f', host="{host}"'
    avail_resp = instant_query(f"node_filesystem_avail_bytes{{{matcher}}}")
    size_resp = instant_query(f"node_filesystem_size_bytes{{{matcher}}}")
    if avail_resp.get("status") != "success":
        return []

    def _host_of(labels: dict) -> str:
        """Pick the "host" label, falling back to "instance", then "?"."""
        return labels.get("host", labels.get("instance", "?"))

    # Total size per (host, mountpoint), used to pair with the avail series.
    sizes = {}
    for series in size_resp.get("data", {}).get("result", []):
        labels = series.get("metric", {})
        pair = (_host_of(labels), labels.get("mountpoint", ""))
        sizes[pair] = float(series.get("value", [0, 0])[1])

    min_size = 500 * 1024 * 1024
    gib = 1024**3
    entries = []
    for series in avail_resp.get("data", {}).get("result", []):
        labels = series.get("metric", {})
        host_name = _host_of(labels)
        mount = labels.get("mountpoint", "")
        if mount in ("/boot/efi", "/boot"):
            continue
        free = float(series.get("value", [0, 0])[1])
        size = sizes.get((host_name, mount), 0)
        if size < min_size:
            continue
        if size > 0:
            used_pct = (size - free) / size * 100
        else:
            used_pct = 0
        entries.append({
            "host": host_name,
            "mountpoint": mount,
            "total_gb": size / gib,
            "avail_gb": free / gib,
            "used_pct": used_pct,
            "device": labels.get("device", ""),
        })
    entries.sort(key=lambda entry: (entry["host"], entry["mountpoint"]))
    return entries
def get_uptime(host: str = None) -> list[dict]:
    """Return uptime in seconds per host.

    Computed as ``node_time_seconds - node_boot_time_seconds``.

    Args:
        host: If given, restrict the query to series with this ``host`` label.

    Returns:
        Rows as produced by ``_by_host`` with ``metric`` set to
        ``"uptime_sec"``.
    """
    matcher = 'host!=""'
    if host:
        matcher += f', host="{host}"'
    expr = (
        f"node_time_seconds{{{matcher}}}"
        " - "
        f"node_boot_time_seconds{{{matcher}}}"
    )
    return _by_host(instant_query(expr), "uptime_sec")
def get_load(host: str = None) -> list[dict]:
    """Return the ``node_load5`` value per host.

    Args:
        host: If given, restrict the query to series with this ``host`` label.

    Returns:
        Rows as produced by ``_by_host`` with ``metric`` set to ``"load5"``.
    """
    matcher = 'host!=""'
    if host:
        matcher += f', host="{host}"'
    response = instant_query(f"node_load5{{{matcher}}}")
    return _by_host(response, "load5")
def get_warnings() -> list[str]:
    """Return one warning string per host metric at or above its threshold.

    CPU, RAM and root-disk usage are checked in that order against
    ``WARN_CPU``, ``WARN_MEM`` and ``WARN_DISK``.
    """
    checks = (
        (get_cpu, WARN_CPU, "CPU"),
        (get_memory, WARN_MEM, "RAM"),
        (get_disk, WARN_DISK, "Disk"),
    )
    alerts = []
    for fetch, limit, label in checks:
        alerts.extend(
            f"🔴 {row['host']}: {label} {row['value']:.0f}%"
            for row in fetch()
            if row["value"] >= limit
        )
    return alerts
def _fmt_uptime(seconds: float) -> str:
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
if days > 0:
return f"{days}d {hours}h"
return f"{hours}h"
def format_overview() -> str:
    """Build the compact all-hosts overview text shown by the bot.

    Returns:
        A multi-line string with one entry per host (CPU, RAM, root disk,
        load, uptime and any additional filesystems), followed by the
        threshold warnings from ``get_warnings``. A short notice is returned
        instead when Prometheus is unreachable or yields no host data.
    """
    if not is_available():
        return "⚠️ Prometheus nicht erreichbar."

    cpu = {r["host"]: r["value"] for r in get_cpu()}
    mem = {r["host"]: r["value"] for r in get_memory()}
    disk = {r["host"]: r["value"] for r in get_disk()}
    disk_gb = {r["host"]: r for r in get_disk_bytes()}
    uptime = {r["host"]: r["value"] for r in get_uptime()}
    load = {r["host"]: r["value"] for r in get_load()}

    # Group every filesystem other than "/" by host; "/" is already covered
    # by the Disk column.
    extra_fs = {}
    for fs in get_all_filesystems():
        if fs["mountpoint"] != "/":
            extra_fs.setdefault(fs["host"], []).append(fs)

    hosts = sorted(set(cpu) | set(mem) | set(disk))
    if not hosts:
        return "Keine Prometheus-Daten verfügbar."

    lines = [f"📊 Server-Metriken ({len(hosts)} Hosts)\n"]
    for h in hosts:
        # -1 is the sentinel for "no sample for this host"; it never trips a
        # threshold and is rendered as "n/a".
        c = cpu.get(h, -1)
        m = mem.get(h, -1)
        d = disk.get(h, -1)
        dinfo = disk_gb.get(h, {})
        u = uptime.get(h, -1)
        l5 = load.get(h, -1)

        warn = ""
        if c >= WARN_CPU or m >= WARN_MEM or d >= WARN_DISK:
            warn = " ⚠️"

        disk_str = f"{d:.0f}%" if d >= 0 else "n/a"
        if dinfo:
            disk_str += f" ({dinfo.get('avail_gb', 0):.0f}/{dinfo.get('total_gb', 0):.0f} GB frei)"

        emoji = "🟢" if c >= 0 else "🟡"
        cpu_str = f"{c:.0f}%" if c >= 0 else "n/a"
        mem_str = f"{m:.0f}%" if m >= 0 else "n/a"
        load_str = f"{l5:.1f}" if l5 >= 0 else "n/a"
        # A missing (or NaN) uptime sample is shown as "n/a" like the other
        # metrics instead of a misleading "0h".
        uptime_str = _fmt_uptime(u) if u >= 0 else "n/a"

        extra_line = ""
        if h in extra_fs:
            parts = [
                f"{efs['mountpoint']}: {efs['avail_gb']:.0f}/{efs['total_gb']:.0f} GB frei"
                for efs in extra_fs[h]
            ]
            extra_line = "\n Storage: " + ", ".join(parts)

        lines.append(
            f"{emoji} {h}{warn}\n"
            f" CPU: {cpu_str} RAM: {mem_str} Disk: {disk_str}\n"
            f" Load5: {load_str} Uptime: {uptime_str}{extra_line}"
        )

    # NOTE(review): get_warnings() issues the CPU/RAM/disk queries again, so
    # its values can differ slightly from the ones rendered above.
    warnings = get_warnings()
    if warnings:
        lines.append("\n⚠️ WARNUNGEN:")
        lines.extend(warnings)
    else:
        lines.append("\n✅ Alle Werte im Normalbereich.")
    return "\n".join(lines)
def format_host_detail(host: str) -> str:
    """Build the detailed metrics text for a single host.

    Args:
        host: Value of the ``host`` label to report on.

    Returns:
        A multi-line string with CPU, RAM, per-filesystem usage, 5-minute
        load and uptime (each only if data exists), or a short notice when
        Prometheus is unreachable or neither CPU nor RAM data was found.
    """
    if not is_available():
        return "⚠️ Prometheus nicht erreichbar."

    cpu = get_cpu(host)
    mem = get_memory(host)
    filesystems = get_all_filesystems(host)
    uptime = get_uptime(host)
    load = get_load(host)

    if not (cpu or mem):
        return f"Keine Metriken für '{host}' gefunden."

    def _fs_line(fs: dict) -> str:
        """Render one filesystem row, flagged when at or above WARN_DISK."""
        flag = " ⚠️" if fs["used_pct"] >= WARN_DISK else ""
        return (
            f" {fs['mountpoint']}: {fs['used_pct']:.0f}% belegt "
            f"({fs['avail_gb']:.0f} / {fs['total_gb']:.0f} GB frei){flag}"
        )

    report = [f"📊 {host} — Detail\n"]
    if cpu:
        report.append(f"CPU: {cpu[0]['value']:.1f}%")
    if mem:
        report.append(f"RAM: {mem[0]['value']:.1f}%")
    if filesystems:
        report.append("Speicher:")
        report.extend(_fs_line(fs) for fs in filesystems)
    if load:
        report.append(f"Load (5m): {load[0]['value']:.2f}")
    if uptime:
        report.append(f"Uptime: {_fmt_uptime(uptime[0]['value'])}")
    return "\n".join(report)