95 lines
2.4 KiB
Python
95 lines
2.4 KiB
Python
"""Seafile REST API Client — Speicherplatz und Bibliotheken."""
|
|
|
|
import requests
|
|
from typing import Optional
|
|
|
|
# Module-level connection settings. All of them start out empty and are
# assigned by init().
SEAFILE_URL: str = ""   # base URL that endpoint paths are appended to
SEAFILE_USER: str = ""  # login name sent to the auth-token endpoint
SEAFILE_PASS: str = ""  # password sent to the auth-token endpoint
# Auth token remembered by _get_token(); empty string means "not fetched yet".
_token_cache: str = ""
|
|
|
|
|
|
def init(cfg):
    """Set the module-level Seafile connection settings from *cfg*.

    Overwrites ``SEAFILE_URL``, ``SEAFILE_USER`` and ``SEAFILE_PASS`` and
    clears the cached auth token so the next request logs in again.

    Args:
        cfg: Configuration object. ``cfg.raw`` and ``cfg.passwords`` must
            offer ``.get(key, default)``; ``SEAFILE_ADMIN_EMAIL`` is read
            from ``cfg.raw`` and ``default`` from ``cfg.passwords``.
    """
    global SEAFILE_URL, SEAFILE_USER, SEAFILE_PASS, _token_cache
    # A token cached under an earlier configuration must not be reused.
    _token_cache = ""
    # The server address is fixed here and not derived from cfg.
    # NOTE(review): presumably the address of container 103 on host
    # "pve-hetzner" -- confirm whether it should be read from cfg.containers.
    ip = "10.10.10.103"
    SEAFILE_URL = f"http://{ip}:8080"
    SEAFILE_USER = cfg.raw.get("SEAFILE_ADMIN_EMAIL", "admin@orbitalo.net")
    SEAFILE_PASS = cfg.passwords.get("default", "")
|
|
|
|
|
|
def _get_token() -> str:
    """Return the Seafile API auth token, logging in on first use.

    A token that was fetched earlier is returned from the module-level
    cache without contacting the server.

    Returns:
        The token, or ``""`` when the login request fails, the server
        answers with an error status, or the response carries no usable
        ``token`` field.
    """
    global _token_cache
    if _token_cache:
        return _token_cache
    try:
        r = requests.post(
            f"{SEAFILE_URL}/api2/auth-token/",
            data={"username": SEAFILE_USER, "password": SEAFILE_PASS},
            timeout=5,
        )
        r.raise_for_status()
        token = r.json()["token"]
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Transport/HTTP errors, undecodable JSON, a missing "token" key or
        # a non-mapping JSON body: callers treat "" as "not reachable".
        return ""
    _token_cache = token
    return _token_cache
|
|
|
|
|
|
def _get(endpoint: str) -> dict | list | None:
    """Send an authenticated GET request to the Seafile API.

    Args:
        endpoint: Path that is appended verbatim to ``SEAFILE_URL``.

    Returns:
        The decoded JSON body, or ``None`` when no token could be obtained,
        the request failed, the server answered with an error status, or
        the body is not valid JSON.
    """
    global _token_cache
    token = _get_token()
    if not token:
        return None
    try:
        r = requests.get(
            f"{SEAFILE_URL}{endpoint}",
            headers={"Authorization": f"Token {token}"},
            timeout=10,
        )
        if r.status_code == 401:
            # The server rejected the cached token. Forget it so the next
            # call logs in again instead of failing until init() is rerun.
            _token_cache = ""
        r.raise_for_status()
        return r.json()
    except (requests.RequestException, ValueError):
        # Transport/HTTP errors or an undecodable body.
        return None
|
|
|
|
|
|
def get_account_info() -> dict:
    """Fetch storage usage for the configured account.

    Returns:
        ``{"usage_gb": float, "total_gb": float, "email": str}`` with both
        sizes converted from the raw ``usage``/``total`` values by dividing
        by 1024**3, or ``{"error": "nicht erreichbar"}`` when the API
        delivered no usable (non-empty, mapping-shaped) answer.
    """
    data = _get("/api2/account/info/")
    # _get may also hand back a list; only a non-empty mapping is usable here.
    if not data or not isinstance(data, dict):
        return {"error": "nicht erreichbar"}
    gib = 1024**3
    return {
        # "or 0" also covers an explicit JSON null, which would otherwise
        # raise TypeError on division.
        "usage_gb": (data.get("usage") or 0) / gib,
        "total_gb": (data.get("total") or 0) / gib,
        "email": data.get("email", ""),
    }
|
|
|
|
|
|
def get_libraries() -> list[dict]:
    """List the libraries returned by the ``/api2/repos/`` endpoint.

    Returns:
        One dict per library with the keys ``name`` (``"?"`` if missing),
        ``size_mb`` (raw ``size`` divided by 1024**2) and ``encrypted``
        (``False`` if missing). An empty list when the API delivered
        nothing or something that is not a list.
    """
    data = _get("/api2/repos/")
    # _get may also hand back a dict; iterating it would yield bare keys.
    if not isinstance(data, list):
        return []
    mib = 1024**2
    return [
        {
            "name": repo.get("name", "?"),
            # "or 0" also covers an explicit JSON null.
            "size_mb": (repo.get("size") or 0) / mib,
            "encrypted": repo.get("encrypted", False),
        }
        for repo in data
        if isinstance(repo, dict)  # skip malformed entries instead of crashing
    ]
|
|
|
|
|
|
def format_overview() -> str:
    """Build a plain-text summary of used storage and all libraries.

    Returns:
        A fixed "not reachable" message when the account lookup reports an
        error; otherwise a header line with the used space followed, if any
        libraries exist, by one line per library with its size in MB.
    """
    account = get_account_info()
    if "error" in account:
        return "Seafile nicht erreichbar."

    libraries = get_libraries()
    # The header ends in "\n", so joining leaves a blank line beneath it.
    report = [f"Seafile — {account['usage_gb']:.1f} GB belegt\n"]
    if libraries:
        report.append("Bibliotheken:")
        report.extend(
            f" {entry['name']}: {entry['size_mb']:.0f} MB" for entry in libraries
        )
    return "\n".join(report)
|