diff --git a/homelab-ai-bot/telegram_bot.py b/homelab-ai-bot/telegram_bot.py index 5fba5577..baa16e3c 100644 --- a/homelab-ai-bot/telegram_bot.py +++ b/homelab-ai-bot/telegram_bot.py @@ -1 +1,1042 @@ - await asyncio.to_thread(monitor.run_check_and_alert) \ No newline at end of file +"""Orbitalo Hausmeister — Telegram Bot für Homelab-Management.""" + +import asyncio +import logging +import sys +import os +import fcntl +import atexit +import signal +import socket as _socket + +sys.path.insert(0, os.path.dirname(__file__)) + + +def _sd_notify(msg: str): + """Systemd Notify ohne externe Abhaengigkeit.""" + addr = os.environ.get("NOTIFY_SOCKET") + if not addr: + return + if addr[0] == "@": + addr = "\0" + addr[1:] + sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_DGRAM) + try: + sock.sendto(msg.encode(), addr) + finally: + sock.close() + + +async def _watchdog_loop(): + """Sendet alle 50s WATCHDOG=1 an systemd.""" + while True: + _sd_notify("WATCHDOG=1") + await asyncio.sleep(50) + +PIDFILE = "/tmp/hausmeister-bot.pid" +_lock_fp = None + + +def _acquire_lock(): + """Stellt sicher, dass nur eine Bot-Instanz läuft (PID-File + flock).""" + global _lock_fp + _lock_fp = open(PIDFILE, "w") + try: + fcntl.flock(_lock_fp, fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError: + print(f"ABBRUCH: Bot läuft bereits (PID-File {PIDFILE} ist gelockt)", file=sys.stderr) + sys.exit(1) + _lock_fp.write(str(os.getpid())) + _lock_fp.flush() + + +def _release_lock(): + global _lock_fp + if _lock_fp: + try: + fcntl.flock(_lock_fp, fcntl.LOCK_UN) + _lock_fp.close() + os.unlink(PIDFILE) + except OSError: + pass + +from telegram import ( + BotCommand, Update, ReplyKeyboardMarkup, KeyboardButton, + InlineKeyboardButton, InlineKeyboardMarkup, +) +from telegram.ext import ( + Application, CommandHandler, MessageHandler, CallbackQueryHandler, + filters, ContextTypes, +) + +BOT_COMMANDS = [ + BotCommand("status", "Alle Container"), + BotCommand("errors", "Aktuelle Fehler"), + BotCommand("ct", "Container-Detail (/ct 109)"), + BotCommand("health", "Health-Check (/health wordpress)"), + BotCommand("logs", "Letzte Logs (/logs rss-manager)"), + BotCommand("silence", "Stille Hosts"), + BotCommand("report", "Tagesbericht"), + BotCommand("check", "Monitoring-Check"), + BotCommand("feeds", "Feed-Status & Artikel heute"), + BotCommand("memory", "Gedaechtnis anzeigen"), + BotCommand("start", "Hilfe anzeigen"), +] + + +KEYBOARD = ReplyKeyboardMarkup( + [ + [KeyboardButton("📊 Status"), KeyboardButton("❌ Fehler"), KeyboardButton("📰 Feeds")], + [KeyboardButton("📋 Report"), KeyboardButton("🔧 Check"), KeyboardButton("🔇 Stille")], + ], + resize_keyboard=True, + is_persistent=True, +) + +BUTTON_MAP = { + "📊 Status": "status", + "❌ Fehler": "errors", + "📰 Feeds": "feeds", + "📋 Report": "report", + "🔧 Check": "check", + "🔇 Stille": "silence", +} + +import context +import requests as _req +import llm +import memory_client +import action_guard +import monitor +import voice +from core import config + +logging.basicConfig( + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + level=logging.INFO, +) +log = logging.getLogger("hausmeister") + +ALLOWED_CHAT_IDS: set[int] = set() +CHAT_ID: int | None = None +ACTIVE_LLM_TASKS: dict[int, asyncio.Task] = {} + + +def _load_token_and_chat(): + global CHAT_ID + cfg = config.parse_config() + token = cfg.raw.get("TG_HAUSMEISTER_TOKEN", "") + chat_id = cfg.raw.get("TG_CHAT_ID", "") + if chat_id: + CHAT_ID = int(chat_id) + ALLOWED_CHAT_IDS.add(CHAT_ID) + return token + + +def _authorized(update: Update) -> bool: + if not ALLOWED_CHAT_IDS: + return True + return update.effective_chat.id in ALLOWED_CHAT_IDS + + +async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + await update.message.reply_text( + "🔧 Orbitalo Hausmeister-Bot\n\n" + "Befehle:\n" + "/status — Alle Container\n" + "/errors — Aktuelle Fehler\n" + "/ct — Container-Detail\n" + "/health — Health-Check\n" + "/logs — Letzte Logs\n" + "/silence — Stille Hosts\n" + "/report — Tagesbericht\n" + "/check — Monitoring-Check\n" + "/feeds — Feed-Status & Artikel\n" + "/memory — Gedaechtnis anzeigen\n\n" + "📷 Foto senden = Bilderkennung\n\n" + "Oder einfach eine Frage stellen!", + reply_markup=KEYBOARD, + ) + + +async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + await update.message.reply_text("⏳ Lade Container-Status...") + try: + text = context.gather_status() + if len(text) > 4000: + text = text[:4000] + "\n..." + await update.message.reply_text(text) + except asyncio.CancelledError: + log.info("Freitext-Lauf abgebrochen") + return + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + +async def cmd_errors(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + await update.message.reply_text("⏳ Suche Fehler...") + try: + text = context.gather_errors(hours=2) + await update.message.reply_text(text[:4000]) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + +async def cmd_ct(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + args = ctx.args + if not args: + await update.message.reply_text("Bitte CT-Nummer angeben: /ct 109") + return + try: + text = context.gather_container_status(args[0]) + await update.message.reply_text(text) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + +async def cmd_health(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + args = ctx.args + if not args: + await update.message.reply_text("Bitte Hostname angeben: /health wordpress") + return + try: + text = context.gather_health(args[0]) + await update.message.reply_text(text) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + +async def cmd_logs(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + args = ctx.args + if not args: + await update.message.reply_text("Bitte Hostname angeben: /logs rss-manager") + return + try: + text = context.gather_logs(args[0]) + await update.message.reply_text(text[:4000]) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + +async def cmd_silence(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + try: + text = context.gather_silence() + await update.message.reply_text(text) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + +async def cmd_report(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + await update.message.reply_text("⏳ Erstelle Tagesbericht...") + try: + text = monitor.format_report() + await update.message.reply_text(text[:4000]) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + +async def cmd_check(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + await update.message.reply_text("⏳ Prüfe Systeme...") + try: + alerts = monitor.check_all() + if alerts: + text = f"⚠️ {len(alerts)} Alarme:\n\n" + "\n".join(alerts) + else: + text = "✅ Keine Alarme — alles läuft." + await update.message.reply_text(text) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + + + + +def _get_feed_stats(): + """Holt Feed-Statistiken von der RSS Manager API.""" + cfg = config.parse_config() + ct_109 = config.get_container(cfg, vmid=109) + url = f"http://{ct_109.tailscale_ip}:8080/api/feed-stats" if ct_109 else None + if not url: + return None + try: + r = _req.get(url, timeout=10) + return r.json() if r.ok else None + except Exception: + return None + + +def format_feed_report(stats: dict) -> str: + """Formatiert Feed-Statistiken für Telegram.""" + today = stats["today"] + yesterday = stats["yesterday"] + feeds = stats["feeds"] + lines = [f"📊 Feed-Report ({stats['date']})", f"Artikel heute: {today} (gestern: {yesterday})", ""] + active = [f for f in feeds if f["posts_today"] > 0] + if active: + lines.append("📰 Aktive Feeds:") + for f in active: + lines.append(f" {f['name']}: {f['posts_today']} ({f['schedule']})") + silent = [f for f in feeds if f["posts_today"] == 0] + if silent: + lines.append("") + lines.append("😴 Keine Artikel heute:") + for f in silent: + yd = f"(gestern: {f['posts_yesterday']})" if f["posts_yesterday"] > 0 else "(auch gestern 0)" + lines.append(f" {f['name']} {yd}") + errors = [f for f in feeds if f["error_count"] and f["error_count"] > 0] + if errors: + lines.append("") + lines.append("⚠️ Fehler:") + for f in errors: + lines.append(f" {f['name']}: {f['error_count']}x — {f['last_error']}") + return "\n".join(lines) + + +async def cmd_feeds(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + await update.message.reply_text("⏳ Lade Feed-Status...") + try: + stats = _get_feed_stats() + if not stats: + await update.message.reply_text("RSS Manager nicht erreichbar.") + return + text = format_feed_report(stats) + await update.message.reply_text(text[:4000]) + except Exception as e: + await update.message.reply_text(f"Fehler: {e}") + +_STOP_WORDS = {"ich", "bin", "ist", "der", "die", "das", "ein", "eine", "und", "oder", + "in", "auf", "an", "fuer", "für", "von", "zu", "mit", "nach", "mein", + "meine", "meinem", "meinen", "hat", "habe", "wird", "will", "soll", + "nicht", "auch", "noch", "schon", "nur", "aber", "wenn", "weil", "dass"} + + +def _find_matching_item(user_text: str, items: list[dict]) -> dict | None: + """Findet das Item mit der besten Wort-Ueberlappung zum User-Text.""" + words = {w.lower().strip(".,!?") for w in user_text.split() if len(w) > 2} + words -= _STOP_WORDS + if not words: + return None + + best, best_score = None, 0 + for c in items: + content_words = {w.lower().strip(".,!?") for w in c["content"].split() if len(w) > 2} + content_words -= _STOP_WORDS + if not content_words: + continue + overlap = len(words & content_words) + score = overlap / max(len(words), 1) + if score > best_score and score >= 0.3: + best, best_score = c, score + return best + + + + +async def cmd_memory(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + if not _authorized(update): + return + items = memory_client.get_active_memory() + if not items: + await update.message.reply_text("Kein Gedaechtnis vorhanden.") + return + TYPE_ICON = {"fact": "📌", "preference": "⭐", "relationship": "👤", "plan": "📅", "temporary": "🕒", "uncertain": "❓"} + CONF_ICON = {"high": "", "medium": " ⚠", "low": " ❔"} + groups = {} + for i in items: + mt = i.get("memory_type", "fact") + groups.setdefault(mt, []).append(i) + lines = [f"🧠 Gedaechtnis: {len(items)} Eintraege\n"] + for mtype in ("fact", "preference", "relationship", "plan", "temporary", "uncertain"): + group = groups.get(mtype, []) + if not group: + continue + icon = TYPE_ICON.get(mtype, "•") + lines.append(f"{icon} {mtype.upper()} ({len(group)}):") + for i in group: + conf = CONF_ICON.get(i.get("confidence", "high"), "") + src = i.get("source_type", "") + src_tag = f" [{src}]" if src and src != "manual" else "" + exp = i.get("expires_at") + exp_str = "" + if exp: + from datetime import datetime + exp_str = f" (bis {datetime.fromtimestamp(exp).strftime('%d.%m.%Y')})" + lines.append(f" • {i['content'][:90]}{conf}{exp_str}{src_tag}") + lines.append("") + text = "\n".join(lines) + await update.message.reply_text(text[:4000], reply_markup=KEYBOARD) + + + + +async def handle_voice(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + """Sprachnachricht: Whisper STT -> LLM -> TTS Antwort als Text + Sprache.""" + if not _authorized(update): + return + voice_msg = update.message.voice + if not voice_msg: + return + + await update.message.reply_text("🎙 Höre zu...") + try: + tg_file = await ctx.bot.get_file(voice_msg.file_id) + audio_data = await tg_file.download_as_bytearray() + + text = voice.transcribe(bytes(audio_data)) + if not text: + await update.message.reply_text("Konnte die Nachricht nicht verstehen.") + return + + log.info("Voice transkribiert: %s", text[:100]) + await update.message.reply_text(f"🗣 \"{text}\"\n\n🤔 Denke nach...") + + channel_key = str(update.effective_chat.id) + session_id = memory_client.get_or_create_session(channel_key, source="telegram") + + context.last_suggest_result = {"type": None} + context.set_source_type("telegram_voice") + handlers = context.get_tool_handlers(session_id=session_id) + llm_task = asyncio.create_task( + asyncio.to_thread(llm.ask_with_tools, text, handlers, session_id=session_id) + ) + ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task + + waited = 0 + while not llm_task.done(): + await asyncio.sleep(30) + waited += 30 + if not llm_task.done(): + await update.message.reply_text("⏳ Suche laeuft noch (" + str(waited) + "s)...") + + answer = await llm_task + + if session_id: + memory_client.log_message(session_id, "user", text) + memory_client.log_message(session_id, "assistant", answer) + + await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) + + audio_out = voice.synthesize(answer[:4000]) + if audio_out: + import io as _io + await update.message.reply_voice(voice=_io.BytesIO(audio_out)) + else: + log.warning("TTS fehlgeschlagen — nur Text gesendet") + except Exception as e: + log.exception("Fehler bei Voice-Nachricht") + await update.message.reply_text(f"Fehler: {e}") + + +async def handle_photo(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + """Foto-Nachricht: Bild analysieren via Vision-LLM.""" + if not _authorized(update): + return + photos = update.message.photo + if not photos: + return + + photo = photos[-1] + caption = update.message.caption or "" + + await update.message.reply_text("🔍 Analysiere Bild...") + try: + import base64 + tg_file = await ctx.bot.get_file(photo.file_id) + image_data = await tg_file.download_as_bytearray() + image_base64 = base64.b64encode(bytes(image_data)).decode("utf-8") + + channel_key = str(update.effective_chat.id) + session_id = memory_client.get_or_create_session(channel_key, source="telegram") + + context.last_suggest_result = {"type": None} + context.set_source_type("telegram_photo") + handlers = context.get_tool_handlers(session_id=session_id) + answer = await asyncio.to_thread(llm.ask_with_image, image_base64, caption, handlers, session_id=session_id) + + warning_text, warnings = _check_flight_plausibility(answer) + if warning_text: + answer += warning_text + _store_plausibility_corrections(warnings) + + if session_id: + user_msg = f"[Foto] {caption}" if caption else "[Foto gesendet]" + memory_client.log_message(session_id, "user", user_msg) + memory_client.log_message(session_id, "assistant", answer) + + await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) + except Exception as e: + log.exception("Fehler bei Foto-Analyse") + await update.message.reply_text(f"Fehler bei Bildanalyse: {e}") + + +def _check_flight_plausibility(text: str) -> tuple[str, list]: + """Prueft LLM-Antwort auf verdaechtige Layover-Zeiten zwischen Flugsegmenten. + + Parst Datum/Uhrzeit-Paare aus der strukturierten Antwort und flaggt + Segmentueberg aenge mit >20h berechneter Umsteigezeit bei <3h Uhrzeitdifferenz. + """ + import re + from datetime import datetime, timedelta + + MONTHS = {"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6, + "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12} + + segments = [] + current_seg = {} + + for line in text.split("\n"): + line_clean = line.strip().replace("**", "").replace("*", "") + + date_match = re.search(r"Datum:\s*(\d{1,2})\s*(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)", line_clean, re.IGNORECASE) + if date_match: + day = int(date_match.group(1)) + month = MONTHS.get(date_match.group(2).upper(), 0) + current_seg["date_day"] = day + current_seg["date_month"] = month + + dep_match = re.search(r"Abflug:\s*(\d{1,2}):(\d{2})", line_clean) + if dep_match: + current_seg["dep_h"] = int(dep_match.group(1)) + current_seg["dep_m"] = int(dep_match.group(2)) + + arr_match = re.search(r"Ankunft:\s*(\d{1,2}):(\d{2})", line_clean) + if arr_match: + current_seg["arr_h"] = int(arr_match.group(1)) + current_seg["arr_m"] = int(arr_match.group(2)) + next_day = "chster Tag" in line_clean or "+1" in line_clean + current_seg["arr_next_day"] = next_day + + if all(k in current_seg for k in ("date_day", "date_month", "dep_h", "arr_h")): + if current_seg not in segments: + segments.append(dict(current_seg)) + current_seg = {} + + if len(segments) < 2: + return "", [] + + warnings = [] + year = 2026 + + for i in range(len(segments) - 1): + s1 = segments[i] + s2 = segments[i + 1] + + try: + arr_day_offset = 1 if s1.get("arr_next_day") else 0 + arr_dt = datetime(year, s1["date_month"], s1["date_day"], s1.get("arr_h", 0), s1.get("arr_m", 0)) + timedelta(days=arr_day_offset) + dep_dt = datetime(year, s2["date_month"], s2["date_day"], s2.get("dep_h", 0), s2.get("dep_m", 0)) + + layover = dep_dt - arr_dt + layover_h = layover.total_seconds() / 3600 + + time_diff_minutes = abs(s1.get("arr_h", 0) * 60 + s1.get("arr_m", 0) - s2.get("dep_h", 0) * 60 - s2.get("dep_m", 0)) + + if layover_h > 20 and time_diff_minutes < 180: + likely_day = s1["date_day"] + (1 if s1.get("arr_next_day") else 0) + likely_month = s1["date_month"] + if likely_day > 28: + likely_day = s1["date_day"] + warnings.append({ + "text": ( + f"⚠️ Segment {i+1}→{i+2}: Berechnete Umsteigezeit = {layover_h:.0f}h. " + f"Die Uhrzeiten liegen nur {time_diff_minutes} Min auseinander. " + f"Moeglicherweise ist das Datum von Segment {i+2} falsch gelesen " + f"({s2['date_day']:02d}.{s2['date_month']:02d}. statt " + f"{likely_day:02d}.{likely_month:02d}.). Bitte auf dem Ticket pruefen." + ), + "wrong_date": f"{s2['date_day']:02d}.{s2['date_month']:02d}.", + "likely_date": f"{likely_day:02d}.{likely_month:02d}.", + "segment": i + 2, + }) + except (ValueError, OverflowError): + continue + + if warnings: + warning_text = "\n\n🔍 Plausibilitaetspruefung:\n" + "\n".join(w["text"] for w in warnings) + return warning_text, warnings + return "", [] + + +def _store_plausibility_corrections(warnings: list): + """Schreibt Korrektur-Hinweise ins Memory wenn Plausibilitaetsprobleme erkannt wurden.""" + if not warnings: + return + for w in warnings: + try: + correction = ( + f"ACHTUNG Datumskorrektur Flug-Segment {w['segment']}: " + f"OCR las {w['wrong_date']}, korrektes Datum vermutlich {w['likely_date']}. " + f"Plausibilitaetspruefung: Umsteigezeit waere sonst >20h bei nur wenigen Minuten Uhrzeitdifferenz." + ) + data = { + "scope": "user", + "kind": "fact", + "content": correction, + "memory_type": "fact", + "confidence": "high", + "source_type": "system_plausibility_check", + } + result = memory_client._post("/memory", data) + if result: + log.info("Plausibilitaetskorrektur ins Memory geschrieben: Segment %d, %s -> %s", + w["segment"], w["wrong_date"], w["likely_date"]) + else: + log.warning("Memory-API gab kein Ergebnis fuer Plausibilitaetskorrektur") + except Exception as e: + log.warning("Konnte Plausibilitaetskorrektur nicht speichern: %s", e) + + +def _extract_pdf_text(pdf_bytes: bytes) -> str: + """Extrahiert Text aus PDF via PyPDF2. Gibt leeren String zurueck wenn kein Text.""" + try: + import io as _io + from PyPDF2 import PdfReader + reader = PdfReader(_io.BytesIO(pdf_bytes)) + pages = [] + for i, page in enumerate(reader.pages[:10]): + text = page.extract_text() + if text and text.strip(): + pages.append(f"--- Seite {i+1} ---\n{text.strip()}") + return "\n\n".join(pages) + except Exception as e: + log.warning("PDF-Extraktion fehlgeschlagen: %s", e) + return "" + + +async def handle_document(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + """Dokument-Nachricht: Bilder und PDFs analysieren.""" + if not _authorized(update): + return + doc = update.message.document + if not doc: + return + + mime = doc.mime_type or "" + caption = update.message.caption or "" + channel_key = str(update.effective_chat.id) + session_id = memory_client.get_or_create_session(channel_key, source="telegram") + + if mime.startswith("image/"): + await update.message.reply_text("🔍 Analysiere Bild...") + try: + import base64 + tg_file = await ctx.bot.get_file(doc.file_id) + image_data = await tg_file.download_as_bytearray() + image_base64 = base64.b64encode(bytes(image_data)).decode("utf-8") + + context.last_suggest_result = {"type": None} + context.set_source_type("telegram_photo") + handlers = context.get_tool_handlers(session_id=session_id) + answer = await asyncio.to_thread(llm.ask_with_image, image_base64, caption, handlers, session_id=session_id) + + warning_text, warnings = _check_flight_plausibility(answer) + if warning_text: + answer += warning_text + _store_plausibility_corrections(warnings) + + if session_id: + user_msg = f"[Bild-Datei] {caption}" if caption else "[Bild-Datei gesendet]" + memory_client.log_message(session_id, "user", user_msg) + memory_client.log_message(session_id, "assistant", answer) + + await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) + except Exception as e: + log.exception("Fehler bei Bild-Dokument") + await update.message.reply_text(f"Fehler bei Bildanalyse: {e}") + + elif mime == "application/pdf": + await update.message.reply_text("📄 Lese PDF...") + try: + tg_file = await ctx.bot.get_file(doc.file_id) + pdf_data = await tg_file.download_as_bytearray() + pdf_text = _extract_pdf_text(bytes(pdf_data)) + + if not pdf_text: + await update.message.reply_text( + "PDF enthält keinen extrahierbaren Text (evtl. gescannt/Bild-PDF).\n" + "Tipp: Sende einen Screenshot des PDFs als Foto — dann kann ich es per Bilderkennung lesen." + ) + return + + question = caption if caption else "Analysiere dieses Dokument. Was sind die wichtigsten Informationen?" + full_prompt = f"{question}\n\n--- PDF-INHALT ---\n{pdf_text[:6000]}" + + context.last_suggest_result = {"type": None} + context.set_source_type("telegram_pdf") + handlers = context.get_tool_handlers(session_id=session_id) + answer = await asyncio.to_thread(llm.ask_with_tools, full_prompt, handlers, session_id=session_id) + + warning_text, warnings = _check_flight_plausibility(answer) + if warning_text: + answer += warning_text + _store_plausibility_corrections(warnings) + + if session_id: + user_msg = f"[PDF: {doc.file_name or 'dokument.pdf'}] {caption}" if caption else f"[PDF: {doc.file_name or 'dokument.pdf'}]" + memory_client.log_message(session_id, "user", user_msg) + memory_client.log_message(session_id, "assistant", answer) + + await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) + except Exception as e: + log.exception("Fehler bei PDF-Analyse") + await update.message.reply_text(f"Fehler bei PDF: {e}") + + else: + await update.message.reply_text( + f"Dateityp '{mime}' wird nicht unterstuetzt.\n" + "Unterstuetzt: Bilder (JPG/PNG) und PDFs." + ) + + +def _likely_deep_research_request(text: str) -> bool: + """Heuristik fuer lange Recherche-Anfragen.""" + t = (text or "").lower() + triggers = ( + "recherchiere", + "recherche", + "finde heraus", + "vergleich", + "analysiere", + "entwickelt", + "entwicklung", + ) + return any(token in t for token in triggers) + + +async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + """Button-Presses und Freitext-Fragen verarbeiten.""" + if not _authorized(update): + return + text = update.message.text + if not text: + return + + channel_key = str(update.effective_chat.id) + if text.strip().lower() in ("abbruch", "stop", "stopp", "cancel"): + if action_guard.has_pending(channel_key): + action_guard.clear_pending(channel_key) + await update.message.reply_text("🛑 Ausstehende Aktion abgebrochen.") + else: + chat_id = update.effective_chat.id + task = ACTIVE_LLM_TASKS.get(chat_id) + if task and not task.done(): + task.cancel() + await update.message.reply_text("🛑 Abgebrochen.") + else: + await update.message.reply_text("Kein laufender Suchlauf.") + return + + if action_guard.is_confirmation(text) and action_guard.has_pending(channel_key): + result, ok = action_guard.execute_pending(channel_key) + await update.message.reply_text(("✅ " if ok else "") + result[:4000]) + return + + cmd = BUTTON_MAP.get(text) + if cmd == "status": + return await cmd_status(update, ctx) + elif cmd == "errors": + return await cmd_errors(update, ctx) + elif cmd == "feeds": + return await cmd_feeds(update, ctx) + elif cmd == "report": + return await cmd_report(update, ctx) + elif cmd == "check": + return await cmd_check(update, ctx) + elif cmd == "silence": + return await cmd_silence(update, ctx) + + channel_key = str(update.effective_chat.id) + session_id = memory_client.get_or_create_session(channel_key, source="telegram") + + await update.message.reply_text("🤔 Denke nach...") + if _likely_deep_research_request(text): + await update.message.reply_text("🔎 Deep Research gestartet. Das dauert meist 2-5 Minuten.") + try: + context.last_suggest_result = {"type": None} + context.set_source_type("telegram_text") + handlers = action_guard.wrap_handlers( + context.get_tool_handlers(session_id=session_id), channel_key + ) + llm_task = asyncio.create_task( + asyncio.to_thread(llm.ask_with_tools, text, handlers, session_id=session_id) + ) + ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task + + waited = 0 + while not llm_task.done(): + await asyncio.sleep(30) + waited += 30 + if not llm_task.done(): + await update.message.reply_text("Suche laeuft noch (" + str(waited) + "s)...") + + answer = await llm_task + + if session_id: + memory_client.log_message(session_id, "user", text) + memory_client.log_message(session_id, "assistant", answer) + + suggest = context.last_suggest_result + log.info("suggest_result: type=%s", suggest.get("type")) + + await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) + except asyncio.CancelledError: + log.info("Freitext-Lauf abgebrochen") + return + except Exception as e: + log.exception("Fehler bei Freitext") + await update.message.reply_text(f"Fehler: {e}") + finally: + ACTIVE_LLM_TASKS.pop(update.effective_chat.id, None) + + +async def handle_callback(update: Update, ctx: ContextTypes.DEFAULT_TYPE): + """Inline-Button Callbacks (z.B. Save.TV Aufnahme).""" + query = update.callback_query + await query.answer() + data = query.data or "" + + if data.startswith("savetv_rec_"): + tid = data.replace("savetv_rec_", "") + try: + from tools import savetv + result = savetv.handle_savetv_record(telecast_id=int(tid)) + await query.edit_message_text( + query.message.text + f"\n\n✅ {result}" + ) + except Exception as e: + log.exception("Save.TV Aufnahme Fehler") + await query.edit_message_text( + query.message.text + f"\n\n❌ Fehler: {e}" + ) + + elif data.startswith("savetv_skip_"): + await query.edit_message_text( + query.message.text + "\n\n⏭ Übersprungen" + ) + + + +async def _send_daily_forecast(context): + """Taeglich 08:00 Uhr: Systemdaten sammeln, LLM analysiert, Ergebnis senden.""" + if not CHAT_ID: + return + bot = getattr(context, "bot", None) or context + try: + from tools.predict import handle_get_health_forecast + from llm import ask_with_tools + report = await asyncio.get_event_loop().run_in_executor(None, handle_get_health_forecast) + prompt = ( + "Morgendlicher System-Check. Analysiere diesen Report und gib eine kurze " + "Prognose ob sich Probleme anbahnen. Nur echte Auffaelligkeiten nennen, " + "klare Handlungsempfehlung wenn noetig. Bei allem OK: kurze Entwarnung." + "\n\n" + report + ) + analysis = await asyncio.get_event_loop().run_in_executor(None, ask_with_tools, prompt) + msg = "🔭 *Taegliche Systemvorhersage*\n\n" + analysis + await bot.send_message(chat_id=CHAT_ID, text=msg, parse_mode="Markdown") + log.info("Taegl. Systemvorhersage gesendet") + except Exception: + log.exception("Fehler beim Senden der Systemvorhersage") + + +async def _send_daily_filmtipps(context): + """Täglicher Cronjob: EPG scannen, Top-Filme auto-aufnehmen, Rest vorschlagen. + + context kann ein CallbackContext (JobQueue) oder eine Application (asyncio-Loop) sein. + """ + if not CHAT_ID: + return + bot = getattr(context, "bot", None) or context + try: + from tools import savetv + auto_recorded, suggestions = savetv.get_new_films() + + if not auto_recorded and not suggestions: + log.info("Filmtipp-Scan: keine neuen Filme") + return + + if auto_recorded: + lines = [f"✅ *{len(auto_recorded)} Filme automatisch aufgenommen:*\n"] + for f in auto_recorded: + title = f.get("STITLE", "?") + station = f.get("STVSTATIONNAME", "?") + start = f.get("DSTARTDATE", "?")[:16] + subcat = f.get("SSUBCATEGORYNAME", "") + try: + start_dt = datetime.strptime(f.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S") + delta = (start_dt.date() - datetime.now().date()).days + when = f" (in {delta}d)" if delta > 1 else " (morgen)" if delta == 1 else " (heute)" + except (ValueError, TypeError): + when = "" + lines.append(f"🎬 {title}{when}\n 📺 {station} | ⏰ {start} | 🎭 {subcat}") + await bot.send_message(chat_id=CHAT_ID, text="\n".join(lines), parse_mode="Markdown") + + for f in suggestions[:6]: + tid = int(f.get("ITELECASTID", 0)) + title = f.get("STITLE", "?") + station = f.get("STVSTATIONNAME", "?") + start = f.get("DSTARTDATE", "?")[:16] + subcat = f.get("SSUBCATEGORYNAME", "") + desc = (f.get("STHEMA") or f.get("SFULLSUBTITLE") or "")[:150] + + try: + start_dt = datetime.strptime(f.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S") + delta = (start_dt.date() - datetime.now().date()).days + when = f"in {delta}d" if delta > 1 else "morgen" if delta == 1 else "heute" + except (ValueError, TypeError): + when = "" + + text = f"🤔 *{title}*" + if when: + text += f" ({when})" + text += f"\n📺 {station} | ⏰ {start}\n🎭 {subcat}" + if desc: + desc_escaped = desc.replace("_", "\\_").replace("*", "\\*") + text += f"\n_{desc_escaped}_" + + keyboard = InlineKeyboardMarkup([ + [ + InlineKeyboardButton("🔴 Aufnehmen", callback_data=f"savetv_rec_{tid}"), + InlineKeyboardButton("⏭ Nein", callback_data=f"savetv_skip_{tid}"), + ] + ]) + await bot.send_message( + chat_id=CHAT_ID, + text=text, + reply_markup=keyboard, + parse_mode="Markdown", + ) + + log.info("Filmtipps: %d auto-aufgenommen, %d Vorschlaege", len(auto_recorded), len(suggestions)) + except Exception: + log.exception("Fehler beim Senden der Filmtipps") + + +from datetime import datetime, time as dtime + + +def main(): + token = _load_token_and_chat() + if not token: + log.error("TG_HAUSMEISTER_TOKEN fehlt in homelab.conf!") + sys.exit(1) + + _acquire_lock() + atexit.register(_release_lock) + signal.signal(signal.SIGTERM, lambda *_: sys.exit(0)) + + log.info("Starte Orbitalo Hausmeister-Bot...") + app = Application.builder().token(token).build() + + app.add_handler(CommandHandler("start", cmd_start)) + app.add_handler(CommandHandler("status", cmd_status)) + app.add_handler(CommandHandler("errors", cmd_errors)) + app.add_handler(CommandHandler("ct", cmd_ct)) + app.add_handler(CommandHandler("health", cmd_health)) + app.add_handler(CommandHandler("logs", cmd_logs)) + app.add_handler(CommandHandler("silence", cmd_silence)) + app.add_handler(CommandHandler("report", cmd_report)) + app.add_handler(CommandHandler("check", cmd_check)) + app.add_handler(CommandHandler("feeds", cmd_feeds)) + app.add_handler(CommandHandler("memory", cmd_memory)) + app.add_handler(CallbackQueryHandler(handle_callback)) + app.add_handler(MessageHandler(filters.VOICE, handle_voice)) + app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) + app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) + app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) + + if app.job_queue is not None: + app.job_queue.run_daily( + _send_daily_filmtipps, + time=dtime(hour=14, minute=0), + name="daily_filmtipps", + ) + log.info("Täglicher Filmtipp-Job registriert (14:00 Uhr)") + app.job_queue.run_daily( + _send_daily_forecast, + time=dtime(hour=8, minute=0), + name="daily_forecast", + ) + log.info("Täglicher Forecast-Job registriert (08:00 Uhr)") + else: + log.warning("JobQueue nicht verfügbar — Filmtipps werden per asyncio-Loop gesendet") + + async def _filmtipp_loop(application): + """Fallback: asyncio-basierter täglicher Filmtipp (wenn kein JobQueue).""" + while True: + now = datetime.now() + target = now.replace(hour=14, minute=0, second=0, microsecond=0) + if now >= target: + from datetime import timedelta + target += timedelta(days=1) + wait_secs = (target - now).total_seconds() + log.info("Filmtipp-Loop: nächster Run in %.0f Sek (%s)", wait_secs, target) + await asyncio.sleep(wait_secs) + try: + await _send_daily_filmtipps(application) + except Exception: + log.exception("Fehler im Filmtipp-Loop") + + async def _forecast_loop(application): + """Fallback: asyncio-basierter täglicher Forecast (08:00 Uhr).""" + while True: + now = datetime.now() + target = now.replace(hour=8, minute=0, second=0, microsecond=0) + if now >= target: + from datetime import timedelta + target += timedelta(days=1) + wait_secs = (target - now).total_seconds() + log.info("Forecast-Loop: nächster Run in %.0f Sek (%s)", wait_secs, target) + await asyncio.sleep(wait_secs) + try: + await _send_daily_forecast(application) + except Exception: + log.exception("Fehler im Forecast-Loop") + + async def _monitor_loop(application): + """Periodischer Monitoring-Check alle 10 Minuten.""" + await asyncio.sleep(60) + while True: + try: + await asyncio.to_thread(monitor.run_check_and_alert) + except Exception: + log.exception("Fehler im Monitor-Loop") + await asyncio.sleep(600) + + async def post_init(application): + await application.bot.set_my_commands(BOT_COMMANDS) + log.info("Kommandomenü registriert") + asyncio.create_task(_watchdog_loop()) + asyncio.create_task(_monitor_loop(application)) + log.info("Monitor-Loop aktiv (alle 10 Min)") + if application.job_queue is None: + asyncio.create_task(_filmtipp_loop(application)) + asyncio.create_task(_forecast_loop(application)) + _sd_notify("READY=1") + log.info("Systemd Watchdog aktiv (50s Intervall)") + + app.post_init = post_init + log.info("Bot läuft — polling gestartet") + app.run_polling(allowed_updates=Update.ALL_TYPES) + + +if __name__ == "__main__": + main()