"""Loki API client for querying centralized logs.""" import requests from datetime import datetime, timezone, timedelta LOKI_URL = "http://100.109.206.43:3100" def _query(endpoint: str, params: dict, base_url: str = None) -> dict: url = f"{base_url or LOKI_URL}{endpoint}" try: r = requests.get(url, params=params, timeout=10) r.raise_for_status() return r.json() except requests.RequestException as e: return {"error": str(e)} def _ns(dt: datetime) -> str: return str(int(dt.timestamp() * 1e9)) def query_logs(query: str, hours: float = 1, limit: int = 100) -> list[dict]: """Run a LogQL query and return log entries.""" now = datetime.now(timezone.utc) start = now - timedelta(hours=hours) data = _query("/loki/api/v1/query_range", { "query": query, "start": _ns(start), "end": _ns(now), "limit": limit, "direction": "backward", }) if "error" in data: return [{"error": data["error"]}] entries = [] for stream in data.get("data", {}).get("result", []): labels = stream.get("stream", {}) for ts, line in stream.get("values", []): entries.append({ "timestamp": ts, "host": labels.get("host", labels.get("job", "unknown")), "line": line, }) return entries def get_errors(container: str = None, hours: float = 1, limit: int = 200) -> list[dict]: """Get error-level logs, optionally filtered by container hostname.""" if container: q = f'{{host="{container}"}} |~ "(?i)(error|fatal|panic|traceback|exception)" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate"' else: q = '{job=~".+"} |~ "(?i)(error|fatal|panic|traceback|exception)" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate"' return query_logs(q, hours=hours, limit=limit) def get_labels() -> list[str]: """Get all available label values for 'host'.""" data = _query("/loki/api/v1/label/host/values", {}) if "error" in data: return [] return data.get("data", []) def check_silence(minutes: int = 35) -> list[dict]: """Find hosts that haven't sent logs within the given timeframe.""" all_hosts = get_labels() if not all_hosts: return [{"error": "Could not fetch host labels from Loki"}] now = datetime.now(timezone.utc) start = now - timedelta(minutes=minutes) silent = [] for host in all_hosts: data = _query("/loki/api/v1/query_range", { "query": f'count_over_time({{host="{host}"}}[{minutes}m])', "start": _ns(start), "end": _ns(now), "limit": 1, }) results = data.get("data", {}).get("result", []) has_logs = any( int(v[1]) > 0 for r in results for v in r.get("values", []) ) if not has_logs: silent.append({"host": host, "silent_minutes": minutes}) return silent def get_health(container: str, hours: float = 24) -> dict: """Get a health summary for a specific container.""" errors = get_errors(container=container, hours=hours, limit=200) error_count = len([e for e in errors if "error" not in e]) recent = query_logs(f'{{host="{container}"}}', hours=0.5, limit=5) has_recent = len([e for e in recent if "error" not in e]) > 0 return { "host": container, "errors_last_{hours}h": error_count, "sending_logs": has_recent, "status": "healthy" if error_count < 5 and has_recent else "warning" if error_count < 20 else "critical", } WATCHED_SERVICES = [ ("rss-manager", "rss-manager"), ("wordpress-v2", "wordpress"), ("fuenfvoracht", "fuenfvoracht"), ("homelab-ai-bot", "hausmeister"), ] def count_errors(hours: float = 24) -> dict: """Zählt Fehler-Log-Einträge über einen Zeitraum via Loki metric query.""" now = datetime.now(timezone.utc) start = now - timedelta(hours=hours) q = '{job=~".+"} |~ "(?i)(error|fatal|panic|traceback|exception)" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate"' # Loki instant metric query für Gesamtanzahl data = _query("/loki/api/v1/query_range", { "query": q, "start": _ns(start), "end": _ns(now), "limit": 5000, "direction": "backward", }) if "error" in data: return {"error": data["error"], "count": 0} total = sum( len(stream.get("values", [])) for stream in data.get("data", {}).get("result", []) ) # Per-Host aufschlüsseln per_host: dict[str, int] = {} for stream in data.get("data", {}).get("result", []): host = stream.get("stream", {}).get("host", "unknown") per_host[host] = per_host.get(host, 0) + len(stream.get("values", [])) return {"count": total, "hours": hours, "per_host": per_host} def check_service_restarts(minutes: int = 35) -> list[dict]: """Findet Services die innerhalb des Zeitfensters neu gestartet haben (systemd journal via Loki).""" restarts = [] for host, service_name in WATCHED_SERVICES: q = f'{{host="{host}"}} |~ "(?i)(Started|Restarting|restarted).*{service_name}"' entries = query_logs(q, hours=minutes / 60, limit=5) real = [e for e in entries if "error" not in e] if real: restarts.append({"host": host, "service": service_name, "count": len(real)}) return restarts ERROR_RATE_THRESHOLDS = { "rss-manager": 15, "wordpress-v2": 10, } ERROR_RATE_DEFAULT = 25 def check_error_rate(minutes: int = 30) -> list[dict]: """Check if any host exceeds its error-rate threshold within the window.""" all_hosts = get_labels() alerts = [] now = datetime.now(timezone.utc) for host in all_hosts: q = f'count_over_time({{host="{host}"}} |~ "(?i)error" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate" [{minutes}m])' data = _query("/loki/api/v1/query", {"query": q, "time": _ns(now)}) count = sum( int(float(r.get("value", [None, "0"])[1])) for r in data.get("data", {}).get("result", []) if len(r.get("value", [])) > 1 ) threshold = ERROR_RATE_THRESHOLDS.get(host, ERROR_RATE_DEFAULT) if count > threshold: alerts.append({"host": host, "count": count, "threshold": threshold}) return alerts def format_logs(entries: list[dict], max_lines: int = 30) -> str: """Format log entries for human/LLM consumption.""" if not entries: return "No log entries found." if entries and "error" in entries[0]: return f"Loki error: {entries[0]['error']}" lines = [] for e in entries[:max_lines]: host = e.get("host", "?") line = e.get("line", "").strip() if len(line) > 200: line = line[:200] + "..." lines.append(f"[{host}] {line}") total = len(entries) if total > max_lines: lines.append(f"\n... and {total - max_lines} more entries") return "\n".join(lines)