homelab-brain/homelab-ai-bot/core/loki_client.py

130 lines
4.4 KiB
Python

"""Loki API client for querying centralized logs."""
import requests
from datetime import datetime, timezone, timedelta
# Default base URL (scheme, host, port) of the Loki HTTP API; _query() uses it
# whenever no explicit base_url is passed.
LOKI_URL = "http://100.109.206.43:3100"
def _query(endpoint: str, params: dict, base_url: str = None) -> dict:
    """Send a GET request to a Loki API endpoint and return the decoded JSON.

    Args:
        endpoint: API path appended to the base URL, e.g.
            ``/loki/api/v1/query_range``.
        params: Query-string parameters handed to ``requests.get``.
        base_url: Alternative Loki base URL; ``LOKI_URL`` is used when this
            is omitted or empty.

    Returns:
        The parsed JSON body on success. On a transport failure, an HTTP
        error status, or a body that is not valid JSON, a dict of the form
        ``{"error": "<message>"}`` is returned instead of raising.
    """
    url = f"{base_url or LOKI_URL}{endpoint}"
    try:
        r = requests.get(url, params=params, timeout=10)
        r.raise_for_status()
        return r.json()
    except requests.RequestException as e:
        return {"error": str(e)}
    except ValueError as e:
        # Depending on the installed requests version, a non-JSON body can
        # surface as a plain ValueError rather than a RequestException; keep
        # the "never raise, return an error dict" contract in that case too.
        return {"error": f"Invalid JSON response from Loki: {e}"}
def _ns(dt: datetime) -> str:
return str(int(dt.timestamp() * 1e9))
def _timestamp_sort_key(entry: dict) -> int:
    """Return an entry's timestamp as an int for sorting (0 if unparsable)."""
    try:
        return int(entry["timestamp"])
    except (TypeError, ValueError):
        return 0


def query_logs(query: str, hours: float = 1, limit: int = 100) -> list[dict]:
    """Run a LogQL query over the last ``hours`` hours and return log entries.

    Args:
        query: LogQL expression sent to ``/loki/api/v1/query_range``.
        hours: Size of the look-back window, ending now (UTC).
        limit: Value of the ``limit`` request parameter.

    Returns:
        A list of ``{"timestamp", "host", "line"}`` dicts, newest first.
        ``host`` is taken from the stream's ``host`` label, falling back to
        its ``job`` label and finally to ``"unknown"``. If the request
        fails, the list holds a single ``{"error": "<message>"}`` dict.
    """
    now = datetime.now(timezone.utc)
    start = now - timedelta(hours=hours)
    data = _query("/loki/api/v1/query_range", {
        "query": query,
        "start": _ns(start),
        "end": _ns(now),
        "limit": limit,
        "direction": "backward",
    })
    if "error" in data:
        return [{"error": data["error"]}]

    # Tolerate a missing or null "data"/"result" instead of raising.
    streams = (data.get("data") or {}).get("result") or []
    entries = []
    for stream in streams:
        labels = stream.get("stream", {})
        host = labels.get("host", labels.get("job", "unknown"))
        for ts, line in stream.get("values", []):
            entries.append({"timestamp": ts, "host": host, "line": line})

    # Results arrive grouped per stream; merge them so the combined list is
    # newest-first overall, matching the requested "backward" direction.
    # list.sort is stable, so entries with equal timestamps keep their order.
    entries.sort(key=_timestamp_sort_key, reverse=True)
    return entries
# Line filter that keeps only lines mentioning an error-like keyword.
_ERROR_LINE_FILTER = '|~ "(?i)(error|fatal|panic|traceback|exception)"'
# Line filter that drops lines matching the keywords above but listed here as noise.
_NOISE_LINE_FILTER = '!~ "caller=metrics|query_hash=|executing query|scheduler_processor|Aborted connection|systemd-networkd-wait-online|context canceled|AH01630: client denied"'


def _escape_label_value(value: str) -> str:
    """Escape backslashes and double quotes for a quoted LogQL label value."""
    return value.replace("\\", "\\\\").replace('"', '\\"')


def get_errors(container: str = None, hours: float = 1, limit: int = 50) -> list[dict]:
    """Get error-level logs, optionally filtered by container hostname.

    Args:
        container: Value of the ``host`` label to restrict the search to.
            When omitted or empty, every stream with a non-empty ``job``
            label is searched.
        hours: Size of the look-back window passed to ``query_logs``.
        limit: Maximum number of entries requested from ``query_logs``.

    Returns:
        Whatever ``query_logs`` returns for the constructed query.
    """
    if container:
        # Escape the value so a quote or backslash in the name cannot break
        # out of the label matcher and corrupt the query.
        selector = f'{{host="{_escape_label_value(container)}"}}'
    else:
        selector = '{job=~".+"}'
    q = f"{selector} {_ERROR_LINE_FILTER} {_NOISE_LINE_FILTER}"
    return query_logs(q, hours=hours, limit=limit)
def get_labels() -> list[str]:
    """Return every value Loki reports for the ``host`` label.

    An empty list is returned when the request to Loki fails.
    """
    response = _query("/loki/api/v1/label/host/values", {})
    return [] if "error" in response else response.get("data", [])
def check_silence(minutes: int = 35) -> list[dict]:
    """Find hosts that haven't sent logs within the given timeframe.

    Every value of the ``host`` label is checked with a
    ``count_over_time`` query covering the last ``minutes`` minutes.

    Args:
        minutes: Length of the window, ending now (UTC), in which a host
            must have produced at least one log line.

    Returns:
        One ``{"host", "silent_minutes"}`` dict per host without logs in
        the window. If the host list cannot be fetched, or a per-host query
        fails, a single-element list ``[{"error": "<message>"}]`` is
        returned instead.
    """
    all_hosts = get_labels()
    if not all_hosts:
        return [{"error": "Could not fetch host labels from Loki"}]
    now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)
    silent = []
    for host in all_hosts:
        data = _query("/loki/api/v1/query_range", {
            "query": f'count_over_time({{host="{host}"}}[{minutes}m])',
            "start": _ns(start),
            "end": _ns(now),
            "limit": 1,
        })
        if "error" in data:
            # A failed request says nothing about the host; treating it as
            # "no logs" would raise a false silence alarm, so surface the
            # failure in the same shape as the label-fetch error above.
            return [{"error": data["error"]}]
        results = (data.get("data") or {}).get("result") or []
        # Sample values are parsed with float() so a decimal-formatted
        # count cannot raise ValueError.
        has_logs = any(
            float(v[1]) > 0
            for r in results
            for v in r.get("values", [])
        )
        if not has_logs:
            silent.append({"host": host, "silent_minutes": minutes})
    return silent
def get_health(container: str, hours: float = 24) -> dict:
    """Get a health summary for a specific container.

    Args:
        container: Value of the ``host`` label identifying the container.
        hours: Look-back window used when counting error lines.

    Returns:
        A dict with:
          * ``host``: the ``container`` argument.
          * ``errors_last_<hours>h`` (e.g. ``errors_last_24h``): number of
            error entries found; at most 200, the limit requested.
          * ``sending_logs``: whether any log line arrived in the last
            30 minutes.
          * ``status``: ``"healthy"`` when fewer than 5 errors were found
            and logs are arriving, otherwise ``"warning"`` when fewer than
            20 errors were found, otherwise ``"critical"``.
    """
    errors = get_errors(container=container, hours=hours, limit=200)
    # Entries carrying an "error" key are request failures, not log lines.
    error_count = sum(1 for e in errors if "error" not in e)
    recent = query_logs(f'{{host="{container}"}}', hours=0.5, limit=5)
    has_recent = any("error" not in e for e in recent)

    if error_count < 5 and has_recent:
        status = "healthy"
    elif error_count < 20:
        status = "warning"
    else:
        status = "critical"

    return {
        "host": container,
        # Interpolate the window into the key; ":g" renders 24 and 24.0
        # both as "24" so the key does not depend on int vs. float input.
        f"errors_last_{hours:g}h": error_count,
        "sending_logs": has_recent,
        "status": status,
    }
def format_logs(entries: list[dict], max_lines: int = 30) -> str:
    """Render log entries as plain text for a human or an LLM.

    Each entry becomes ``[host] line``. Lines are stripped and cut to 200
    characters (with a trailing ``...``), and at most ``max_lines`` entries
    are shown, followed by a note giving the number of omitted entries. An
    empty list or a leading error entry yields a one-line message instead.
    """
    if not entries:
        return "No log entries found."

    first = entries[0]
    if "error" in first:
        return f"Loki error: {first['error']}"

    def _render(entry: dict) -> str:
        host = entry.get("host", "?")
        text = entry.get("line", "").strip()
        shown = text if len(text) <= 200 else text[:200] + "..."
        return f"[{host}] {shown}"

    rendered = [_render(entry) for entry in entries[:max_lines]]

    omitted = len(entries) - max_lines
    if omitted > 0:
        rendered.append(f"\n... and {omitted} more entries")
    return "\n".join(rendered)