diff --git a/homelab-ai-bot/telegram_bot.py b/homelab-ai-bot/telegram_bot.py
index 9cd1694e..5774d6a2 100644
--- a/homelab-ai-bot/telegram_bot.py
+++ b/homelab-ai-bot/telegram_bot.py
@@ -419,6 +419,135 @@ async def cmd_memory(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
     await update.message.reply_text(text[:4000], reply_markup=build_reply_keyboard(str(update.effective_chat.id)))
 
 
+async def _run_freitext_llm_pipeline(
+    update: Update,
+    text: str,
+    work_text: str,
+    session_id: str,
+    document_mode: bool,
+    channel_key: str,
+) -> None:
+    """Run the free-text LLM pipeline in the background so polling stays responsive."""
+    chat_id = update.effective_chat.id
+    try:
+        context.last_suggest_result = {"type": None}
+        context.set_source_type("telegram_text")
+        handlers = action_guard.wrap_handlers(
+            context.get_tool_handlers(session_id=session_id), channel_key
+        )
+        llm_task = asyncio.create_task(
+            asyncio.to_thread(
+                llm.ask_with_tools,
+                work_text,
+                handlers,
+                session_id=session_id,
+                document_mode=document_mode,
+            )
+        )
+        waited = 0
+        while not llm_task.done():
+            await asyncio.sleep(10)
+            waited += 10
+            if not llm_task.done():
+                try:
+                    await update.message.reply_text(
+                        "⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
+                    )
+                except Exception as te:
+                    log.warning("Fortschritts-Nachricht fehlgeschlagen: %s", te)
+        answer = await llm_task
+
+        if session_id:
+            memory_client.log_message(session_id, "user", text)
+            memory_client.log_message(session_id, "assistant", answer)
+
+        suggest = context.last_suggest_result
+        log.info("suggest_result: type=%s", suggest.get("type"))
+
+        await _reply_text_chunked(
+            update,
+            answer,
+            reply_markup=build_reply_keyboard(str(update.effective_chat.id)),
+        )
+    except asyncio.CancelledError:
+        log.info("Freitext-Lauf abgebrochen")
+        llm_task.cancel()  # don't leave the to_thread wrapper task orphaned/never-awaited
+        raise
+    except Exception as e:
+        log.exception("Fehler bei Freitext")
+        try:
+            await update.message.reply_text(f"Fehler: {e}")
+        except Exception:
+            pass
+    finally:
+        if ACTIVE_LLM_TASKS.get(chat_id) is asyncio.current_task():  # a newer run may own the slot
+            ACTIVE_LLM_TASKS.pop(chat_id, None)
+
+
+async def _run_voice_llm_pipeline(
+    update: Update,
+    text: str,
+    work_text: str,
+    session_id: str,
+    document_mode: bool,
+) -> None:
+    """Background LLM run for voice input; mirrors the free-text pipeline plus a TTS reply."""
+    chat_id = update.effective_chat.id
+    try:
+        context.last_suggest_result = {"type": None}
+        context.set_source_type("telegram_voice")
+        handlers = context.get_tool_handlers(session_id=session_id)
+        llm_task = asyncio.create_task(
+            asyncio.to_thread(
+                llm.ask_with_tools,
+                work_text,
+                handlers,
+                session_id=session_id,
+                document_mode=document_mode,
+            )
+        )
+        waited = 0
+        while not llm_task.done():
+            await asyncio.sleep(10)
+            waited += 10
+            if not llm_task.done():
+                try:
+                    await update.message.reply_text(
+                        "⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
+                    )
+                except Exception as te:
+                    log.warning("Fortschritts-Nachricht fehlgeschlagen: %s", te)
+        answer = await llm_task
+
+        if session_id:
+            memory_client.log_message(session_id, "user", text)
+            memory_client.log_message(session_id, "assistant", answer)
+
+        await _reply_text_chunked(
+            update,
+            answer,
+            reply_markup=build_reply_keyboard(str(update.effective_chat.id)),
+        )
+
+        audio_out = voice.synthesize(answer[:4000])
+        if audio_out:
+            import io as _io
+            await update.message.reply_voice(voice=_io.BytesIO(audio_out))
+        else:
+            log.warning("TTS fehlgeschlagen — nur Text gesendet")
+    except asyncio.CancelledError:
+        log.info("Voice-LLM abgebrochen")
+        llm_task.cancel()  # don't leave the to_thread wrapper task orphaned/never-awaited
+        raise
+    except Exception as e:
+        log.exception("Fehler bei Voice-LLM")
+        try:
+            await update.message.reply_text(f"Fehler: {e}")
+        except Exception:
+            pass
+    finally:
+        if ACTIVE_LLM_TASKS.get(chat_id) is asyncio.current_task():  # a newer run may own the slot
+            ACTIVE_LLM_TASKS.pop(chat_id, None)
 
 
 async def handle_voice(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
@@ -468,44 +597,12 @@ async def handle_voice(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
 
         session_id = memory_client.get_or_create_session(channel_key, source="telegram")
 
-        context.last_suggest_result = {"type": None}
-        context.set_source_type("telegram_voice")
-        handlers = context.get_tool_handlers(session_id=session_id)
-        llm_task = asyncio.create_task(
-            asyncio.to_thread(
-                llm.ask_with_tools,
-                work_text,
-                handlers,
-                session_id=session_id,
-                document_mode=document_mode,
+        job = asyncio.create_task(
+            _run_voice_llm_pipeline(
+                update, text, work_text, session_id, document_mode
             )
         )
-        ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task
-
-        waited = 0
-        # Kurze Intervalle: bei Save.TV/LLM wirkt 30s ohne Nachricht wie „Haenger“
-        while not llm_task.done():
-            await asyncio.sleep(10)
-            waited += 10
-            if not llm_task.done():
-                await update.message.reply_text(
-                    "⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
-                )
-
-        answer = await llm_task
-
-        if session_id:
-            memory_client.log_message(session_id, "user", text)
-            memory_client.log_message(session_id, "assistant", answer)
-
-        await _reply_text_chunked(update, answer, reply_markup=build_reply_keyboard(str(update.effective_chat.id)))
-
-        audio_out = voice.synthesize(answer[:4000])
-        if audio_out:
-            import io as _io
-            await update.message.reply_voice(voice=_io.BytesIO(audio_out))
-        else:
-            log.warning("TTS fehlgeschlagen — nur Text gesendet")
+        ACTIVE_LLM_TASKS[update.effective_chat.id] = job
     except Exception as e:
         log.exception("Fehler bei Voice-Nachricht")
         await update.message.reply_text(f"Fehler: {e}")
@@ -859,50 +956,17 @@ async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
     await update.message.reply_text("🤔 Denke nach...")
     if _likely_deep_research_request(work_text):
         await update.message.reply_text("🔎 Deep Research gestartet. Das dauert meist 2-5 Minuten.")
-    try:
-        context.last_suggest_result = {"type": None}
-        context.set_source_type("telegram_text")
-        handlers = action_guard.wrap_handlers(
-            context.get_tool_handlers(session_id=session_id), channel_key
+    job = asyncio.create_task(
+        _run_freitext_llm_pipeline(
+            update,
+            text,
+            work_text,
+            session_id,
+            document_mode,
+            channel_key,
         )
-        llm_task = asyncio.create_task(
-            asyncio.to_thread(
-                llm.ask_with_tools,
-                work_text,
-                handlers,
-                session_id=session_id,
-                document_mode=document_mode,
-            )
-        )
-        ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task
-
-        waited = 0
-        while not llm_task.done():
-            await asyncio.sleep(10)
-            waited += 10
-            if not llm_task.done():
-                await update.message.reply_text(
-                    "⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
-                )
-
-        answer = await llm_task
-
-        if session_id:
-            memory_client.log_message(session_id, "user", text)
-            memory_client.log_message(session_id, "assistant", answer)
-
-        suggest = context.last_suggest_result
-        log.info("suggest_result: type=%s", suggest.get("type"))
-
-        await _reply_text_chunked(update, answer, reply_markup=build_reply_keyboard(str(update.effective_chat.id)))
-    except asyncio.CancelledError:
-        log.info("Freitext-Lauf abgebrochen")
-        return
-    except Exception as e:
-        log.exception("Fehler bei Freitext")
-        await update.message.reply_text(f"Fehler: {e}")
-    finally:
-        ACTIVE_LLM_TASKS.pop(update.effective_chat.id, None)
+    )
+    ACTIVE_LLM_TASKS[update.effective_chat.id] = job
 
 
 async def handle_callback(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
@@ -1042,7 +1106,7 @@ def main():
     signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
 
     log.info("Starte Orbitalo Hausmeister-Bot...")
-    app = Application.builder().token(token).build()
+    app = Application.builder().token(token).concurrent_updates(True).build()
 
     app.add_handler(CommandHandler("start", cmd_start))
     app.add_handler(CommandHandler("status", cmd_status))