homelab-brain/homelab-ai-bot/telegram_bot.py
Homelab Cursor d2a6391f52 feat(llm): Ollama warmup bei Start - Modelle permanent im VRAM
- warmup_ollama() laedt qwen3:30b-a3b + nomic-embed-text mit keep_alive=-1
- Wird beim Bot-Start in post_init() aufgerufen (via asyncio.to_thread)
- keep_alive=-1 nur ueber native Ollama API (/api/generate) moeglich
- GPU haelt 22.6/24 GB permanent: Text + Embeddings ohne Swap
2026-03-25 20:59:30 +01:00

1043 lines
38 KiB
Python

"""Orbitalo Hausmeister — Telegram Bot für Homelab-Management."""
import asyncio
import logging
import sys
import os
import fcntl
import atexit
import signal
import socket as _socket
sys.path.insert(0, os.path.dirname(__file__))
def _sd_notify(msg: str):
    """Send a systemd notification datagram without external dependencies.

    Reads the target address from the NOTIFY_SOCKET environment variable and
    does nothing when it is unset or empty. Socket errors are not caught.
    """
    target = os.environ.get("NOTIFY_SOCKET")
    if not target:
        return
    # A leading "@" is swapped for a NUL byte before the address is used.
    if target.startswith("@"):
        target = "\0" + target[1:]
    # The context manager closes the socket even when sendto() raises.
    with _socket.socket(_socket.AF_UNIX, _socket.SOCK_DGRAM) as notify_sock:
        notify_sock.sendto(msg.encode(), target)
async def _watchdog_loop():
    """Send WATCHDOG=1 to systemd every 50 seconds, forever."""
    interval_seconds = 50
    while True:
        _sd_notify("WATCHDOG=1")
        await asyncio.sleep(interval_seconds)
# Path of the PID/lock file used to make sure only one bot instance runs.
PIDFILE = "/tmp/hausmeister-bot.pid"
# File object that carries the flock; assigned by _acquire_lock().
_lock_fp = None
def _acquire_lock():
    """Ensure that only one bot instance runs (PID file + flock).

    Takes a non-blocking exclusive flock on PIDFILE and writes the current
    PID into it. If another process already holds the lock, prints a message
    to stderr and exits with status 1.

    The file is opened in "a+" mode so that a losing second instance does not
    truncate the PID recorded by the instance that is already running; the
    content is only replaced after the lock has been obtained.
    """
    global _lock_fp
    fp = open(PIDFILE, "a+")
    try:
        fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Do not leak the handle of the failed attempt.
        fp.close()
        print(f"ABBRUCH: Bot läuft bereits (PID-File {PIDFILE} ist gelockt)", file=sys.stderr)
        sys.exit(1)
    # We own the lock now: replace whatever a previous run left behind.
    fp.seek(0)
    fp.truncate()
    fp.write(str(os.getpid()))
    fp.flush()
    # Publish the handle only on success so a held lock is never dropped
    # by rebinding the global during a failed attempt.
    _lock_fp = fp
def _release_lock():
    """Release the instance lock and remove the PID file.

    Safe to call more than once: the global handle is cleared first, so a
    repeated call is a no-op instead of failing on the already closed file.
    Each cleanup step is attempted independently, so a failing unlock does
    not prevent the file from being closed or the PID file from being removed.
    """
    global _lock_fp
    fp, _lock_fp = _lock_fp, None
    if fp is None:
        return
    try:
        # Remove the path while the lock is still held, so a file that a new
        # instance has just locked is never deleted by this one.
        os.unlink(PIDFILE)
    except OSError:
        pass
    try:
        fcntl.flock(fp, fcntl.LOCK_UN)
    except (OSError, ValueError):
        # ValueError: the file object was already closed elsewhere.
        pass
    try:
        fp.close()
    except OSError:
        pass
from telegram import (
BotCommand, Update, ReplyKeyboardMarkup, KeyboardButton,
InlineKeyboardButton, InlineKeyboardMarkup,
)
from telegram.ext import (
Application, CommandHandler, MessageHandler, CallbackQueryHandler,
filters, ContextTypes,
)
# Slash commands with their descriptions; passed to set_my_commands() in
# main()'s post_init hook.
BOT_COMMANDS = [
    BotCommand("status", "Alle Container"),
    BotCommand("errors", "Aktuelle Fehler"),
    BotCommand("ct", "Container-Detail (/ct 109)"),
    BotCommand("health", "Health-Check (/health wordpress)"),
    BotCommand("logs", "Letzte Logs (/logs rss-manager)"),
    BotCommand("silence", "Stille Hosts"),
    BotCommand("report", "Tagesbericht"),
    BotCommand("check", "Monitoring-Check"),
    BotCommand("feeds", "Feed-Status & Artikel heute"),
    BotCommand("memory", "Gedaechtnis anzeigen"),
    BotCommand("start", "Hilfe anzeigen"),
]
# Persistent two-row reply keyboard attached to bot answers. The button
# captions are the keys of BUTTON_MAP.
KEYBOARD = ReplyKeyboardMarkup(
    [
        [KeyboardButton("📊 Status"), KeyboardButton("❌ Fehler"), KeyboardButton("📰 Feeds")],
        [KeyboardButton("📋 Report"), KeyboardButton("🔧 Check"), KeyboardButton("🔇 Stille")],
    ],
    resize_keyboard=True,
    is_persistent=True,
)
# Maps a keyboard button caption (sent by Telegram as plain message text) to
# the command name that handle_message() dispatches to.
BUTTON_MAP = {
    "📊 Status": "status",
    "❌ Fehler": "errors",
    "📰 Feeds": "feeds",
    "📋 Report": "report",
    "🔧 Check": "check",
    "🔇 Stille": "silence",
}
import context
import requests as _req
import llm
import memory_client
import action_guard
import monitor
import voice
from core import config
# Process-wide logging setup: timestamp, logger name, level, message at INFO.
logging.basicConfig(
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("hausmeister")
# Chat IDs allowed to use the bot; filled by _load_token_and_chat().
# While this set is empty, _authorized() accepts every chat.
ALLOWED_CHAT_IDS: set[int] = set()
# Chat that receives the scheduled forecast and film-tip messages.
CHAT_ID: int | None = None
# Currently running LLM task per chat ID, so a "stop" message can cancel it.
ACTIVE_LLM_TASKS: dict[int, asyncio.Task] = {}
def _load_token_and_chat():
    """Read bot token and chat ID from the parsed config and return the token.

    Looks up TG_HAUSMEISTER_TOKEN and TG_CHAT_ID in ``cfg.raw``. When a chat
    ID is configured it is converted to int, stored in the global CHAT_ID and
    added to ALLOWED_CHAT_IDS. The token defaults to an empty string.
    """
    global CHAT_ID
    settings = config.parse_config().raw
    bot_token = settings.get("TG_HAUSMEISTER_TOKEN", "")
    configured_chat = settings.get("TG_CHAT_ID", "")
    if not configured_chat:
        return bot_token
    CHAT_ID = int(configured_chat)
    ALLOWED_CHAT_IDS.add(CHAT_ID)
    return bot_token
def _authorized(update: Update) -> bool:
    """Return True when the update comes from an allowed chat.

    While ALLOWED_CHAT_IDS is empty (no TG_CHAT_ID configured) every update
    is accepted. Updates that carry no chat at all are rejected once an
    allow-list exists, instead of failing with AttributeError.
    """
    # NOTE(review): accepting everyone when no chat ID is configured is
    # fail-open; confirm this is intended.
    if not ALLOWED_CHAT_IDS:
        return True
    chat = update.effective_chat
    return chat is not None and chat.id in ALLOWED_CHAT_IDS
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with the help text and attach the persistent reply keyboard."""
    if not _authorized(update):
        return
    help_lines = (
        "🔧 Orbitalo Hausmeister-Bot",
        "",
        "Befehle:",
        "/status — Alle Container",
        "/errors — Aktuelle Fehler",
        "/ct <nr> — Container-Detail",
        "/health <name> — Health-Check",
        "/logs <name> — Letzte Logs",
        "/silence — Stille Hosts",
        "/report — Tagesbericht",
        "/check — Monitoring-Check",
        "/feeds — Feed-Status & Artikel",
        "/memory — Gedaechtnis anzeigen",
        "",
        "📷 Foto senden = Bilderkennung",
        "",
        "Oder einfach eine Frage stellen!",
    )
    await update.message.reply_text("\n".join(help_lines), reply_markup=KEYBOARD)
async def cmd_status(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with the status of all containers (context.gather_status()).

    Output longer than 4000 characters is cut and marked with a trailing
    "...". Errors are reported back to the chat as "Fehler: ...".

    Task cancellation is no longer swallowed here: the former handler logged
    the free-text message ("Freitext-Lauf abgebrochen") and suppressed the
    CancelledError, unlike every other command handler in this file.
    """
    if not _authorized(update):
        return
    await update.message.reply_text("⏳ Lade Container-Status...")
    try:
        text = context.gather_status()
        if len(text) > 4000:
            text = text[:4000] + "\n..."
        await update.message.reply_text(text)
    except Exception as e:
        # CancelledError is a BaseException and deliberately propagates.
        await update.message.reply_text(f"Fehler: {e}")
async def cmd_errors(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with the errors of the last two hours (context.gather_errors).

    The reply is cut to 4000 characters; failures are reported to the chat.
    """
    if not _authorized(update):
        return
    message = update.message
    await message.reply_text("⏳ Suche Fehler...")
    try:
        error_report = context.gather_errors(hours=2)
        await message.reply_text(error_report[:4000])
    except Exception as exc:
        await message.reply_text(f"Fehler: {exc}")
async def cmd_ct(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with details for one container: ``/ct <nr>``.

    Asks for the container number when no argument is given. The reply is
    cut to 4000 characters like the other command handlers do, so an
    over-long report is still delivered instead of being rejected by
    Telegram. Failures are reported to the chat.
    """
    if not _authorized(update):
        return
    args = ctx.args
    if not args:
        await update.message.reply_text("Bitte CT-Nummer angeben: /ct 109")
        return
    try:
        text = context.gather_container_status(args[0])
        await update.message.reply_text(text[:4000])
    except Exception as e:
        await update.message.reply_text(f"Fehler: {e}")
async def cmd_health(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with a health check for one host: ``/health <name>``.

    Asks for the hostname when no argument is given. The reply is cut to
    4000 characters like the other command handlers do, so an over-long
    report is still delivered instead of being rejected by Telegram.
    Failures are reported to the chat.
    """
    if not _authorized(update):
        return
    args = ctx.args
    if not args:
        await update.message.reply_text("Bitte Hostname angeben: /health wordpress")
        return
    try:
        text = context.gather_health(args[0])
        await update.message.reply_text(text[:4000])
    except Exception as e:
        await update.message.reply_text(f"Fehler: {e}")
async def cmd_logs(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with recent logs of one host: ``/logs <name>``.

    Asks for the hostname when no argument is given. The reply is cut to
    4000 characters; failures are reported to the chat.
    """
    if not _authorized(update):
        return
    message = update.message
    host_args = ctx.args
    if not host_args:
        await message.reply_text("Bitte Hostname angeben: /logs rss-manager")
        return
    try:
        log_excerpt = context.gather_logs(host_args[0])
        await message.reply_text(log_excerpt[:4000])
    except Exception as exc:
        await message.reply_text(f"Fehler: {exc}")
async def cmd_silence(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with the list of silent hosts (context.gather_silence()).

    The reply is cut to 4000 characters like the other command handlers do,
    so an over-long list is still delivered instead of being rejected by
    Telegram. Failures are reported to the chat.
    """
    if not _authorized(update):
        return
    try:
        text = context.gather_silence()
        await update.message.reply_text(text[:4000])
    except Exception as e:
        await update.message.reply_text(f"Fehler: {e}")
async def cmd_report(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with the daily report produced by monitor.format_report().

    The reply is cut to 4000 characters; failures are reported to the chat.
    """
    if not _authorized(update):
        return
    message = update.message
    await message.reply_text("⏳ Erstelle Tagesbericht...")
    try:
        daily_report = monitor.format_report()
        await message.reply_text(daily_report[:4000])
    except Exception as exc:
        await message.reply_text(f"Fehler: {exc}")
async def cmd_check(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Run monitor.check_all() and reply with the alerts or an all-clear.

    The reply is cut to 4000 characters like the other command handlers do,
    so a long alert list is still delivered instead of being rejected by
    Telegram. Failures are reported to the chat.
    """
    if not _authorized(update):
        return
    await update.message.reply_text("⏳ Prüfe Systeme...")
    try:
        alerts = monitor.check_all()
        if alerts:
            text = f"⚠️ {len(alerts)} Alarme:\n\n" + "\n".join(alerts)
        else:
            text = "✅ Keine Alarme — alles läuft."
        await update.message.reply_text(text[:4000])
    except Exception as e:
        await update.message.reply_text(f"Fehler: {e}")
def _get_feed_stats():
    """Fetch feed statistics from the RSS manager API.

    Resolves container 109 from the parsed config and requests
    ``/api/feed-stats`` on port 8080 of its ``tailscale_ip`` (10 s timeout).
    Returns the decoded JSON, or None when the container is unknown, the
    response is not OK, or any exception occurs.
    """
    rss_container = config.get_container(config.parse_config(), vmid=109)
    if not rss_container:
        return None
    endpoint = f"http://{rss_container.tailscale_ip}:8080/api/feed-stats"
    try:
        response = _req.get(endpoint, timeout=10)
        if response.ok:
            return response.json()
        return None
    except Exception:
        return None
def format_feed_report(stats: dict) -> str:
    """Render feed statistics as a plain-text Telegram message.

    Reads ``today``, ``yesterday``, ``feeds`` and ``date`` from *stats*.
    The report has a header followed by up to three sections: feeds that
    posted today, feeds without posts today (with yesterday's count), and
    feeds with a positive ``error_count``.
    """
    posts_today = stats["today"]
    posts_yesterday = stats["yesterday"]
    all_feeds = stats["feeds"]
    report = [
        f"📊 Feed-Report ({stats['date']})",
        f"Artikel heute: {posts_today} (gestern: {posts_yesterday})",
        "",
    ]

    publishing = [feed for feed in all_feeds if feed["posts_today"] > 0]
    if publishing:
        report.append("📰 Aktive Feeds:")
        for feed in publishing:
            report.append(f" {feed['name']}: {feed['posts_today']} ({feed['schedule']})")

    quiet = [feed for feed in all_feeds if feed["posts_today"] == 0]
    if quiet:
        report.extend(["", "😴 Keine Artikel heute:"])
        for feed in quiet:
            if feed["posts_yesterday"] > 0:
                note = f"(gestern: {feed['posts_yesterday']})"
            else:
                note = "(auch gestern 0)"
            report.append(f" {feed['name']} {note}")

    # error_count may be falsy (e.g. None or 0); only positive counts are listed.
    failing = [feed for feed in all_feeds if feed["error_count"] and feed["error_count"] > 0]
    if failing:
        report.extend(["", "⚠️ Fehler:"])
        for feed in failing:
            report.append(f" {feed['name']}: {feed['error_count']}x — {feed['last_error']}")

    return "\n".join(report)
async def cmd_feeds(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with the feed report built from the RSS manager statistics.

    Tells the user when the RSS manager delivered no statistics. The reply
    is cut to 4000 characters; failures are reported to the chat.
    """
    if not _authorized(update):
        return
    message = update.message
    await message.reply_text("⏳ Lade Feed-Status...")
    try:
        feed_stats = _get_feed_stats()
        if feed_stats:
            await message.reply_text(format_feed_report(feed_stats)[:4000])
        else:
            await message.reply_text("RSS Manager nicht erreichbar.")
    except Exception as exc:
        await message.reply_text(f"Fehler: {exc}")
# Lower-case German filler words that _find_matching_item() removes from both
# the user text and the item content before computing the word overlap.
_STOP_WORDS = {"ich", "bin", "ist", "der", "die", "das", "ein", "eine", "und", "oder",
               "in", "auf", "an", "fuer", "für", "von", "zu", "mit", "nach", "mein",
               "meine", "meinem", "meinen", "hat", "habe", "wird", "will", "soll",
               "nicht", "auch", "noch", "schon", "nur", "aber", "wenn", "weil", "dass"}
def _find_matching_item(user_text: str, items: list[dict]) -> dict | None:
"""Findet das Item mit der besten Wort-Ueberlappung zum User-Text."""
words = {w.lower().strip(".,!?") for w in user_text.split() if len(w) > 2}
words -= _STOP_WORDS
if not words:
return None
best, best_score = None, 0
for c in items:
content_words = {w.lower().strip(".,!?") for w in c["content"].split() if len(w) > 2}
content_words -= _STOP_WORDS
if not content_words:
continue
overlap = len(words & content_words)
score = overlap / max(len(words), 1)
if score > best_score and score >= 0.3:
best, best_score = c, score
return best
async def cmd_memory(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Reply with the active memory entries, grouped by memory type.

    Entries are listed per type in a fixed order; each line shows the first
    90 characters of the content, an optional expiry date and the source tag
    (omitted for "manual"). The reply is cut to 4000 characters.

    Unlike before, failures of the memory service or of the formatting are
    logged and reported to the chat, like the other command handlers do.
    """
    if not _authorized(update):
        return
    # Imported once per call instead of once per entry inside the loop.
    from datetime import datetime

    # NOTE(review): several icons are empty strings in this source; they may
    # have been lost in an encoding step - confirm against the repository.
    type_icon = {"fact": "📌", "preference": "", "relationship": "👤", "plan": "📅", "temporary": "🕒", "uncertain": ""}
    conf_icon = {"high": "", "medium": "", "low": ""}
    type_order = ("fact", "preference", "relationship", "plan", "temporary", "uncertain")

    try:
        items = memory_client.get_active_memory()
        if not items:
            await update.message.reply_text("Kein Gedaechtnis vorhanden.")
            return

        groups = {}
        for item in items:
            groups.setdefault(item.get("memory_type", "fact"), []).append(item)

        lines = [f"🧠 Gedaechtnis: {len(items)} Eintraege\n"]
        for mtype in type_order:
            group = groups.get(mtype, [])
            if not group:
                continue
            icon = type_icon.get(mtype, "")
            lines.append(f"{icon} {mtype.upper()} ({len(group)}):")
            for item in group:
                conf = conf_icon.get(item.get("confidence", "high"), "")
                src = item.get("source_type", "")
                src_tag = f" [{src}]" if src and src != "manual" else ""
                exp = item.get("expires_at")
                exp_str = ""
                if exp:
                    exp_str = f" (bis {datetime.fromtimestamp(exp).strftime('%d.%m.%Y')})"
                lines.append(f"{item['content'][:90]}{conf}{exp_str}{src_tag}")
            lines.append("")

        text = "\n".join(lines)
        await update.message.reply_text(text[:4000], reply_markup=KEYBOARD)
    except Exception as e:
        log.exception("Fehler bei /memory")
        await update.message.reply_text(f"Fehler: {e}")
async def handle_voice(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Voice message: speech-to-text -> LLM -> answer as text plus speech.

    Downloads the voice note, transcribes it with voice.transcribe(), runs
    llm.ask_with_tools() in a worker thread, logs both turns to the memory
    session and answers with text and, if synthesis works, a voice reply.

    Changes compared to the previous version:
    * The wait uses asyncio.wait() with a 30 s timeout, so the answer is sent
      as soon as the LLM finishes instead of at the next 30 s boundary.
    * The task is removed from ACTIVE_LLM_TASKS when the handler ends.
    * A run cancelled through ACTIVE_LLM_TASKS ends quietly, as in
      handle_message(); any other cancellation is re-raised.
    """
    if not _authorized(update):
        return
    voice_msg = update.message.voice
    if not voice_msg:
        return
    await update.message.reply_text("🎙 Höre zu...")
    chat_id = update.effective_chat.id
    llm_task = None
    try:
        tg_file = await ctx.bot.get_file(voice_msg.file_id)
        audio_data = await tg_file.download_as_bytearray()
        text = voice.transcribe(bytes(audio_data))
        if not text:
            await update.message.reply_text("Konnte die Nachricht nicht verstehen.")
            return
        log.info("Voice transkribiert: %s", text[:100])
        await update.message.reply_text(f"🗣 \"{text}\"\n\n🤔 Denke nach...")

        channel_key = str(chat_id)
        session_id = memory_client.get_or_create_session(channel_key, source="telegram")
        context.last_suggest_result = {"type": None}
        context.set_source_type("telegram_voice")
        handlers = context.get_tool_handlers(session_id=session_id)

        llm_task = asyncio.create_task(
            asyncio.to_thread(llm.ask_with_tools, text, handlers, session_id=session_id)
        )
        ACTIVE_LLM_TASKS[chat_id] = llm_task

        waited = 0
        while True:
            # Returns immediately once the task is done (or was cancelled);
            # otherwise wakes up after 30 s to send a progress note.
            done, _pending = await asyncio.wait({llm_task}, timeout=30)
            if done:
                break
            waited += 30
            await update.message.reply_text("⏳ Suche laeuft noch (" + str(waited) + "s)...")
        answer = await llm_task

        if session_id:
            memory_client.log_message(session_id, "user", text)
            memory_client.log_message(session_id, "assistant", answer)
        await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD)

        audio_out = voice.synthesize(answer[:4000])
        if audio_out:
            import io as _io
            await update.message.reply_voice(voice=_io.BytesIO(audio_out))
        else:
            log.warning("TTS fehlgeschlagen — nur Text gesendet")
    except asyncio.CancelledError:
        # Only a cancelled LLM task is an expected abort; a cancellation of
        # this handler itself must keep propagating.
        if llm_task is None or not llm_task.cancelled():
            raise
        log.info("Voice-Lauf abgebrochen")
        return
    except Exception as e:
        log.exception("Fehler bei Voice-Nachricht")
        await update.message.reply_text(f"Fehler: {e}")
    finally:
        # Remove only our own entry; a newer task of this chat stays registered.
        if llm_task is not None and ACTIVE_LLM_TASKS.get(chat_id) is llm_task:
            del ACTIVE_LLM_TASKS[chat_id]
async def handle_photo(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Photo message: analyse the picture with the vision LLM.

    Uses the last entry of ``update.message.photo`` (presumably the largest
    size - confirm), sends it base64-encoded with the caption to
    llm.ask_with_image() in a worker thread, appends any flight plausibility
    warnings, logs both turns to the memory session and replies with the
    answer cut to 4000 characters.
    """
    if not _authorized(update):
        return
    photo_sizes = update.message.photo
    if not photo_sizes:
        return
    chosen = photo_sizes[-1]
    caption = update.message.caption or ""
    await update.message.reply_text("🔍 Analysiere Bild...")
    try:
        import base64

        remote_file = await ctx.bot.get_file(chosen.file_id)
        raw_image = await remote_file.download_as_bytearray()
        image_base64 = base64.b64encode(bytes(raw_image)).decode("utf-8")

        session_id = memory_client.get_or_create_session(
            str(update.effective_chat.id), source="telegram"
        )
        context.last_suggest_result = {"type": None}
        context.set_source_type("telegram_photo")
        tool_handlers = context.get_tool_handlers(session_id=session_id)
        answer = await asyncio.to_thread(
            llm.ask_with_image, image_base64, caption, tool_handlers, session_id=session_id
        )

        hint, findings = _check_flight_plausibility(answer)
        if hint:
            answer += hint
            _store_plausibility_corrections(findings)

        if session_id:
            logged_prompt = f"[Foto] {caption}" if caption else "[Foto gesendet]"
            memory_client.log_message(session_id, "user", logged_prompt)
            memory_client.log_message(session_id, "assistant", answer)

        await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD)
    except Exception as exc:
        log.exception("Fehler bei Foto-Analyse")
        await update.message.reply_text(f"Fehler bei Bildanalyse: {exc}")
def _check_flight_plausibility(text: str) -> tuple[str, list]:
"""Prueft LLM-Antwort auf verdaechtige Layover-Zeiten zwischen Flugsegmenten.
Parst Datum/Uhrzeit-Paare aus der strukturierten Antwort und flaggt
Segmentueberg aenge mit >20h berechneter Umsteigezeit bei <3h Uhrzeitdifferenz.
"""
import re
from datetime import datetime, timedelta
MONTHS = {"JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MAY": 5, "JUN": 6,
"JUL": 7, "AUG": 8, "SEP": 9, "OCT": 10, "NOV": 11, "DEC": 12}
segments = []
current_seg = {}
for line in text.split("\n"):
line_clean = line.strip().replace("**", "").replace("*", "")
date_match = re.search(r"Datum:\s*(\d{1,2})\s*(JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)", line_clean, re.IGNORECASE)
if date_match:
day = int(date_match.group(1))
month = MONTHS.get(date_match.group(2).upper(), 0)
current_seg["date_day"] = day
current_seg["date_month"] = month
dep_match = re.search(r"Abflug:\s*(\d{1,2}):(\d{2})", line_clean)
if dep_match:
current_seg["dep_h"] = int(dep_match.group(1))
current_seg["dep_m"] = int(dep_match.group(2))
arr_match = re.search(r"Ankunft:\s*(\d{1,2}):(\d{2})", line_clean)
if arr_match:
current_seg["arr_h"] = int(arr_match.group(1))
current_seg["arr_m"] = int(arr_match.group(2))
next_day = "chster Tag" in line_clean or "+1" in line_clean
current_seg["arr_next_day"] = next_day
if all(k in current_seg for k in ("date_day", "date_month", "dep_h", "arr_h")):
if current_seg not in segments:
segments.append(dict(current_seg))
current_seg = {}
if len(segments) < 2:
return "", []
warnings = []
year = 2026
for i in range(len(segments) - 1):
s1 = segments[i]
s2 = segments[i + 1]
try:
arr_day_offset = 1 if s1.get("arr_next_day") else 0
arr_dt = datetime(year, s1["date_month"], s1["date_day"], s1.get("arr_h", 0), s1.get("arr_m", 0)) + timedelta(days=arr_day_offset)
dep_dt = datetime(year, s2["date_month"], s2["date_day"], s2.get("dep_h", 0), s2.get("dep_m", 0))
layover = dep_dt - arr_dt
layover_h = layover.total_seconds() / 3600
time_diff_minutes = abs(s1.get("arr_h", 0) * 60 + s1.get("arr_m", 0) - s2.get("dep_h", 0) * 60 - s2.get("dep_m", 0))
if layover_h > 20 and time_diff_minutes < 180:
likely_day = s1["date_day"] + (1 if s1.get("arr_next_day") else 0)
likely_month = s1["date_month"]
if likely_day > 28:
likely_day = s1["date_day"]
warnings.append({
"text": (
f"⚠️ Segment {i+1}{i+2}: Berechnete Umsteigezeit = {layover_h:.0f}h. "
f"Die Uhrzeiten liegen nur {time_diff_minutes} Min auseinander. "
f"Moeglicherweise ist das Datum von Segment {i+2} falsch gelesen "
f"({s2['date_day']:02d}.{s2['date_month']:02d}. statt "
f"{likely_day:02d}.{likely_month:02d}.). Bitte auf dem Ticket pruefen."
),
"wrong_date": f"{s2['date_day']:02d}.{s2['date_month']:02d}.",
"likely_date": f"{likely_day:02d}.{likely_month:02d}.",
"segment": i + 2,
})
except (ValueError, OverflowError):
continue
if warnings:
warning_text = "\n\n🔍 Plausibilitaetspruefung:\n" + "\n".join(w["text"] for w in warnings)
return warning_text, warnings
return "", []
def _store_plausibility_corrections(warnings: list):
    """Store a correction note in memory for each plausibility warning.

    Each warning (as produced by _check_flight_plausibility) is posted to
    the memory API's ``/memory`` endpoint as a high-confidence fact.
    Failures are logged as warnings and never raised. Does nothing for an
    empty list.
    """
    for warning in warnings or ():
        try:
            payload = {
                "scope": "user",
                "kind": "fact",
                "content": (
                    f"ACHTUNG Datumskorrektur Flug-Segment {warning['segment']}: "
                    f"OCR las {warning['wrong_date']}, korrektes Datum vermutlich {warning['likely_date']}. "
                    f"Plausibilitaetspruefung: Umsteigezeit waere sonst >20h bei nur wenigen Minuten Uhrzeitdifferenz."
                ),
                "memory_type": "fact",
                "confidence": "high",
                "source_type": "system_plausibility_check",
            }
            if memory_client._post("/memory", payload):
                log.info("Plausibilitaetskorrektur ins Memory geschrieben: Segment %d, %s -> %s",
                         warning["segment"], warning["wrong_date"], warning["likely_date"])
            else:
                log.warning("Memory-API gab kein Ergebnis fuer Plausibilitaetskorrektur")
        except Exception as exc:
            log.warning("Konnte Plausibilitaetskorrektur nicht speichern: %s", exc)
def _extract_pdf_text(pdf_bytes: bytes) -> str:
    """Extract text from the first ten pages of a PDF using PyPDF2.

    Pages with text are prefixed with ``--- Seite N ---`` and joined by
    blank lines. Returns an empty string when no page yields text or when
    anything fails (the failure is logged as a warning).
    """
    try:
        import io as _io
        from PyPDF2 import PdfReader

        pdf = PdfReader(_io.BytesIO(pdf_bytes))
        sections = []
        for page_no, page in enumerate(pdf.pages[:10], start=1):
            content = page.extract_text()
            if not content or not content.strip():
                continue
            sections.append(f"--- Seite {page_no} ---\n{content.strip()}")
        return "\n\n".join(sections)
    except Exception as exc:
        log.warning("PDF-Extraktion fehlgeschlagen: %s", exc)
        return ""
async def handle_document(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Document message: analyse image files and PDFs.

    Images (``image/*``) go to llm.ask_with_image(); PDFs have their text
    extracted and go to llm.ask_with_tools() with the first 6000 characters
    of that text. Both paths append flight plausibility warnings, log the
    exchange to the memory session and reply with the answer cut to 4000
    characters. Any other MIME type gets a "not supported" reply.
    """
    if not _authorized(update):
        return
    message = update.message
    doc = message.document
    if not doc:
        return

    mime = doc.mime_type or ""
    caption = message.caption or ""
    # The session is created before the MIME type is checked.
    session_id = memory_client.get_or_create_session(
        str(update.effective_chat.id), source="telegram"
    )

    is_image = mime.startswith("image/")
    if not is_image and mime != "application/pdf":
        await message.reply_text(
            f"Dateityp '{mime}' wird nicht unterstuetzt.\n"
            "Unterstuetzt: Bilder (JPG/PNG) und PDFs."
        )
        return

    if is_image:
        await message.reply_text("🔍 Analysiere Bild...")
        try:
            import base64

            remote_file = await ctx.bot.get_file(doc.file_id)
            raw_image = await remote_file.download_as_bytearray()
            image_base64 = base64.b64encode(bytes(raw_image)).decode("utf-8")

            context.last_suggest_result = {"type": None}
            context.set_source_type("telegram_photo")
            tool_handlers = context.get_tool_handlers(session_id=session_id)
            answer = await asyncio.to_thread(
                llm.ask_with_image, image_base64, caption, tool_handlers, session_id=session_id
            )

            hint, findings = _check_flight_plausibility(answer)
            if hint:
                answer += hint
                _store_plausibility_corrections(findings)

            if session_id:
                logged_prompt = f"[Bild-Datei] {caption}" if caption else "[Bild-Datei gesendet]"
                memory_client.log_message(session_id, "user", logged_prompt)
                memory_client.log_message(session_id, "assistant", answer)

            await message.reply_text(answer[:4000], reply_markup=KEYBOARD)
        except Exception as exc:
            log.exception("Fehler bei Bild-Dokument")
            await message.reply_text(f"Fehler bei Bildanalyse: {exc}")
        return

    # Remaining case: application/pdf
    await message.reply_text("📄 Lese PDF...")
    try:
        remote_file = await ctx.bot.get_file(doc.file_id)
        raw_pdf = await remote_file.download_as_bytearray()
        pdf_text = _extract_pdf_text(bytes(raw_pdf))
        if not pdf_text:
            await message.reply_text(
                "PDF enthält keinen extrahierbaren Text (evtl. gescannt/Bild-PDF).\n"
                "Tipp: Sende einen Screenshot des PDFs als Foto — dann kann ich es per Bilderkennung lesen."
            )
            return

        if caption:
            question = caption
        else:
            question = "Analysiere dieses Dokument. Was sind die wichtigsten Informationen?"
        full_prompt = f"{question}\n\n--- PDF-INHALT ---\n{pdf_text[:6000]}"

        context.last_suggest_result = {"type": None}
        context.set_source_type("telegram_pdf")
        tool_handlers = context.get_tool_handlers(session_id=session_id)
        answer = await asyncio.to_thread(
            llm.ask_with_tools, full_prompt, tool_handlers, session_id=session_id
        )

        hint, findings = _check_flight_plausibility(answer)
        if hint:
            answer += hint
            _store_plausibility_corrections(findings)

        if session_id:
            pdf_label = f"[PDF: {doc.file_name or 'dokument.pdf'}]"
            logged_prompt = f"{pdf_label} {caption}" if caption else pdf_label
            memory_client.log_message(session_id, "user", logged_prompt)
            memory_client.log_message(session_id, "assistant", answer)

        await message.reply_text(answer[:4000], reply_markup=KEYBOARD)
    except Exception as exc:
        log.exception("Fehler bei PDF-Analyse")
        await message.reply_text(f"Fehler bei PDF: {exc}")
def _likely_deep_research_request(text: str) -> bool:
    """Heuristic for long-running research requests.

    Returns True when the lower-cased text contains one of the trigger
    substrings. None or empty text yields False.
    """
    haystack = (text or "").lower()
    for keyword in (
        "recherchiere",
        "recherche",
        "finde heraus",
        "vergleich",
        "analysiere",
        "entwickelt",
        "entwicklung",
    ):
        if keyword in haystack:
            return True
    return False
async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Process keyboard button presses and free-text questions.

    Order of handling:
    1. "abbruch"/"stop"/"stopp"/"cancel": drop a pending guarded action, or
       cancel the running LLM task of this chat.
    2. A confirmation for a pending guarded action executes it.
    3. Keyboard button captions (BUTTON_MAP) run the matching command.
    4. Anything else is sent to llm.ask_with_tools() in a worker thread; both
       turns are logged to the memory session.

    Changes compared to the previous version:
    * The wait uses asyncio.wait() with a 30 s timeout, so the answer is sent
      as soon as the LLM finishes instead of at the next 30 s boundary.
    * Cleanup removes only this call's own task from ACTIVE_LLM_TASKS, so a
      newer task of the same chat is not dropped.
    * Only a cancelled LLM task ends quietly; a cancellation of the handler
      itself is re-raised.
    """
    if not _authorized(update):
        return
    text = update.message.text
    if not text:
        return
    chat_id = update.effective_chat.id
    channel_key = str(chat_id)

    if text.strip().lower() in ("abbruch", "stop", "stopp", "cancel"):
        if action_guard.has_pending(channel_key):
            action_guard.clear_pending(channel_key)
            await update.message.reply_text("🛑 Ausstehende Aktion abgebrochen.")
        else:
            task = ACTIVE_LLM_TASKS.get(chat_id)
            if task and not task.done():
                task.cancel()
                await update.message.reply_text("🛑 Abgebrochen.")
            else:
                await update.message.reply_text("Kein laufender Suchlauf.")
        return

    if action_guard.is_confirmation(text) and action_guard.has_pending(channel_key):
        result, ok = action_guard.execute_pending(channel_key)
        # NOTE(review): both prefixes are empty strings in this source; they
        # may have been status icons lost in an encoding step - confirm.
        await update.message.reply_text(("" if ok else "") + result[:4000])
        return

    button_handlers = {
        "status": cmd_status,
        "errors": cmd_errors,
        "feeds": cmd_feeds,
        "report": cmd_report,
        "check": cmd_check,
        "silence": cmd_silence,
    }
    button_handler = button_handlers.get(BUTTON_MAP.get(text))
    if button_handler is not None:
        return await button_handler(update, ctx)

    session_id = memory_client.get_or_create_session(channel_key, source="telegram")
    await update.message.reply_text("🤔 Denke nach...")
    if _likely_deep_research_request(text):
        await update.message.reply_text("🔎 Deep Research gestartet. Das dauert meist 2-5 Minuten.")

    llm_task = None
    try:
        context.last_suggest_result = {"type": None}
        context.set_source_type("telegram_text")
        handlers = action_guard.wrap_handlers(
            context.get_tool_handlers(session_id=session_id), channel_key
        )
        llm_task = asyncio.create_task(
            asyncio.to_thread(llm.ask_with_tools, text, handlers, session_id=session_id)
        )
        ACTIVE_LLM_TASKS[chat_id] = llm_task

        waited = 0
        while True:
            # Returns immediately once the task is done (or was cancelled);
            # otherwise wakes up after 30 s to send a progress note.
            done, _pending = await asyncio.wait({llm_task}, timeout=30)
            if done:
                break
            waited += 30
            await update.message.reply_text("Suche laeuft noch (" + str(waited) + "s)...")
        answer = await llm_task

        if session_id:
            memory_client.log_message(session_id, "user", text)
            memory_client.log_message(session_id, "assistant", answer)
        suggest = context.last_suggest_result
        log.info("suggest_result: type=%s", suggest.get("type"))
        await update.message.reply_text(answer[:4000], reply_markup=KEYBOARD)
    except asyncio.CancelledError:
        # Only a cancelled LLM task is an expected abort; a cancellation of
        # this handler itself must keep propagating.
        if llm_task is None or not llm_task.cancelled():
            raise
        log.info("Freitext-Lauf abgebrochen")
        return
    except Exception as e:
        log.exception("Fehler bei Freitext")
        await update.message.reply_text(f"Fehler: {e}")
    finally:
        # Remove only our own entry; a newer task of this chat stays registered.
        if llm_task is not None and ACTIVE_LLM_TASKS.get(chat_id) is llm_task:
            del ACTIVE_LLM_TASKS[chat_id]
async def handle_callback(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Inline button callbacks (e.g. Save.TV recording).

    ``savetv_rec_<id>`` schedules the recording through
    tools.savetv.handle_savetv_record() and appends the result to the
    message; ``savetv_skip_<id>`` appends a "skipped" marker.

    Changes compared to the previous version:
    * Callbacks from chats that are not allowed are ignored, as in every
      other handler of this file.
    * A missing message or message text no longer raises while building the
      edited text.
    * The telecast ID is taken with removeprefix() instead of replace().
    """
    query = update.callback_query
    # Answer first so the client stops showing the loading indicator.
    await query.answer()
    if update.effective_chat is None or not _authorized(update):
        return
    data = query.data or ""
    base_text = getattr(query.message, "text", None) or ""

    if data.startswith("savetv_rec_"):
        tid = data.removeprefix("savetv_rec_")
        try:
            from tools import savetv
            result = savetv.handle_savetv_record(telecast_id=int(tid))
            await query.edit_message_text(base_text + f"\n\n{result}")
        except Exception as e:
            log.exception("Save.TV Aufnahme Fehler")
            await query.edit_message_text(base_text + f"\n\n❌ Fehler: {e}")
    elif data.startswith("savetv_skip_"):
        await query.edit_message_text(base_text + "\n\n⏭ Übersprungen")
async def _send_daily_forecast(context):
    """Daily job: collect system data, let the LLM analyse it, send the result.

    *context* is either a job-queue callback context (its ``bot`` attribute
    is used) or the object passed by the asyncio fallback loop, which is
    then used as the bot itself. The parameter shadows the module-level
    ``context`` import inside this function. Does nothing when no CHAT_ID
    is configured. All errors are logged and never raised.

    Changes compared to the previous version:
    * The message is cut to 4000 characters.
    * If Telegram rejects the Markdown (LLM output often contains unbalanced
      ``*`` or ``_``), the message is sent again as plain text instead of
      being lost.
    """
    if not CHAT_ID:
        return
    bot = getattr(context, "bot", None) or context
    try:
        from telegram.error import BadRequest
        from tools.predict import handle_get_health_forecast
        from llm import ask_with_tools

        # Both calls block, so they run in worker threads.
        report = await asyncio.to_thread(handle_get_health_forecast)
        prompt = (
            "Morgendlicher System-Check. Analysiere diesen Report und gib eine kurze "
            "Prognose ob sich Probleme anbahnen. Nur echte Auffaelligkeiten nennen, "
            "klare Handlungsempfehlung wenn noetig. Bei allem OK: kurze Entwarnung."
            "\n\n" + report
        )
        analysis = await asyncio.to_thread(ask_with_tools, prompt)
        msg = ("🔭 *Taegliche Systemvorhersage*\n\n" + analysis)[:4000]
        try:
            await bot.send_message(chat_id=CHAT_ID, text=msg, parse_mode="Markdown")
        except BadRequest:
            log.warning("Systemvorhersage: Markdown abgelehnt, sende als Klartext")
            await bot.send_message(chat_id=CHAT_ID, text=msg)
        log.info("Taegl. Systemvorhersage gesendet")
    except Exception:
        log.exception("Fehler beim Senden der Systemvorhersage")
def _filmtipp_when(start_raw) -> str:
    """Return "heute", "morgen" or "in Nd" for an EPG start timestamp.

    *start_raw* is expected as "YYYY-MM-DD HH:MM:SS"; anything else yields
    an empty string.
    """
    from datetime import datetime

    try:
        start_dt = datetime.strptime(start_raw or "", "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        return ""
    delta = (start_dt.date() - datetime.now().date()).days
    if delta > 1:
        return f"in {delta}d"
    if delta == 1:
        return "morgen"
    return "heute"


async def _send_daily_filmtipps(context):
    """Daily job: scan the EPG, report auto-recorded films, suggest the rest.

    *context* is either a job-queue callback context (its ``bot`` attribute
    is used) or the object passed by the asyncio fallback loop, which is
    then used as the bot itself. Does nothing when no CHAT_ID is configured.
    Auto-recorded films go into one summary message; up to six suggestions
    are sent individually with record/skip inline buttons. All errors are
    logged and never raised.

    Changes compared to the previous version:
    * Messages use HTML parse mode with html.escape(). Titles, stations and
      genres were inserted unescaped into legacy Markdown, so a ``*``, ``_``,
      backtick or ``[`` in EPG data made Telegram reject the message.
    * A broken suggestion is logged and skipped instead of aborting the
      remaining ones.
    * Missing or None start dates no longer raise.
    """
    if not CHAT_ID:
        return
    bot = getattr(context, "bot", None) or context
    try:
        import html
        from tools import savetv

        def esc(value) -> str:
            # Escape <, > and & so EPG text cannot break the HTML markup.
            return html.escape(str(value), quote=False)

        auto_recorded, suggestions = savetv.get_new_films()
        if not auto_recorded and not suggestions:
            log.info("Filmtipp-Scan: keine neuen Filme")
            return

        if auto_recorded:
            lines = [f"✅ <b>{len(auto_recorded)} Filme automatisch aufgenommen:</b>\n"]
            for f in auto_recorded:
                title = esc(f.get("STITLE") or "?")
                station = esc(f.get("STVSTATIONNAME") or "?")
                start = esc(str(f.get("DSTARTDATE") or "?")[:16])
                subcat = esc(f.get("SSUBCATEGORYNAME") or "")
                label = _filmtipp_when(f.get("DSTARTDATE"))
                when = f" ({label})" if label else ""
                lines.append(f"🎬 {title}{when}\n 📺 {station} | ⏰ {start} | 🎭 {subcat}")
            await bot.send_message(chat_id=CHAT_ID, text="\n".join(lines), parse_mode="HTML")

        for f in suggestions[:6]:
            try:
                tid = int(f.get("ITELECASTID", 0))
                title = esc(f.get("STITLE") or "?")
                station = esc(f.get("STVSTATIONNAME") or "?")
                start = esc(str(f.get("DSTARTDATE") or "?")[:16])
                subcat = esc(f.get("SSUBCATEGORYNAME") or "")
                desc = esc((f.get("STHEMA") or f.get("SFULLSUBTITLE") or "")[:150])
                label = _filmtipp_when(f.get("DSTARTDATE"))

                text = f"🤔 <b>{title}</b>"
                if label:
                    text += f" ({label})"
                text += f"\n📺 {station} | ⏰ {start}\n🎭 {subcat}"
                if desc:
                    text += f"\n<i>{desc}</i>"

                keyboard = InlineKeyboardMarkup([
                    [
                        InlineKeyboardButton("🔴 Aufnehmen", callback_data=f"savetv_rec_{tid}"),
                        InlineKeyboardButton("⏭ Nein", callback_data=f"savetv_skip_{tid}"),
                    ]
                ])
                await bot.send_message(
                    chat_id=CHAT_ID,
                    text=text,
                    reply_markup=keyboard,
                    parse_mode="HTML",
                )
            except Exception:
                # One bad entry must not suppress the remaining suggestions.
                log.exception("Filmtipp-Vorschlag konnte nicht gesendet werden")

        log.info("Filmtipps: %d auto-aufgenommen, %d Vorschlaege", len(auto_recorded), len(suggestions))
    except Exception:
        log.exception("Fehler beim Senden der Filmtipps")
from datetime import datetime, time as dtime
def main():
    """Start the bot: load config, take the instance lock, register handlers
    and scheduled jobs, then poll Telegram until stopped.

    Exits with status 1 when the bot token is missing or another instance
    holds the lock. The post_init hook registers the command menu, starts
    the systemd watchdog, the Ollama warm-up and the monitor loop, and
    reports READY=1 to systemd.

    Changes compared to the previous version:
    * Background tasks are kept in a set until they finish. The event loop
      holds only weak references to tasks, so an unreferenced task may be
      garbage-collected mid-run. A task that dies with an exception is now
      logged.
    * The two identical daily fallback loops share one implementation.
    """
    token = _load_token_and_chat()
    if not token:
        log.error("TG_HAUSMEISTER_TOKEN fehlt in homelab.conf!")
        sys.exit(1)

    _acquire_lock()
    atexit.register(_release_lock)
    signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))

    log.info("Starte Orbitalo Hausmeister-Bot...")
    # NOTE(review): the application is built without concurrent updates. As
    # far as I can tell python-telegram-bot then handles updates one after
    # another, so a "stop" message would only be seen after the running
    # handle_message() has finished - confirm.
    app = Application.builder().token(token).build()

    command_handlers = (
        ("start", cmd_start),
        ("status", cmd_status),
        ("errors", cmd_errors),
        ("ct", cmd_ct),
        ("health", cmd_health),
        ("logs", cmd_logs),
        ("silence", cmd_silence),
        ("report", cmd_report),
        ("check", cmd_check),
        ("feeds", cmd_feeds),
        ("memory", cmd_memory),
    )
    for command, callback in command_handlers:
        app.add_handler(CommandHandler(command, callback))
    app.add_handler(CallbackQueryHandler(handle_callback))
    app.add_handler(MessageHandler(filters.VOICE, handle_voice))
    app.add_handler(MessageHandler(filters.PHOTO, handle_photo))
    app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    if app.job_queue is not None:
        # NOTE(review): these times carry no tzinfo. The asyncio fallback
        # below uses local time (datetime.now()); check which timezone the
        # JobQueue applies to naive times so both paths fire at the same hour.
        app.job_queue.run_daily(
            _send_daily_filmtipps,
            time=dtime(hour=14, minute=0),
            name="daily_filmtipps",
        )
        log.info("Täglicher Filmtipp-Job registriert (14:00 Uhr)")
        app.job_queue.run_daily(
            _send_daily_forecast,
            time=dtime(hour=8, minute=0),
            name="daily_forecast",
        )
        log.info("Täglicher Forecast-Job registriert (08:00 Uhr)")
    else:
        log.warning("JobQueue nicht verfügbar — Filmtipps werden per asyncio-Loop gesendet")

    # Strong references to background tasks for as long as they run.
    background_tasks: set[asyncio.Task] = set()

    def _spawn(coro, name: str) -> asyncio.Task:
        """Start *coro* as a task, keep a reference and log a failure."""
        task = asyncio.create_task(coro, name=name)
        background_tasks.add(task)

        def _finished(done_task: asyncio.Task) -> None:
            background_tasks.discard(done_task)
            if not done_task.cancelled() and done_task.exception() is not None:
                log.error("Hintergrund-Task %s fehlgeschlagen", name,
                          exc_info=done_task.exception())

        task.add_done_callback(_finished)
        return task

    async def _daily_loop(application, hour: int, job, label: str):
        """Fallback without JobQueue: run *job* every day at *hour*:00 local time."""
        from datetime import timedelta

        while True:
            now = datetime.now()
            target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
            if now >= target:
                target += timedelta(days=1)
            wait_secs = (target - now).total_seconds()
            log.info("%s-Loop: nächster Run in %.0f Sek (%s)", label, wait_secs, target)
            await asyncio.sleep(wait_secs)
            try:
                await job(application)
            except Exception:
                log.exception("Fehler im %s-Loop", label)

    async def _monitor_loop(application):
        """Periodic monitoring check every 10 minutes, first run after 60 s."""
        await asyncio.sleep(60)
        while True:
            try:
                await asyncio.to_thread(monitor.run_check_and_alert)
            except Exception:
                log.exception("Fehler im Monitor-Loop")
            await asyncio.sleep(600)

    async def post_init(application):
        """Runs once the application is initialised, before polling starts."""
        await application.bot.set_my_commands(BOT_COMMANDS)
        log.info("Kommandomenü registriert")
        _spawn(_watchdog_loop(), "watchdog")
        _spawn(asyncio.to_thread(llm.warmup_ollama), "ollama-warmup")
        _spawn(_monitor_loop(application), "monitor")
        log.info("Monitor-Loop aktiv (alle 10 Min)")
        if application.job_queue is None:
            _spawn(_daily_loop(application, 14, _send_daily_filmtipps, "Filmtipp"), "filmtipp-loop")
            _spawn(_daily_loop(application, 8, _send_daily_forecast, "Forecast"), "forecast-loop")
        _sd_notify("READY=1")
        log.info("Systemd Watchdog aktiv (50s Intervall)")

    app.post_init = post_init
    log.info("Bot läuft — polling gestartet")
    app.run_polling(allowed_updates=Update.ALL_TYPES)
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()