homelab-brain/homelab-ai-bot/core/loki_client.py
Homelab Cursor 43ee006f15 monitoring: error-rate alerts, silence proxmox filter, periodic loop
- loki_client.py: check_error_rate() mit host-spezifischen Schwellen (rss-manager:15, wordpress:10, default:25)
- monitor.py: Error-Rate-Check in check_all(), Silence-Check filtert gestoppte Container via Proxmox-Status
- telegram_bot.py: periodischer _monitor_loop alle 10 Min
- Schliesst #30 und #31
2026-03-24 13:30:58 +01:00

204 lines
7.3 KiB
Python

"""Loki API client for querying centralized logs."""
import requests
from datetime import datetime, timezone, timedelta
LOKI_URL = "http://100.109.206.43:3100"
def _query(endpoint: str, params: dict, base_url: str = None) -> dict:
url = f"{base_url or LOKI_URL}{endpoint}"
try:
r = requests.get(url, params=params, timeout=10)
r.raise_for_status()
return r.json()
except requests.RequestException as e:
return {"error": str(e)}
def _ns(dt: datetime) -> str:
return str(int(dt.timestamp() * 1e9))
def query_logs(query: str, hours: float = 1, limit: int = 100) -> list[dict]:
"""Run a LogQL query and return log entries."""
now = datetime.now(timezone.utc)
start = now - timedelta(hours=hours)
data = _query("/loki/api/v1/query_range", {
"query": query,
"start": _ns(start),
"end": _ns(now),
"limit": limit,
"direction": "backward",
})
if "error" in data:
return [{"error": data["error"]}]
entries = []
for stream in data.get("data", {}).get("result", []):
labels = stream.get("stream", {})
for ts, line in stream.get("values", []):
entries.append({
"timestamp": ts,
"host": labels.get("host", labels.get("job", "unknown")),
"line": line,
})
return entries
def get_errors(container: str = None, hours: float = 1, limit: int = 200) -> list[dict]:
"""Get error-level logs, optionally filtered by container hostname."""
if container:
q = f'{{host="{container}"}} |~ "(?i)(error|fatal|panic|traceback|exception)" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate"'
else:
q = '{job=~".+"} |~ "(?i)(error|fatal|panic|traceback|exception)" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate"'
return query_logs(q, hours=hours, limit=limit)
def get_labels() -> list[str]:
"""Get all available label values for 'host'."""
data = _query("/loki/api/v1/label/host/values", {})
if "error" in data:
return []
return data.get("data", [])
def check_silence(minutes: int = 35) -> list[dict]:
"""Find hosts that haven't sent logs within the given timeframe."""
all_hosts = get_labels()
if not all_hosts:
return [{"error": "Could not fetch host labels from Loki"}]
now = datetime.now(timezone.utc)
start = now - timedelta(minutes=minutes)
silent = []
for host in all_hosts:
data = _query("/loki/api/v1/query_range", {
"query": f'count_over_time({{host="{host}"}}[{minutes}m])',
"start": _ns(start),
"end": _ns(now),
"limit": 1,
})
results = data.get("data", {}).get("result", [])
has_logs = any(
int(v[1]) > 0
for r in results
for v in r.get("values", [])
)
if not has_logs:
silent.append({"host": host, "silent_minutes": minutes})
return silent
def get_health(container: str, hours: float = 24) -> dict:
"""Get a health summary for a specific container."""
errors = get_errors(container=container, hours=hours, limit=200)
error_count = len([e for e in errors if "error" not in e])
recent = query_logs(f'{{host="{container}"}}', hours=0.5, limit=5)
has_recent = len([e for e in recent if "error" not in e]) > 0
return {
"host": container,
"errors_last_{hours}h": error_count,
"sending_logs": has_recent,
"status": "healthy" if error_count < 5 and has_recent else
"warning" if error_count < 20 else "critical",
}
WATCHED_SERVICES = [
("rss-manager", "rss-manager"),
("wordpress-v2", "wordpress"),
("fuenfvoracht", "fuenfvoracht"),
("homelab-ai-bot", "hausmeister"),
]
def count_errors(hours: float = 24) -> dict:
"""Zählt Fehler-Log-Einträge über einen Zeitraum via Loki metric query."""
now = datetime.now(timezone.utc)
start = now - timedelta(hours=hours)
q = '{job=~".+"} |~ "(?i)(error|fatal|panic|traceback|exception)" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate"'
# Loki instant metric query für Gesamtanzahl
data = _query("/loki/api/v1/query_range", {
"query": q,
"start": _ns(start),
"end": _ns(now),
"limit": 5000,
"direction": "backward",
})
if "error" in data:
return {"error": data["error"], "count": 0}
total = sum(
len(stream.get("values", []))
for stream in data.get("data", {}).get("result", [])
)
# Per-Host aufschlüsseln
per_host: dict[str, int] = {}
for stream in data.get("data", {}).get("result", []):
host = stream.get("stream", {}).get("host", "unknown")
per_host[host] = per_host.get(host, 0) + len(stream.get("values", []))
return {"count": total, "hours": hours, "per_host": per_host}
def check_service_restarts(minutes: int = 35) -> list[dict]:
"""Findet Services die innerhalb des Zeitfensters neu gestartet haben (systemd journal via Loki)."""
restarts = []
for host, service_name in WATCHED_SERVICES:
q = f'{{host="{host}"}} |~ "(?i)(Started|Restarting|restarted).*{service_name}"'
entries = query_logs(q, hours=minutes / 60, limit=5)
real = [e for e in entries if "error" not in e]
if real:
restarts.append({"host": host, "service": service_name, "count": len(real)})
return restarts
ERROR_RATE_THRESHOLDS = {
"rss-manager": 15,
"wordpress-v2": 10,
}
ERROR_RATE_DEFAULT = 25
def check_error_rate(minutes: int = 30) -> list[dict]:
"""Check if any host exceeds its error-rate threshold within the window."""
all_hosts = get_labels()
alerts = []
now = datetime.now(timezone.utc)
for host in all_hosts:
q = f'count_over_time({{host="{host}"}} |~ "(?i)error" [{minutes}m])'
data = _query("/loki/api/v1/query", {"query": q, "time": _ns(now)})
count = sum(
int(float(r.get("value", [None, "0"])[1]))
for r in data.get("data", {}).get("result", [])
if len(r.get("value", [])) > 1
)
threshold = ERROR_RATE_THRESHOLDS.get(host, ERROR_RATE_DEFAULT)
if count > threshold:
alerts.append({"host": host, "count": count, "threshold": threshold})
return alerts
def format_logs(entries: list[dict], max_lines: int = 30) -> str:
"""Format log entries for human/LLM consumption."""
if not entries:
return "No log entries found."
if entries and "error" in entries[0]:
return f"Loki error: {entries[0]['error']}"
lines = []
for e in entries[:max_lines]:
host = e.get("host", "?")
line = e.get("line", "").strip()
if len(line) > 200:
line = line[:200] + "..."
lines.append(f"[{host}] {line}")
total = len(entries)
if total > max_lines:
lines.append(f"\n... and {total - max_lines} more entries")
return "\n".join(lines)