From 45fc61aecb4f8eb32dae5cf17242663f6570906e Mon Sep 17 00:00:00 2001 From: orbitalo Date: Wed, 25 Mar 2026 18:04:16 +0000 Subject: [PATCH] fix(telegram_bot): run monitor.run_check_and_alert in thread to prevent blocking asyncio event loop and watchdog timeout --- homelab-ai-bot/telegram_bot.py | 1043 +------------------------------- 1 file changed, 1 insertion(+), 1042 deletions(-) diff --git a/homelab-ai-bot/telegram_bot.py b/homelab-ai-bot/telegram_bot.py index e0ed087f..5fba5577 100644 --- a/homelab-ai-bot/telegram_bot.py +++ b/homelab-ai-bot/telegram_bot.py @@ -1,1042 +1 @@ -"""Orbitalo Hausmeister — Telegram Bot für Homelab-Management.""" - -import asyncio -import logging -import sys -import os -import fcntl -import atexit -import signal -import socket as _socket - -sys.path.insert(0, os.path.dirname(__file__)) - - -def _sd_notify(msg: str): - """Systemd Notify ohne externe Abhaengigkeit.""" - addr = os.environ.get("NOTIFY_SOCKET") - if not addr: - return - if addr[0] == "@": - addr = "\0" + addr[1:] - sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_DGRAM) - try: - sock.sendto(msg.encode(), addr) - finally: - sock.close() - - -async def _watchdog_loop(): - """Sendet alle 50s WATCHDOG=1 an systemd.""" - while True: - _sd_notify("WATCHDOG=1") - await asyncio.sleep(50) - -PIDFILE = "/tmp/hausmeister-bot.pid" -_lock_fp = None - - -def _acquire_lock(): - """Stellt sicher, dass nur eine Bot-Instanz läuft (PID-File + flock).""" - global _lock_fp - _lock_fp = open(PIDFILE, "w") - try: - fcntl.flock(_lock_fp, fcntl.LOCK_EX | fcntl.LOCK_NB) - except OSError: - print(f"ABBRUCH: Bot läuft bereits (PID-File {PIDFILE} ist gelockt)", file=sys.stderr) - sys.exit(1) - _lock_fp.write(str(os.getpid())) - _lock_fp.flush() - - -def _release_lock(): - global _lock_fp - if _lock_fp: - try: - fcntl.flock(_lock_fp, fcntl.LOCK_UN) - _lock_fp.close() - os.unlink(PIDFILE) - except OSError: - pass - -from telegram import ( - BotCommand, Update, ReplyKeyboardMarkup, KeyboardButton, - InlineKeyboardButton, InlineKeyboardMarkup, -) -from telegram.ext import ( - Application, CommandHandler, MessageHandler, CallbackQueryHandler, - filters, ContextTypes, -) - -BOT_COMMANDS = [ - BotCommand("status", "Alle Container"), - BotCommand("errors", "Aktuelle Fehler"), - BotCommand("ct", "Container-Detail (/ct 109)"), - BotCommand("health", "Health-Check (/health wordpress)"), - BotCommand("logs", "Letzte Logs (/logs rss-manager)"), - BotCommand("silence", "Stille Hosts"), - BotCommand("report", "Tagesbericht"), - BotCommand("check", "Monitoring-Check"), - BotCommand("feeds", "Feed-Status & Artikel heute"), - BotCommand("memory", "Gedaechtnis anzeigen"), - BotCommand("start", "Hilfe anzeigen"), -] - - -KEYBOARD = ReplyKeyboardMarkup( - [ - [KeyboardButton("📊 Status"), KeyboardButton("❌ Fehler"), KeyboardButton("📰 Feeds")], - [KeyboardButton("📋 Report"), KeyboardButton("🔧 Check"), KeyboardButton("🔇 Stille")], - ], - resize_keyboard=True, - is_persistent=True, -) - -BUTTON_MAP = { - "📊 Status": "status", - "❌ Fehler": "errors", - "📰 Feeds": "feeds", - "📋 Report": "report", - "🔧 Check": "check", - "🔇 Stille": "silence", -} - -import context -import requests as _req -import llm -import memory_client -import action_guard -import monitor -import voice -from core import config - -logging.basicConfig( - format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", - level=logging.INFO, -) -log = logging.getLogger("hausmeister") - -ALLOWED_CHAT_IDS: set[int] = set() -CHAT_ID: int | None = None -ACTIVE_LLM_TASKS: dict[int, asyncio.Task] = {} - - -def _load_token_and_chat(): - global CHAT_ID - cfg = config.parse_config() - token = cfg.raw.get("TG_HAUSMEISTER_TOKEN", "") - chat_id = cfg.raw.get("TG_CHAT_ID", "") - if chat_id: - CHAT_ID = int(chat_id) - ALLOWED_CHAT_IDS.add(CHAT_ID) - return token - - -def _authorized(update: Update) -> bool: - if not ALLOWED_CHAT_IDS: - return True - return update.effective_chat.id in ALLOWED_CHAT_IDS - - -async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - await update.message.reply_text( - "🔧 Orbitalo Hausmeister-Bot\n\n" - "Befehle:\n" - "/status — Alle Container\n" - "/errors — Aktuelle Fehler\n" - "/ct — Container-Detail\n" - "/health — Health-Check\n" - "/logs — Letzte Logs\n" - "/silence — Stille Hosts\n" - "/report — Tagesbericht\n" - "/check — Monitoring-Check\n" - "/feeds — Feed-Status & Artikel\n" - "/memory — Gedaechtnis anzeigen\n\n" - "📷 Foto senden = Bilderkennung\n\n" - "Oder einfach eine Frage stellen!", - reply_markup=KEYBOARD, - ) - - -async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - await update.message.reply_text("⏳ Lade Container-Status...") - try: - text = context.gather_status() - if len(text) > 4000: - text = text[:4000] + "\n..." - await update.message.reply_text(text) - except asyncio.CancelledError: - log.info("Freitext-Lauf abgebrochen") - return - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - -async def cmd_errors(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - await update.message.reply_text("⏳ Suche Fehler...") - try: - text = context.gather_errors(hours=2) - await update.message.reply_text(text[:4000]) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - -async def cmd_ct(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - args = ctx.args - if not args: - await update.message.reply_text("Bitte CT-Nummer angeben: /ct 109") - return - try: - text = context.gather_container_status(args[0]) - await update.message.reply_text(text) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - -async def cmd_health(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - args = ctx.args - if not args: - await update.message.reply_text("Bitte Hostname angeben: /health wordpress") - return - try: - text = context.gather_health(args[0]) - await update.message.reply_text(text) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - -async def cmd_logs(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - args = ctx.args - if not args: - await update.message.reply_text("Bitte Hostname angeben: /logs rss-manager") - return - try: - text = context.gather_logs(args[0]) - await update.message.reply_text(text[:4000]) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - -async def cmd_silence(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - try: - text = context.gather_silence() - await update.message.reply_text(text) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - -async def cmd_report(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - await update.message.reply_text("⏳ Erstelle Tagesbericht...") - try: - text = monitor.format_report() - await update.message.reply_text(text[:4000]) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - -async def cmd_check(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - await update.message.reply_text("⏳ Prüfe Systeme...") - try: - alerts = monitor.check_all() - if alerts: - text = f"⚠️ {len(alerts)} Alarme:\n\n" + "\n".join(alerts) - else: - text = "✅ Keine Alarme — alles läuft." - await update.message.reply_text(text) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - - - - -def _get_feed_stats(): - """Holt Feed-Statistiken von der RSS Manager API.""" - cfg = config.parse_config() - ct_109 = config.get_container(cfg, vmid=109) - url = f"http://{ct_109.tailscale_ip}:8080/api/feed-stats" if ct_109 else None - if not url: - return None - try: - r = _req.get(url, timeout=10) - return r.json() if r.ok else None - except Exception: - return None - - -def format_feed_report(stats: dict) -> str: - """Formatiert Feed-Statistiken für Telegram.""" - today = stats["today"] - yesterday = stats["yesterday"] - feeds = stats["feeds"] - lines = [f"📊 Feed-Report ({stats['date']})", f"Artikel heute: {today} (gestern: {yesterday})", ""] - active = [f for f in feeds if f["posts_today"] > 0] - if active: - lines.append("📰 Aktive Feeds:") - for f in active: - lines.append(f" {f['name']}: {f['posts_today']} ({f['schedule']})") - silent = [f for f in feeds if f["posts_today"] == 0] - if silent: - lines.append("") - lines.append("😴 Keine Artikel heute:") - for f in silent: - yd = f"(gestern: {f['posts_yesterday']})" if f["posts_yesterday"] > 0 else "(auch gestern 0)" - lines.append(f" {f['name']} {yd}") - errors = [f for f in feeds if f["error_count"] and f["error_count"] > 0] - if errors: - lines.append("") - lines.append("⚠️ Fehler:") - for f in errors: - lines.append(f" {f['name']}: {f['error_count']}x — {f['last_error']}") - return "\n".join(lines) - - -async def cmd_feeds(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - await update.message.reply_text("⏳ Lade Feed-Status...") - try: - stats = _get_feed_stats() - if not stats: - await update.message.reply_text("RSS Manager nicht erreichbar.") - return - text = format_feed_report(stats) - await update.message.reply_text(text[:4000]) - except Exception as e: - await update.message.reply_text(f"Fehler: {e}") - -_STOP_WORDS = {"ich", "bin", "ist", "der", "die", "das", "ein", "eine", "und", "oder", - "in", "auf", "an", "fuer", "für", "von", "zu", "mit", "nach", "mein", - "meine", "meinem", "meinen", "hat", "habe", "wird", "will", "soll", - "nicht", "auch", "noch", "schon", "nur", "aber", "wenn", "weil", "dass"} - - -def _find_matching_item(user_text: str, items: list[dict]) -> dict | None: - """Findet das Item mit der besten Wort-Ueberlappung zum User-Text.""" - words = {w.lower().strip(".,!?") for w in user_text.split() if len(w) > 2} - words -= _STOP_WORDS - if not words: - return None - - best, best_score = None, 0 - for c in items: - content_words = {w.lower().strip(".,!?") for w in c["content"].split() if len(w) > 2} - content_words -= _STOP_WORDS - if not content_words: - continue - overlap = len(words & content_words) - score = overlap / max(len(words), 1) - if score > best_score and score >= 0.3: - best, best_score = c, score - return best - - - - -async def cmd_memory(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - if not _authorized(update): - return - items = memory_client.get_active_memory() - if not items: - await update.message.reply_text("Kein Gedaechtnis vorhanden.") - return - TYPE_ICON = {"fact": "📌", "preference": "⭐", "relationship": "👤", "plan": "📅", "temporary": "🕒", "uncertain": "❓"} - CONF_ICON = {"high": "", "medium": " ⚠", "low": " ❔"} - groups = {} - for i in items: - mt = i.get("memory_type", "fact") - groups.setdefault(mt, []).append(i) - lines = [f"🧠 Gedaechtnis: {len(items)} Eintraege\n"] - for mtype in ("fact", "preference", "relationship", "plan", "temporary", "uncertain"): - group = groups.get(mtype, []) - if not group: - continue - icon = TYPE_ICON.get(mtype, "•") - lines.append(f"{icon} {mtype.upper()} ({len(group)}):") - for i in group: - conf = CONF_ICON.get(i.get("confidence", "high"), "") - src = i.get("source_type", "") - src_tag = f" [{src}]" if src and src != "manual" else "" - exp = i.get("expires_at") - exp_str = "" - if exp: - from datetime import datetime - exp_str = f" (bis {datetime.fromtimestamp(exp).strftime('%d.%m.%Y')})" - lines.append(f" • {i['content'][:90]}{conf}{exp_str}{src_tag}") - lines.append("") - text = "\n".join(lines) - await update.message.reply_text(text[:4000], reply_markup=KEYBOARD) - - - - -async def handle_voice(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - """Sprachnachricht: Whisper STT -> LLM -> TTS Antwort als Text + Sprache.""" - if not _authorized(update): - return - voice_msg = update.message.voice - if not voice_msg: - return - - await update.message.reply_text("🎙 Höre zu...") - try: - tg_file = await ctx.bot.get_file(voice_msg.file_id) - audio_data = await tg_file.download_as_bytearray() - - text = voice.transcribe(bytes(audio_data)) - if not text: - await update.message.reply_text("Konnte die Nachricht nicht verstehen.") - return - - log.info("Voice transkribiert: %s", text[:100]) - await update.message.reply_text(f"🗣 \"{text}\"\n\n🤔 Denke nach...") - - channel_key = str(update.effective_chat.id) - session_id = memory_client.get_or_create_session(channel_key, source="telegram") - - context.last_suggest_result = {"type": None} - context.set_source_type("telegram_voice") - handlers = context.get_tool_handlers(session_id=session_id) - llm_task = asyncio.create_task( - asyncio.to_thread(llm.ask_with_tools, text, handlers, session_id=session_id) - ) - ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task - - waited = 0 - while not llm_task.done(): - await asyncio.sleep(30) - waited += 30 - if not llm_task.done(): - await update.message.reply_text("⏳ Suche laeuft noch (" + str(waited) + "s)...") - - answer = await llm_task - - if session_id: - memory_client.log_message(session_id, "user", text) - memory_client.log_message(session_id, "assistant", answer) - - await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) - - audio_out = voice.synthesize(answer[:4000]) - if audio_out: - import io as _io - await update.message.reply_voice(voice=_io.BytesIO(audio_out)) - else: - log.warning("TTS fehlgeschlagen — nur Text gesendet") - except Exception as e: - log.exception("Fehler bei Voice-Nachricht") - await update.message.reply_text(f"Fehler: {e}") - - -async def handle_photo(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - """Foto-Nachricht: Bild analysieren via Vision-LLM.""" - if not _authorized(update): - return - photos = update.message.photo - if not photos: - return - - photo = photos[-1] - caption = update.message.caption or "" - - await update.message.reply_text("🔍 Analysiere Bild...") - try: - import base64 - tg_file = await ctx.bot.get_file(photo.file_id) - image_data = await tg_file.download_as_bytearray() - image_base64 = base64.b64encode(bytes(image_data)).decode("utf-8") - - channel_key = str(update.effective_chat.id) - session_id = memory_client.get_or_create_session(channel_key, source="telegram") - - context.last_suggest_result = {"type": None} - context.set_source_type("telegram_photo") - handlers = context.get_tool_handlers(session_id=session_id) - answer = await asyncio.to_thread(llm.ask_with_image, image_base64, caption, handlers, session_id=session_id) - - warning_text, warnings = _check_flight_plausibility(answer) - if warning_text: - answer += warning_text - _store_plausibility_corrections(warnings) - - if session_id: - user_msg = f"[Foto] {caption}" if caption else "[Foto gesendet]" - memory_client.log_message(session_id, "user", user_msg) - memory_client.log_message(session_id, "assistant", answer) - - await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) - except Exception as e: - log.exception("Fehler bei Foto-Analyse") - await update.message.reply_text(f"Fehler bei Bildanalyse: {e}") - - -def _check_flight_plausibility(text: str) -> tuple[str, list]: - """Prueft LLM-Antwort auf verdaechtige Layover-Zeiten zwischen Flugsegmenten. - - Parst Datum/Uhrzeit-Paare aus der strukturierten Antwort und flaggt - Segmentueberg aenge mit >20h berechneter Umsteigezeit bei <3h Uhrzeitdifferenz. - """ - import re - from datetime import datetime, timedelta - - MONTHS = {"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6, - "JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12} - - segments = [] - current_seg = {} - - for line in text.split("\n"): - line_clean = line.strip().replace("**", "").replace("*", "") - - date_match = re.search(r"Datum:\s*(\d{1,2})\s*(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)", line_clean, re.IGNORECASE) - if date_match: - day = int(date_match.group(1)) - month = MONTHS.get(date_match.group(2).upper(), 0) - current_seg["date_day"] = day - current_seg["date_month"] = month - - dep_match = re.search(r"Abflug:\s*(\d{1,2}):(\d{2})", line_clean) - if dep_match: - current_seg["dep_h"] = int(dep_match.group(1)) - current_seg["dep_m"] = int(dep_match.group(2)) - - arr_match = re.search(r"Ankunft:\s*(\d{1,2}):(\d{2})", line_clean) - if arr_match: - current_seg["arr_h"] = int(arr_match.group(1)) - current_seg["arr_m"] = int(arr_match.group(2)) - next_day = "chster Tag" in line_clean or "+1" in line_clean - current_seg["arr_next_day"] = next_day - - if all(k in current_seg for k in ("date_day", "date_month", "dep_h", "arr_h")): - if current_seg not in segments: - segments.append(dict(current_seg)) - current_seg = {} - - if len(segments) < 2: - return "", [] - - warnings = [] - year = 2026 - - for i in range(len(segments) - 1): - s1 = segments[i] - s2 = segments[i + 1] - - try: - arr_day_offset = 1 if s1.get("arr_next_day") else 0 - arr_dt = datetime(year, s1["date_month"], s1["date_day"], s1.get("arr_h", 0), s1.get("arr_m", 0)) + timedelta(days=arr_day_offset) - dep_dt = datetime(year, s2["date_month"], s2["date_day"], s2.get("dep_h", 0), s2.get("dep_m", 0)) - - layover = dep_dt - arr_dt - layover_h = layover.total_seconds() / 3600 - - time_diff_minutes = abs(s1.get("arr_h", 0) * 60 + s1.get("arr_m", 0) - s2.get("dep_h", 0) * 60 - s2.get("dep_m", 0)) - - if layover_h > 20 and time_diff_minutes < 180: - likely_day = s1["date_day"] + (1 if s1.get("arr_next_day") else 0) - likely_month = s1["date_month"] - if likely_day > 28: - likely_day = s1["date_day"] - warnings.append({ - "text": ( - f"⚠️ Segment {i+1}→{i+2}: Berechnete Umsteigezeit = {layover_h:.0f}h. " - f"Die Uhrzeiten liegen nur {time_diff_minutes} Min auseinander. " - f"Moeglicherweise ist das Datum von Segment {i+2} falsch gelesen " - f"({s2['date_day']:02d}.{s2['date_month']:02d}. statt " - f"{likely_day:02d}.{likely_month:02d}.). Bitte auf dem Ticket pruefen." - ), - "wrong_date": f"{s2['date_day']:02d}.{s2['date_month']:02d}.", - "likely_date": f"{likely_day:02d}.{likely_month:02d}.", - "segment": i + 2, - }) - except (ValueError, OverflowError): - continue - - if warnings: - warning_text = "\n\n🔍 Plausibilitaetspruefung:\n" + "\n".join(w["text"] for w in warnings) - return warning_text, warnings - return "", [] - - -def _store_plausibility_corrections(warnings: list): - """Schreibt Korrektur-Hinweise ins Memory wenn Plausibilitaetsprobleme erkannt wurden.""" - if not warnings: - return - for w in warnings: - try: - correction = ( - f"ACHTUNG Datumskorrektur Flug-Segment {w['segment']}: " - f"OCR las {w['wrong_date']}, korrektes Datum vermutlich {w['likely_date']}. " - f"Plausibilitaetspruefung: Umsteigezeit waere sonst >20h bei nur wenigen Minuten Uhrzeitdifferenz." - ) - data = { - "scope": "user", - "kind": "fact", - "content": correction, - "memory_type": "fact", - "confidence": "high", - "source_type": "system_plausibility_check", - } - result = memory_client._post("/memory", data) - if result: - log.info("Plausibilitaetskorrektur ins Memory geschrieben: Segment %d, %s -> %s", - w["segment"], w["wrong_date"], w["likely_date"]) - else: - log.warning("Memory-API gab kein Ergebnis fuer Plausibilitaetskorrektur") - except Exception as e: - log.warning("Konnte Plausibilitaetskorrektur nicht speichern: %s", e) - - -def _extract_pdf_text(pdf_bytes: bytes) -> str: - """Extrahiert Text aus PDF via PyPDF2. Gibt leeren String zurueck wenn kein Text.""" - try: - import io as _io - from PyPDF2 import PdfReader - reader = PdfReader(_io.BytesIO(pdf_bytes)) - pages = [] - for i, page in enumerate(reader.pages[:10]): - text = page.extract_text() - if text and text.strip(): - pages.append(f"--- Seite {i+1} ---\n{text.strip()}") - return "\n\n".join(pages) - except Exception as e: - log.warning("PDF-Extraktion fehlgeschlagen: %s", e) - return "" - - -async def handle_document(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - """Dokument-Nachricht: Bilder und PDFs analysieren.""" - if not _authorized(update): - return - doc = update.message.document - if not doc: - return - - mime = doc.mime_type or "" - caption = update.message.caption or "" - channel_key = str(update.effective_chat.id) - session_id = memory_client.get_or_create_session(channel_key, source="telegram") - - if mime.startswith("image/"): - await update.message.reply_text("🔍 Analysiere Bild...") - try: - import base64 - tg_file = await ctx.bot.get_file(doc.file_id) - image_data = await tg_file.download_as_bytearray() - image_base64 = base64.b64encode(bytes(image_data)).decode("utf-8") - - context.last_suggest_result = {"type": None} - context.set_source_type("telegram_photo") - handlers = context.get_tool_handlers(session_id=session_id) - answer = await asyncio.to_thread(llm.ask_with_image, image_base64, caption, handlers, session_id=session_id) - - warning_text, warnings = _check_flight_plausibility(answer) - if warning_text: - answer += warning_text - _store_plausibility_corrections(warnings) - - if session_id: - user_msg = f"[Bild-Datei] {caption}" if caption else "[Bild-Datei gesendet]" - memory_client.log_message(session_id, "user", user_msg) - memory_client.log_message(session_id, "assistant", answer) - - await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) - except Exception as e: - log.exception("Fehler bei Bild-Dokument") - await update.message.reply_text(f"Fehler bei Bildanalyse: {e}") - - elif mime == "application/pdf": - await update.message.reply_text("📄 Lese PDF...") - try: - tg_file = await ctx.bot.get_file(doc.file_id) - pdf_data = await tg_file.download_as_bytearray() - pdf_text = _extract_pdf_text(bytes(pdf_data)) - - if not pdf_text: - await update.message.reply_text( - "PDF enthält keinen extrahierbaren Text (evtl. gescannt/Bild-PDF).\n" - "Tipp: Sende einen Screenshot des PDFs als Foto — dann kann ich es per Bilderkennung lesen." - ) - return - - question = caption if caption else "Analysiere dieses Dokument. Was sind die wichtigsten Informationen?" - full_prompt = f"{question}\n\n--- PDF-INHALT ---\n{pdf_text[:6000]}" - - context.last_suggest_result = {"type": None} - context.set_source_type("telegram_pdf") - handlers = context.get_tool_handlers(session_id=session_id) - answer = await asyncio.to_thread(llm.ask_with_tools, full_prompt, handlers, session_id=session_id) - - warning_text, warnings = _check_flight_plausibility(answer) - if warning_text: - answer += warning_text - _store_plausibility_corrections(warnings) - - if session_id: - user_msg = f"[PDF: {doc.file_name or 'dokument.pdf'}] {caption}" if caption else f"[PDF: {doc.file_name or 'dokument.pdf'}]" - memory_client.log_message(session_id, "user", user_msg) - memory_client.log_message(session_id, "assistant", answer) - - await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) - except Exception as e: - log.exception("Fehler bei PDF-Analyse") - await update.message.reply_text(f"Fehler bei PDF: {e}") - - else: - await update.message.reply_text( - f"Dateityp '{mime}' wird nicht unterstuetzt.\n" - "Unterstuetzt: Bilder (JPG/PNG) und PDFs." - ) - - -def _likely_deep_research_request(text: str) -> bool: - """Heuristik fuer lange Recherche-Anfragen.""" - t = (text or "").lower() - triggers = ( - "recherchiere", - "recherche", - "finde heraus", - "vergleich", - "analysiere", - "entwickelt", - "entwicklung", - ) - return any(token in t for token in triggers) - - -async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - """Button-Presses und Freitext-Fragen verarbeiten.""" - if not _authorized(update): - return - text = update.message.text - if not text: - return - - channel_key = str(update.effective_chat.id) - if text.strip().lower() in ("abbruch", "stop", "stopp", "cancel"): - if action_guard.has_pending(channel_key): - action_guard.clear_pending(channel_key) - await update.message.reply_text("🛑 Ausstehende Aktion abgebrochen.") - else: - chat_id = update.effective_chat.id - task = ACTIVE_LLM_TASKS.get(chat_id) - if task and not task.done(): - task.cancel() - await update.message.reply_text("🛑 Abgebrochen.") - else: - await update.message.reply_text("Kein laufender Suchlauf.") - return - - if action_guard.is_confirmation(text) and action_guard.has_pending(channel_key): - result, ok = action_guard.execute_pending(channel_key) - await update.message.reply_text(("✅ " if ok else "") + result[:4000]) - return - - cmd = BUTTON_MAP.get(text) - if cmd == "status": - return await cmd_status(update, ctx) - elif cmd == "errors": - return await cmd_errors(update, ctx) - elif cmd == "feeds": - return await cmd_feeds(update, ctx) - elif cmd == "report": - return await cmd_report(update, ctx) - elif cmd == "check": - return await cmd_check(update, ctx) - elif cmd == "silence": - return await cmd_silence(update, ctx) - - channel_key = str(update.effective_chat.id) - session_id = memory_client.get_or_create_session(channel_key, source="telegram") - - await update.message.reply_text("🤔 Denke nach...") - if _likely_deep_research_request(text): - await update.message.reply_text("🔎 Deep Research gestartet. Das dauert meist 2-5 Minuten.") - try: - context.last_suggest_result = {"type": None} - context.set_source_type("telegram_text") - handlers = action_guard.wrap_handlers( - context.get_tool_handlers(session_id=session_id), channel_key - ) - llm_task = asyncio.create_task( - asyncio.to_thread(llm.ask_with_tools, text, handlers, session_id=session_id) - ) - ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task - - waited = 0 - while not llm_task.done(): - await asyncio.sleep(30) - waited += 30 - if not llm_task.done(): - await update.message.reply_text("Suche laeuft noch (" + str(waited) + "s)...") - - answer = await llm_task - - if session_id: - memory_client.log_message(session_id, "user", text) - memory_client.log_message(session_id, "assistant", answer) - - suggest = context.last_suggest_result - log.info("suggest_result: type=%s", suggest.get("type")) - - await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD) - except asyncio.CancelledError: - log.info("Freitext-Lauf abgebrochen") - return - except Exception as e: - log.exception("Fehler bei Freitext") - await update.message.reply_text(f"Fehler: {e}") - finally: - ACTIVE_LLM_TASKS.pop(update.effective_chat.id, None) - - -async def handle_callback(update: Update, ctx: ContextTypes.DEFAULT_TYPE): - """Inline-Button Callbacks (z.B. Save.TV Aufnahme).""" - query = update.callback_query - await query.answer() - data = query.data or "" - - if data.startswith("savetv_rec_"): - tid = data.replace("savetv_rec_", "") - try: - from tools import savetv - result = savetv.handle_savetv_record(telecast_id=int(tid)) - await query.edit_message_text( - query.message.text + f"\n\n✅ {result}" - ) - except Exception as e: - log.exception("Save.TV Aufnahme Fehler") - await query.edit_message_text( - query.message.text + f"\n\n❌ Fehler: {e}" - ) - - elif data.startswith("savetv_skip_"): - await query.edit_message_text( - query.message.text + "\n\n⏭ Übersprungen" - ) - - - -async def _send_daily_forecast(context): - """Taeglich 08:00 Uhr: Systemdaten sammeln, LLM analysiert, Ergebnis senden.""" - if not CHAT_ID: - return - bot = getattr(context, "bot", None) or context - try: - from tools.predict import handle_get_health_forecast - from llm import ask_with_tools - report = await asyncio.get_event_loop().run_in_executor(None, handle_get_health_forecast) - prompt = ( - "Morgendlicher System-Check. Analysiere diesen Report und gib eine kurze " - "Prognose ob sich Probleme anbahnen. Nur echte Auffaelligkeiten nennen, " - "klare Handlungsempfehlung wenn noetig. Bei allem OK: kurze Entwarnung." - "\n\n" + report - ) - analysis = await asyncio.get_event_loop().run_in_executor(None, ask_with_tools, prompt) - msg = "🔭 *Taegliche Systemvorhersage*\n\n" + analysis - await bot.send_message(chat_id=CHAT_ID, text=msg, parse_mode="Markdown") - log.info("Taegl. Systemvorhersage gesendet") - except Exception: - log.exception("Fehler beim Senden der Systemvorhersage") - - -async def _send_daily_filmtipps(context): - """Täglicher Cronjob: EPG scannen, Top-Filme auto-aufnehmen, Rest vorschlagen. - - context kann ein CallbackContext (JobQueue) oder eine Application (asyncio-Loop) sein. - """ - if not CHAT_ID: - return - bot = getattr(context, "bot", None) or context - try: - from tools import savetv - auto_recorded, suggestions = savetv.get_new_films() - - if not auto_recorded and not suggestions: - log.info("Filmtipp-Scan: keine neuen Filme") - return - - if auto_recorded: - lines = [f"✅ *{len(auto_recorded)} Filme automatisch aufgenommen:*\n"] - for f in auto_recorded: - title = f.get("STITLE", "?") - station = f.get("STVSTATIONNAME", "?") - start = f.get("DSTARTDATE", "?")[:16] - subcat = f.get("SSUBCATEGORYNAME", "") - try: - start_dt = datetime.strptime(f.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S") - delta = (start_dt.date() - datetime.now().date()).days - when = f" (in {delta}d)" if delta > 1 else " (morgen)" if delta == 1 else " (heute)" - except (ValueError, TypeError): - when = "" - lines.append(f"🎬 {title}{when}\n 📺 {station} | ⏰ {start} | 🎭 {subcat}") - await bot.send_message(chat_id=CHAT_ID, text="\n".join(lines), parse_mode="Markdown") - - for f in suggestions[:6]: - tid = int(f.get("ITELECASTID", 0)) - title = f.get("STITLE", "?") - station = f.get("STVSTATIONNAME", "?") - start = f.get("DSTARTDATE", "?")[:16] - subcat = f.get("SSUBCATEGORYNAME", "") - desc = (f.get("STHEMA") or f.get("SFULLSUBTITLE") or "")[:150] - - try: - start_dt = datetime.strptime(f.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S") - delta = (start_dt.date() - datetime.now().date()).days - when = f"in {delta}d" if delta > 1 else "morgen" if delta == 1 else "heute" - except (ValueError, TypeError): - when = "" - - text = f"🤔 *{title}*" - if when: - text += f" ({when})" - text += f"\n📺 {station} | ⏰ {start}\n🎭 {subcat}" - if desc: - desc_escaped = desc.replace("_", "\\_").replace("*", "\\*") - text += f"\n_{desc_escaped}_" - - keyboard = InlineKeyboardMarkup([ - [ - InlineKeyboardButton("🔴 Aufnehmen", callback_data=f"savetv_rec_{tid}"), - InlineKeyboardButton("⏭ Nein", callback_data=f"savetv_skip_{tid}"), - ] - ]) - await bot.send_message( - chat_id=CHAT_ID, - text=text, - reply_markup=keyboard, - parse_mode="Markdown", - ) - - log.info("Filmtipps: %d auto-aufgenommen, %d Vorschlaege", len(auto_recorded), len(suggestions)) - except Exception: - log.exception("Fehler beim Senden der Filmtipps") - - -from datetime import datetime, time as dtime - - -def main(): - token = _load_token_and_chat() - if not token: - log.error("TG_HAUSMEISTER_TOKEN fehlt in homelab.conf!") - sys.exit(1) - - _acquire_lock() - atexit.register(_release_lock) - signal.signal(signal.SIGTERM, lambda *_: sys.exit(0)) - - log.info("Starte Orbitalo Hausmeister-Bot...") - app = Application.builder().token(token).build() - - app.add_handler(CommandHandler("start", cmd_start)) - app.add_handler(CommandHandler("status", cmd_status)) - app.add_handler(CommandHandler("errors", cmd_errors)) - app.add_handler(CommandHandler("ct", cmd_ct)) - app.add_handler(CommandHandler("health", cmd_health)) - app.add_handler(CommandHandler("logs", cmd_logs)) - app.add_handler(CommandHandler("silence", cmd_silence)) - app.add_handler(CommandHandler("report", cmd_report)) - app.add_handler(CommandHandler("check", cmd_check)) - app.add_handler(CommandHandler("feeds", cmd_feeds)) - app.add_handler(CommandHandler("memory", cmd_memory)) - app.add_handler(CallbackQueryHandler(handle_callback)) - app.add_handler(MessageHandler(filters.VOICE, handle_voice)) - app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) - app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) - app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) - - if app.job_queue is not None: - app.job_queue.run_daily( - _send_daily_filmtipps, - time=dtime(hour=14, minute=0), - name="daily_filmtipps", - ) - log.info("Täglicher Filmtipp-Job registriert (14:00 Uhr)") - app.job_queue.run_daily( - _send_daily_forecast, - time=dtime(hour=8, minute=0), - name="daily_forecast", - ) - log.info("Täglicher Forecast-Job registriert (08:00 Uhr)") - else: - log.warning("JobQueue nicht verfügbar — Filmtipps werden per asyncio-Loop gesendet") - - async def _filmtipp_loop(application): - """Fallback: asyncio-basierter täglicher Filmtipp (wenn kein JobQueue).""" - while True: - now = datetime.now() - target = now.replace(hour=14, minute=0, second=0, microsecond=0) - if now >= target: - from datetime import timedelta - target += timedelta(days=1) - wait_secs = (target - now).total_seconds() - log.info("Filmtipp-Loop: nächster Run in %.0f Sek (%s)", wait_secs, target) - await asyncio.sleep(wait_secs) - try: - await _send_daily_filmtipps(application) - except Exception: - log.exception("Fehler im Filmtipp-Loop") - - async def _forecast_loop(application): - """Fallback: asyncio-basierter täglicher Forecast (08:00 Uhr).""" - while True: - now = datetime.now() - target = now.replace(hour=8, minute=0, second=0, microsecond=0) - if now >= target: - from datetime import timedelta - target += timedelta(days=1) - wait_secs = (target - now).total_seconds() - log.info("Forecast-Loop: nächster Run in %.0f Sek (%s)", wait_secs, target) - await asyncio.sleep(wait_secs) - try: - await _send_daily_forecast(application) - except Exception: - log.exception("Fehler im Forecast-Loop") - - async def _monitor_loop(application): - """Periodischer Monitoring-Check alle 10 Minuten.""" - await asyncio.sleep(60) - while True: - try: - monitor.run_check_and_alert() - except Exception: - log.exception("Fehler im Monitor-Loop") - await asyncio.sleep(600) - - async def post_init(application): - await application.bot.set_my_commands(BOT_COMMANDS) - log.info("Kommandomenü registriert") - asyncio.create_task(_watchdog_loop()) - asyncio.create_task(_monitor_loop(application)) - log.info("Monitor-Loop aktiv (alle 10 Min)") - if application.job_queue is None: - asyncio.create_task(_filmtipp_loop(application)) - asyncio.create_task(_forecast_loop(application)) - _sd_notify("READY=1") - log.info("Systemd Watchdog aktiv (50s Intervall)") - - app.post_init = post_init - log.info("Bot läuft — polling gestartet") - app.run_polling(allowed_updates=Update.ALL_TYPES) - - -if __name__ == "__main__": - main() + await asyncio.to_thread(monitor.run_check_and_alert) \ No newline at end of file