homelab-brain/homelab-ai-bot/telegram_bot.py

516 lines
18 KiB
Python

"""Orbitalo Hausmeister — Telegram Bot für Homelab-Management."""
import asyncio
import logging
import sys
import os
import fcntl
import atexit
import signal
sys.path.insert(0, os.path.dirname(__file__))
PIDFILE = "/tmp/hausmeister-bot.pid"
_lock_fp = None
def _acquire_lock():
"""Stellt sicher, dass nur eine Bot-Instanz läuft (PID-File + flock)."""
global _lock_fp
_lock_fp = open(PIDFILE, "w")
try:
fcntl.flock(_lock_fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
print(f"ABBRUCH: Bot läuft bereits (PID-File {PIDFILE} ist gelockt)", file=sys.stderr)
sys.exit(1)
_lock_fp.write(str(os.getpid()))
_lock_fp.flush()
def _release_lock():
global _lock_fp
if _lock_fp:
try:
fcntl.flock(_lock_fp, fcntl.LOCK_UN)
_lock_fp.close()
os.unlink(PIDFILE)
except OSError:
pass
from telegram import BotCommand, Update, ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import (
Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes,
)
BOT_COMMANDS = [
BotCommand("status", "Alle Container"),
BotCommand("errors", "Aktuelle Fehler"),
BotCommand("ct", "Container-Detail (/ct 109)"),
BotCommand("health", "Health-Check (/health wordpress)"),
BotCommand("logs", "Letzte Logs (/logs rss-manager)"),
BotCommand("silence", "Stille Hosts"),
BotCommand("report", "Tagesbericht"),
BotCommand("check", "Monitoring-Check"),
BotCommand("feeds", "Feed-Status & Artikel heute"),
BotCommand("memory", "Offene Memory-Kandidaten"),
BotCommand("start", "Hilfe anzeigen"),
]
KEYBOARD = ReplyKeyboardMarkup(
[
[KeyboardButton("📊 Status"), KeyboardButton("❌ Fehler"), KeyboardButton("📰 Feeds")],
[KeyboardButton("📋 Report"), KeyboardButton("🔧 Check"), KeyboardButton("🔇 Stille")],
],
resize_keyboard=True,
is_persistent=True,
)
BUTTON_MAP = {
"📊 Status": "status",
"❌ Fehler": "errors",
"📰 Feeds": "feeds",
"📋 Report": "report",
"🔧 Check": "check",
"🔇 Stille": "silence",
}
import context
import requests as _req
import llm
import memory_client
import monitor
from core import config
logging.basicConfig(
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
level=logging.INFO,
)
log = logging.getLogger("hausmeister")
ALLOWED_CHAT_IDS: set[int] = set()
def _load_token_and_chat():
cfg = config.parse_config()
token = cfg.raw.get("TG_HAUSMEISTER_TOKEN", "")
chat_id = cfg.raw.get("TG_CHAT_ID", "")
if chat_id:
ALLOWED_CHAT_IDS.add(int(chat_id))
return token
def _authorized(update: Update) -> bool:
if not ALLOWED_CHAT_IDS:
return True
return update.effective_chat.id in ALLOWED_CHAT_IDS
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
await update.message.reply_text(
"🔧 Orbitalo Hausmeister-Bot\n\n"
"Befehle:\n"
"/status — Alle Container\n"
"/errors — Aktuelle Fehler\n"
"/ct <nr> — Container-Detail\n"
"/health <name> — Health-Check\n"
"/logs <name> — Letzte Logs\n"
"/silence — Stille Hosts\n"
"/report — Tagesbericht\n"
"/check — Monitoring-Check\n"
"/feeds — Feed-Status & Artikel\n"
"/memory — Offene Memory-Kandidaten\n\n"
"Oder einfach eine Frage stellen!",
reply_markup=KEYBOARD,
)
async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
await update.message.reply_text("⏳ Lade Container-Status...")
try:
text = context.gather_status()
if len(text) > 4000:
text = text[:4000] + "\n..."
await update.message.reply_text(text)
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
async def cmd_errors(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
await update.message.reply_text("⏳ Suche Fehler...")
try:
text = context.gather_errors(hours=2)
await update.message.reply_text(text[:4000])
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
async def cmd_ct(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
args = ctx.args
if not args:
await update.message.reply_text("Bitte CT-Nummer angeben: /ct 109")
return
try:
text = context.gather_container_status(args[0])
await update.message.reply_text(text)
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
async def cmd_health(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
args = ctx.args
if not args:
await update.message.reply_text("Bitte Hostname angeben: /health wordpress")
return
try:
text = context.gather_health(args[0])
await update.message.reply_text(text)
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
async def cmd_logs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
args = ctx.args
if not args:
await update.message.reply_text("Bitte Hostname angeben: /logs rss-manager")
return
try:
text = context.gather_logs(args[0])
await update.message.reply_text(text[:4000])
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
async def cmd_silence(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
try:
text = context.gather_silence()
await update.message.reply_text(text)
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
async def cmd_report(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
await update.message.reply_text("⏳ Erstelle Tagesbericht...")
try:
text = monitor.format_report()
await update.message.reply_text(text[:4000])
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
async def cmd_check(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
await update.message.reply_text("⏳ Prüfe Systeme...")
try:
alerts = monitor.check_all()
if alerts:
text = f"⚠️ {len(alerts)} Alarme:\n\n" + "\n".join(alerts)
else:
text = "✅ Keine Alarme — alles läuft."
await update.message.reply_text(text)
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
def _get_feed_stats():
"""Holt Feed-Statistiken von der RSS Manager API."""
cfg = config.parse_config()
ct_109 = config.get_container(cfg, vmid=109)
url = f"http://{ct_109.tailscale_ip}:8080/api/feed-stats" if ct_109 else None
if not url:
return None
try:
r = _req.get(url, timeout=10)
return r.json() if r.ok else None
except Exception:
return None
def format_feed_report(stats: dict) -> str:
"""Formatiert Feed-Statistiken für Telegram."""
today = stats["today"]
yesterday = stats["yesterday"]
feeds = stats["feeds"]
lines = [f"📊 Feed-Report ({stats['date']})", f"Artikel heute: {today} (gestern: {yesterday})", ""]
active = [f for f in feeds if f["posts_today"] > 0]
if active:
lines.append("📰 Aktive Feeds:")
for f in active:
lines.append(f" {f['name']}: {f['posts_today']} ({f['schedule']})")
silent = [f for f in feeds if f["posts_today"] == 0]
if silent:
lines.append("")
lines.append("😴 Keine Artikel heute:")
for f in silent:
yd = f"(gestern: {f['posts_yesterday']})" if f["posts_yesterday"] > 0 else "(auch gestern 0)"
lines.append(f" {f['name']} {yd}")
errors = [f for f in feeds if f["error_count"] and f["error_count"] > 0]
if errors:
lines.append("")
lines.append("⚠️ Fehler:")
for f in errors:
lines.append(f" {f['name']}: {f['error_count']}x — {f['last_error']}")
return "\n".join(lines)
async def cmd_feeds(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
await update.message.reply_text("⏳ Lade Feed-Status...")
try:
stats = _get_feed_stats()
if not stats:
await update.message.reply_text("RSS Manager nicht erreichbar.")
return
text = format_feed_report(stats)
await update.message.reply_text(text[:4000])
except Exception as e:
await update.message.reply_text(f"Fehler: {e}")
_STOP_WORDS = {"ich", "bin", "ist", "der", "die", "das", "ein", "eine", "und", "oder",
"in", "auf", "an", "fuer", "für", "von", "zu", "mit", "nach", "mein",
"meine", "meinem", "meinen", "hat", "habe", "wird", "will", "soll",
"nicht", "auch", "noch", "schon", "nur", "aber", "wenn", "weil", "dass"}
def _find_matching_item(user_text: str, items: list[dict]) -> dict | None:
"""Findet das Item mit der besten Wort-Ueberlappung zum User-Text."""
words = {w.lower().strip(".,!?") for w in user_text.split() if len(w) > 2}
words -= _STOP_WORDS
if not words:
return None
best, best_score = None, 0
for c in items:
content_words = {w.lower().strip(".,!?") for w in c["content"].split() if len(w) > 2}
content_words -= _STOP_WORDS
if not content_words:
continue
overlap = len(words & content_words)
score = overlap / max(len(words), 1)
if score > best_score and score >= 0.3:
best, best_score = c, score
return best
def _memory_buttons(item_id: int) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup([[
InlineKeyboardButton("🕒 Temporär", callback_data=f"mem_temp:{item_id}"),
InlineKeyboardButton("📌 Dauerhaft", callback_data=f"mem_perm:{item_id}"),
InlineKeyboardButton("❌ Verwerfen", callback_data=f"mem_delete:{item_id}"),
]])
def _format_candidate(item: dict) -> str:
mtype = item.get("memory_type") or "?"
type_icon = "🕒" if mtype == "temporary" else "📌" if mtype == "permanent" else ""
line = f"{type_icon} [{item['scope']}/{item['kind']}] {mtype}\n{item['content']}"
exp = item.get("expires_at")
if exp:
from datetime import datetime
line += f"\n⏰ Ablauf: {datetime.fromtimestamp(exp).strftime('%d.%m.%Y %H:%M')}"
return line
async def cmd_memory(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
if not _authorized(update):
return
candidates = memory_client.get_candidates()
if not candidates:
await update.message.reply_text("Keine offenen Kandidaten.")
return
for item in candidates:
text = _format_candidate(item)
await update.message.reply_text(text, reply_markup=_memory_buttons(item["id"]))
async def handle_memory_callback(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
data = query.data or ""
content_preview = query.message.text.split("\n", 2)[-1][:120] if query.message.text else ""
if data.startswith("mem_temp:"):
item_id = int(data.split(":")[1])
candidates = memory_client.get_candidates()
item = next((c for c in candidates if c["id"] == item_id), None)
exp = None
if item:
exp = item.get("expires_at") or memory_client.parse_expires_from_text(item.get("content", ""))
if not exp:
exp = memory_client.default_expires()
ok = memory_client.activate_candidate(item_id, memory_type="temporary", expires_at=exp)
if ok:
from datetime import datetime
exp_str = datetime.fromtimestamp(exp).strftime("%d.%m.%Y")
await query.edit_message_text(f"🕒 Temporär gespeichert (bis {exp_str}): {content_preview}")
else:
await query.edit_message_text("Fehler beim Speichern.")
elif data.startswith("mem_perm:"):
item_id = int(data.split(":")[1])
ok = memory_client.activate_candidate(item_id, memory_type="permanent")
if ok:
await query.edit_message_text(f"📌 Dauerhaft gespeichert: {content_preview}")
else:
await query.edit_message_text("Fehler beim Speichern.")
elif data.startswith("mem_delete:"):
item_id = int(data.split(":")[1])
ok = memory_client.delete_candidate(item_id)
if ok:
await query.edit_message_text("🗑 Verworfen.")
else:
await query.edit_message_text("Fehler beim Loeschen.")
async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
"""Button-Presses und Freitext-Fragen verarbeiten."""
if not _authorized(update):
return
text = update.message.text
if not text:
return
cmd = BUTTON_MAP.get(text)
if cmd == "status":
return await cmd_status(update, ctx)
elif cmd == "errors":
return await cmd_errors(update, ctx)
elif cmd == "feeds":
return await cmd_feeds(update, ctx)
elif cmd == "report":
return await cmd_report(update, ctx)
elif cmd == "check":
return await cmd_check(update, ctx)
elif cmd == "silence":
return await cmd_silence(update, ctx)
channel_key = str(update.effective_chat.id)
session_id = memory_client.get_or_create_session(channel_key, source="telegram")
await update.message.reply_text("🤔 Denke nach...")
try:
context.last_suggest_result = {"type": None, "candidate_id": None}
candidates_before = {c["id"] for c in memory_client.get_candidates()}
handlers = context.get_tool_handlers(session_id=session_id)
answer = llm.ask_with_tools(text, handlers, session_id=session_id)
if session_id:
memory_client.log_message(session_id, "user", text)
memory_client.log_message(session_id, "assistant", answer)
suggest = context.last_suggest_result
log.info("suggest_result: type=%s candidate_id=%s", suggest.get("type"), suggest.get("candidate_id"))
sent = False
if suggest["type"] == "existing_candidate" and suggest["candidate_id"]:
candidates = memory_client.get_candidates()
item = next((c for c in candidates if c["id"] == suggest["candidate_id"]), None)
if item:
await update.message.reply_text(
answer[:4000] + "\n\n" + _format_candidate(item),
reply_markup=_memory_buttons(item["id"]),
)
sent = True
elif suggest["type"] == "new_candidate":
candidates_after = memory_client.get_candidates()
new_candidates = [c for c in candidates_after if c["id"] not in candidates_before]
if new_candidates:
c = new_candidates[0]
type_icon = "🕒" if c.get("memory_type") == "temporary" else "📌" if c.get("memory_type") == "permanent" else "📝"
await update.message.reply_text(
answer[:4000] + "\n\n" + type_icon + " " + c["content"],
reply_markup=_memory_buttons(c["id"]),
)
sent = True
if not sent and suggest["type"] is None:
matched_cand = _find_matching_item(text, memory_client.get_candidates())
if matched_cand:
log.info("Fallback-Match: Kandidat ID=%s", matched_cand["id"])
await update.message.reply_text(
answer[:4000] + "\n\n" + _format_candidate(matched_cand),
reply_markup=_memory_buttons(matched_cand["id"]),
)
sent = True
else:
matched_active = _find_matching_item(text, memory_client.get_active_memory())
if matched_active:
from datetime import datetime as _dt
mtype = matched_active.get("memory_type", "")
exp = matched_active.get("expires_at")
if mtype == "temporary" and exp:
status_msg = f"\n\n🕒 Schon temporär gespeichert bis {_dt.fromtimestamp(exp).strftime('%d.%m.%Y')}."
elif mtype == "permanent":
status_msg = "\n\n📌 Schon dauerhaft gespeichert."
else:
status_msg = "\n\n✅ Bereits gespeichert."
await update.message.reply_text(answer[:3900] + status_msg, reply_markup=KEYBOARD)
sent = True
if not sent:
await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD)
except Exception as e:
log.exception("Fehler bei Freitext")
await update.message.reply_text(f"Fehler: {e}")
def main():
token = _load_token_and_chat()
if not token:
log.error("TG_HAUSMEISTER_TOKEN fehlt in homelab.conf!")
sys.exit(1)
_acquire_lock()
atexit.register(_release_lock)
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
log.info("Starte Orbitalo Hausmeister-Bot...")
app = Application.builder().token(token).build()
app.add_handler(CommandHandler("start", cmd_start))
app.add_handler(CommandHandler("status", cmd_status))
app.add_handler(CommandHandler("errors", cmd_errors))
app.add_handler(CommandHandler("ct", cmd_ct))
app.add_handler(CommandHandler("health", cmd_health))
app.add_handler(CommandHandler("logs", cmd_logs))
app.add_handler(CommandHandler("silence", cmd_silence))
app.add_handler(CommandHandler("report", cmd_report))
app.add_handler(CommandHandler("check", cmd_check))
app.add_handler(CommandHandler("feeds", cmd_feeds))
app.add_handler(CommandHandler("memory", cmd_memory))
app.add_handler(CallbackQueryHandler(handle_memory_callback, pattern=r"^mem_"))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
async def post_init(application):
await application.bot.set_my_commands(BOT_COMMANDS)
log.info("Kommandomenü registriert")
app.post_init = post_init
log.info("Bot läuft — polling gestartet")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()