homelab-brain/homelab-ai-bot/core/forgejo_client.py

91 lines
2.6 KiB
Python

"""Forgejo REST API Client — Issues, Repos, Commits."""
import logging
from typing import Optional

import requests
# Base URL of the Forgejo REST API; stays empty until init() has been called.
FORGEJO_URL = ""
# API token sent in the Authorization header; stays empty until init() has been called.
FORGEJO_TOKEN = ""
# Default repository ("owner/name") used by the issue and commit helpers.
REPO = "orbitalo/homelab-brain"
def init(cfg):
    """Configure the module-level Forgejo endpoint and token from *cfg*.

    Searches ``cfg.containers`` for the container with vmid 111 on host
    "pve-hetzner" and builds the API base URL from its ``tailscale_ip``.
    When no such container exists, or its ``tailscale_ip`` is empty or
    ``None``, a hard-coded fallback address is used instead.

    The token is taken from ``cfg.api_keys["forgejo_token"]``; when that is
    missing or empty, ``cfg.raw["FORGEJO_TOKEN"]`` is tried, and finally the
    token falls back to an empty string.

    Args:
        cfg: Configuration object exposing ``containers`` (iterable of
            objects with ``vmid``, ``host`` and ``tailscale_ip``) plus the
            dict-like attributes ``api_keys`` and ``raw``.
    """
    global FORGEJO_URL, FORGEJO_TOKEN

    fallback_ip = "100.89.246.60"
    forgejo_ct = next(
        (c for c in cfg.containers if c.vmid == 111 and c.host == "pve-hetzner"),
        None,
    )
    # A matching container without an address must not produce a URL such as
    # "http://None:3000/..."; use the fallback in that case as well.
    ip = (forgejo_ct.tailscale_ip if forgejo_ct else None) or fallback_ip
    FORGEJO_URL = f"http://{ip}:3000/api/v1"
    # `or` chaining lets an empty api_keys entry fall through to the raw value.
    FORGEJO_TOKEN = (
        cfg.api_keys.get("forgejo_token")
        or cfg.raw.get("FORGEJO_TOKEN")
        or ""
    )
def _get(endpoint: str, params: dict = None) -> Optional[dict | list]:
if not FORGEJO_URL or not FORGEJO_TOKEN:
return None
try:
r = requests.get(
f"{FORGEJO_URL}{endpoint}",
headers={"Authorization": f"token {FORGEJO_TOKEN}"},
params=params or {},
timeout=10,
)
r.raise_for_status()
return r.json()
except Exception:
return None
def _issue_summary(issue: dict) -> dict:
    """Reduce one issue object from the API to the fields shown in overviews."""
    label_names = [label["name"] for label in issue.get("labels", [])]
    return {
        "number": issue["number"],
        "title": issue["title"],
        "labels": label_names,
        # First 10 characters of the timestamp — presumably the YYYY-MM-DD part.
        "created": issue["created_at"][:10],
    }


def get_open_issues(repo: str = REPO) -> list[dict]:
    """Fetch the open issues of *repo* as compact summary dicts.

    The request asks for state "open" with a limit of 20.

    Args:
        repo: Repository in "owner/name" form.

    Returns:
        One dict per issue with the keys "number", "title", "labels"
        (list of label names) and "created"; an empty list when the
        request yields no data.
    """
    query = {"state": "open", "limit": 20}
    payload = _get(f"/repos/{repo}/issues", query)
    if not payload:
        return []
    return [_issue_summary(issue) for issue in payload]
def get_recent_commits(repo: str = REPO, limit: int = 10) -> list[dict]:
    """Fetch the latest commits of *repo* as short date/message pairs.

    Args:
        repo: Repository in "owner/name" form.
        limit: Value passed as the "limit" query parameter.

    Returns:
        One dict per commit with "date" (first 16 characters of the author
        date) and "message" (first line of the commit message, cut to 80
        characters); an empty list when the request yields no data.
    """
    payload = _get(f"/repos/{repo}/commits", {"limit": limit})
    summaries: list[dict] = []
    for entry in payload or []:
        # Only the subject line is kept, truncated to 80 characters.
        subject = entry["commit"]["message"].split("\n", 1)[0][:80]
        stamp = entry["commit"]["author"]["date"][:16]
        summaries.append({"date": stamp, "message": subject})
    return summaries
def get_repos() -> list[dict]:
    """List repositories returned by the search endpoint (limit 20).

    Returns:
        One dict per repository with "name" (its full name) and "issues"
        (its open-issue count); an empty list when the request yields no
        data or the response has no "data" entry.
    """
    payload = _get("/repos/search", {"limit": 20})
    if payload and "data" in payload:
        return [
            {"name": repo["full_name"], "issues": repo["open_issues_count"]}
            for repo in payload["data"]
        ]
    return []
def format_overview() -> str:
    """Build a plain-text overview of open issues and the five latest commits.

    Returns:
        A newline-joined string: a header with the issue count, one line per
        open issue (or a placeholder line when there are none), followed by
        a commit section when any commits were returned.
    """
    open_issues = get_open_issues()
    recent = get_recent_commits(limit=5)

    out = [f"Forgejo — {len(open_issues)} offene Issues\n"]

    if not open_issues:
        out.append(" Keine offenen Issues.")
    for issue in open_issues:
        # Labels are appended in square brackets only when the issue has any.
        suffix = ""
        if issue["labels"]:
            suffix = f" [{', '.join(issue['labels'])}]"
        out.append(f" #{issue['number']}: {issue['title']}{suffix}")

    if recent:
        out.append("\nLetzte Commits:")
        out.extend(f" {commit['date']} {commit['message']}" for commit in recent)

    return "\n".join(out)