205 lines
7.6 KiB
Python
205 lines
7.6 KiB
Python
"""Loki API client for querying centralized logs."""
|
|
|
|
import requests
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
# Default base URL of the Loki HTTP API; _query() uses it whenever no
# explicit base_url is passed.
LOKI_URL = "http://100.109.206.43:3100"
|
|
|
|
|
|
def _query(endpoint: str, params: dict, base_url: str = None) -> dict:
    """Send a GET request to the Loki HTTP API and return the decoded JSON body.

    Args:
        endpoint: API path appended to the base URL
            (e.g. ``"/loki/api/v1/query_range"``).
        params: Query-string parameters passed to ``requests.get``.
        base_url: Optional base URL; falls back to the module-level
            ``LOKI_URL`` when omitted or empty.

    Returns:
        The parsed JSON object on success. On any failure (connection error,
        timeout, non-2xx status, body that is not a JSON object) a dict of the
        form ``{"error": "<message>"}`` is returned instead of raising.
    """
    url = f"{base_url or LOKI_URL}{endpoint}"
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions whose
        # decode error does not derive from RequestException.
        return {"error": str(e)}
    if not isinstance(payload, dict):
        # Callers rely on `"error" in data` and `data.get(...)`, which only
        # behave as intended on a dict.
        return {"error": f"Unexpected response type from Loki: {type(payload).__name__}"}
    return payload
|
|
|
|
|
|
def _ns(dt: datetime) -> str:
|
|
return str(int(dt.timestamp() * 1e9))
|
|
|
|
|
|
def query_logs(query: str, hours: float = 1, limit: int = 100) -> list[dict]:
    """Run a LogQL query over the last ``hours`` hours and return log entries.

    Args:
        query: LogQL query string, sent unchanged to ``/loki/api/v1/query_range``.
        hours: Size of the look-back window ending now (UTC).
        limit: Value sent as the request's ``limit`` parameter.

    Returns:
        A list of dicts with the keys ``"timestamp"``, ``"host"`` and
        ``"line"``, ordered newest first. ``"host"`` is the stream's ``host``
        label, falling back to its ``job`` label and then ``"unknown"``.
        If the request fails, a single-element list
        ``[{"error": "<message>"}]`` is returned instead.
    """
    now = datetime.now(timezone.utc)
    start = now - timedelta(hours=hours)
    data = _query("/loki/api/v1/query_range", {
        "query": query,
        "start": _ns(start),
        "end": _ns(now),
        "limit": limit,
        "direction": "backward",
    })
    if "error" in data:
        return [{"error": data["error"]}]

    entries = []
    for stream in data.get("data", {}).get("result", []):
        labels = stream.get("stream", {})
        host = labels.get("host", labels.get("job", "unknown"))
        for ts, line in stream.get("values", []):
            entries.append({"timestamp": ts, "host": host, "line": line})

    def _timestamp_key(entry: dict) -> int:
        # Timestamps normally arrive as numeric strings; anything that cannot
        # be parsed sorts last instead of aborting the whole query.
        try:
            return int(entry["timestamp"])
        except (TypeError, ValueError):
            return 0

    # Entries were collected stream by stream, so without this sort the
    # combined list is not in chronological order across streams.
    entries.sort(key=_timestamp_key, reverse=True)
    return entries
|
|
|
|
|
|
def get_errors(container: str = None, hours: float = 1, limit: int = 200) -> list[dict]:
    """Get error-level logs, optionally filtered by container hostname.

    A line counts as an error when it matches (case-insensitively) one of
    error/fatal/panic/traceback/exception and does not match any pattern on
    the exclusion list below.

    Args:
        container: Value of the ``host`` label to restrict the search to.
            When omitted or empty, every stream with a ``job`` label is
            searched.
        hours: Look-back window passed to ``query_logs``.
        limit: Maximum number of entries, passed to ``query_logs``.

    Returns:
        Whatever ``query_logs`` returns for the constructed query.
    """
    line_filters = (
        '|~ "(?i)(error|fatal|panic|traceback|exception)" '
        '!~ "caller=metrics|query_hash=|executing query|scheduler_processor|'
        'Aborted connection|systemd-networkd-wait-online|context canceled|'
        'AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate|terror"'
    )
    if container:
        # Escape backslashes and quotes so the value cannot terminate the
        # quoted label matcher early and corrupt the query.
        escaped = container.replace("\\", "\\\\").replace('"', '\\"')
        selector = f'{{host="{escaped}"}}'
    else:
        selector = '{job=~".+"}'
    return query_logs(f"{selector} {line_filters}", hours=hours, limit=limit)
|
|
|
|
|
|
def get_labels() -> list[str]:
    """Return the values Loki reports for the ``host`` label.

    Returns:
        The ``"data"`` field of the label-values response (an empty list if
        the field is absent), or an empty list when the request failed.
    """
    response = _query("/loki/api/v1/label/host/values", {})
    return [] if "error" in response else response.get("data", [])
|
|
|
|
|
|
def check_silence(minutes: int = 35) -> list[dict]:
    """Find hosts that haven't sent logs within the given timeframe.

    For every known ``host`` label value, one instant query evaluates
    ``count_over_time(...[<minutes>m])`` at the current time, so exactly the
    last ``minutes`` minutes are inspected. A range query would evaluate the
    same expression at several points in time and look back further than
    the requested window.

    Args:
        minutes: Size of the look-back window in minutes.

    Returns:
        A list of ``{"host": <name>, "silent_minutes": <minutes>}`` dicts, one
        per host without any log line in the window. If the host list cannot
        be fetched (or is empty), ``[{"error": "..."}]`` is returned. Hosts
        whose individual query fails are skipped: a Loki outage is not
        evidence that the host is silent.
    """
    all_hosts = get_labels()
    if not all_hosts:
        return [{"error": "Could not fetch host labels from Loki"}]

    evaluated_at = _ns(datetime.now(timezone.utc))
    silent = []

    for host in all_hosts:
        # Escape backslashes and quotes so unusual label values cannot break
        # out of the quoted matcher.
        escaped = host.replace("\\", "\\\\").replace('"', '\\"')
        data = _query("/loki/api/v1/query", {
            "query": f'count_over_time({{host="{escaped}"}}[{minutes}m])',
            "time": evaluated_at,
        })
        if "error" in data:
            # Unknown state; do not report a false "silent" alert.
            continue

        has_logs = False
        for sample in data.get("data", {}).get("result", []):
            value = sample.get("value", [])
            if len(value) > 1 and float(value[1]) > 0:
                has_logs = True
                break
        if not has_logs:
            silent.append({"host": host, "silent_minutes": minutes})

    return silent
|
|
|
|
|
|
def get_health(container: str, hours: float = 24) -> dict:
    """Get a health summary for a specific container.

    Args:
        container: Value of the ``host`` label identifying the container.
        hours: Look-back window used for counting error lines.

    Returns:
        A dict with:
          * ``"host"``: the container name as given.
          * ``"errors_last_<hours>h"``: number of error entries found. At most
            200 are fetched, so the value is capped at 200.
          * ``"errors_last_{hours}h"``: the same count under the literal key
            earlier versions emitted (the placeholder was never filled in).
            Kept so existing consumers keep working.
          * ``"sending_logs"``: whether any log line arrived in the last
            30 minutes.
          * ``"status"``: ``"healthy"`` (fewer than 5 errors and recent logs),
            ``"warning"`` (fewer than 20 errors) or ``"critical"``.
    """
    errors = get_errors(container=container, hours=hours, limit=200)
    # Failed queries come back as {"error": ...} entries; they are not log lines.
    error_count = sum(1 for e in errors if "error" not in e)

    # Escape backslashes and quotes so the name cannot break the label matcher.
    escaped = container.replace("\\", "\\\\").replace('"', '\\"')
    recent = query_logs(f'{{host="{escaped}"}}', hours=0.5, limit=5)
    has_recent = any("error" not in e for e in recent)

    if error_count < 5 and has_recent:
        status = "healthy"
    elif error_count < 20:
        status = "warning"
    else:
        status = "critical"

    return {
        "host": container,
        f"errors_last_{hours:g}h": error_count,
        # Legacy key (missing f-prefix in earlier versions); remove once no
        # consumer reads it any more.
        "errors_last_{hours}h": error_count,
        "sending_logs": has_recent,
        "status": status,
    }
|
|
|
|
|
|
# Services monitored for restarts by check_service_restarts().
# Each pair is (Loki host label, service name searched for in the log line).
WATCHED_SERVICES = [
    ("rss-manager", "rss-manager"),
    ("wordpress-v2", "wordpress"),
    ("fuenfvoracht", "fuenfvoracht"),
    ("homelab-ai-bot", "hausmeister"),
]
|
|
|
|
|
|
def count_errors(hours: float = 24) -> dict:
    """Count error-level log lines over the last ``hours`` hours.

    The count is obtained by fetching matching log lines with a range query
    and counting them, not by a Loki metric query. At most 5000 lines are
    requested, so the result is a lower bound once that cap is reached; the
    ``"truncated"`` flag reports when this happens.

    Args:
        hours: Size of the look-back window ending now (UTC).

    Returns:
        On success ``{"count": <total>, "hours": <hours>, "per_host":
        {<host>: <count>}, "truncated": <bool>}``, where streams without a
        ``host`` label are counted under ``"unknown"``. On request failure
        ``{"error": "<message>", "count": 0}``.
    """
    limit = 5000
    now = datetime.now(timezone.utc)
    start = now - timedelta(hours=hours)
    q = '{job=~".+"} |~ "(?i)(error|fatal|panic|traceback|exception)" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate|terror"'
    data = _query("/loki/api/v1/query_range", {
        "query": q,
        "start": _ns(start),
        "end": _ns(now),
        "limit": limit,
        "direction": "backward",
    })
    if "error" in data:
        return {"error": data["error"], "count": 0}

    # Break the count down per host; the total is the sum of these counts.
    per_host: dict[str, int] = {}
    for stream in data.get("data", {}).get("result", []):
        host = stream.get("stream", {}).get("host", "unknown")
        per_host[host] = per_host.get(host, 0) + len(stream.get("values", []))
    total = sum(per_host.values())

    return {
        "count": total,
        "hours": hours,
        "per_host": per_host,
        # True when the requested line cap was hit, i.e. the real number of
        # error lines may be higher than reported.
        "truncated": total >= limit,
    }
|
|
|
|
|
|
def check_service_restarts(minutes: int = 35) -> list[dict]:
    """Find watched services that (re)started within the time window.

    For every ``(host, service_name)`` pair in ``WATCHED_SERVICES`` the host's
    logs are searched for lines matching "Started", "Restarting" or
    "restarted" followed by the service name (case-insensitive).

    Args:
        minutes: Size of the look-back window in minutes.

    Returns:
        A list of ``{"host", "service", "count"}`` dicts, one per service with
        at least one matching line. At most 5 lines are fetched per service,
        so ``"count"`` never exceeds 5.
    """
    window_hours = minutes / 60
    found = []
    for host, service_name in WATCHED_SERVICES:
        logql = f'{{host="{host}"}} |~ "(?i)(Started|Restarting|restarted).*{service_name}"'
        hits = [
            entry
            for entry in query_logs(logql, hours=window_hours, limit=5)
            if "error" not in entry  # drop the error marker of a failed query
        ]
        if not hits:
            continue
        found.append({"host": host, "service": service_name, "count": len(hits)})
    return found
|
|
|
|
|
|
|
|
# Per-host limits for check_error_rate(): an alert is raised when a host's
# error count in the window exceeds its limit.
ERROR_RATE_THRESHOLDS = {
    "rss-manager": 15,
    "wordpress-v2": 10,
    # Public-facing: 404s caused by scanners are normal here.
    "forgejo": 200,
}

# Limit applied to every host without an entry in ERROR_RATE_THRESHOLDS.
ERROR_RATE_DEFAULT = 25
|
|
|
|
|
|
def check_error_rate(minutes: int = 30) -> list[dict]:
    """Report hosts whose error count in the window exceeds their threshold.

    For each host returned by ``get_labels()``, an instant query counts the
    lines of the last ``minutes`` minutes that match "error"
    (case-insensitive) and none of the excluded patterns. The count is
    compared with the host's entry in ``ERROR_RATE_THRESHOLDS``, or with
    ``ERROR_RATE_DEFAULT`` if the host has none.

    Args:
        minutes: Size of the look-back window in minutes.

    Returns:
        A list of ``{"host", "count", "threshold"}`` dicts for every host
        whose count is strictly greater than its threshold. A failed query
        yields a count of 0 for that host, so it never triggers an alert.
    """
    hosts = get_labels()
    evaluated_at = _ns(datetime.now(timezone.utc))
    alerts = []
    for host in hosts:
        logql = f'count_over_time({{host="{host}"}} |~ "(?i)error" !~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied|flag evaluation succeeded|pluginsAutoUpdate|terror" [{minutes}m])'
        response = _query("/loki/api/v1/query", {"query": logql, "time": evaluated_at})

        # Add up the sample value of every returned series; series without a
        # usable [timestamp, value] pair are ignored.
        count = 0
        for series in response.get("data", {}).get("result", []):
            sample = series.get("value", [])
            if len(sample) > 1:
                count += int(float(sample[1]))

        threshold = ERROR_RATE_THRESHOLDS.get(host, ERROR_RATE_DEFAULT)
        if count > threshold:
            alerts.append({"host": host, "count": count, "threshold": threshold})
    return alerts
|
|
|
|
|
|
def format_logs(entries: list[dict], max_lines: int = 30) -> str:
    """Render log entries as plain text for human/LLM consumption.

    Args:
        entries: Entry dicts carrying ``"host"`` and ``"line"`` keys, or a
            list whose first element carries an ``"error"`` key.
        max_lines: Maximum number of entries to render.

    Returns:
        One ``"[host] line"`` row per entry, joined by newlines. Lines are
        stripped and cut to 200 characters plus ``"..."``. If entries were
        left out, a trailing note states how many. An empty list yields
        ``"No log entries found."``; a leading error entry yields
        ``"Loki error: <message>"``.
    """
    if not entries:
        return "No log entries found."

    first = entries[0]
    if "error" in first:
        return f"Loki error: {first['error']}"

    def _render(entry: dict) -> str:
        # Trim surrounding whitespace, then cap very long lines.
        text = entry.get("line", "").strip()
        shown = text if len(text) <= 200 else text[:200] + "..."
        return f"[{entry.get('host', '?')}] {shown}"

    rendered = [_render(entry) for entry in entries[:max_lines]]

    omitted = len(entries) - max_lines
    if omitted > 0:
        rendered.append(f"\n... and {omitted} more entries")
    return "\n".join(rendered)
|