hausmeister: LLM in Hintergrund-Task — Polling blockiert nicht mehr (toter Bot)
python-telegram-bot verarbeitet Updates nacheinander, solange der Handler nicht zurueckkehrt. Die Warteschleife auf ask_with_tools blockierte alle weiteren Telegram-Updates (kein abbruch, keine Befehle, 'tot'). - Freitext und Sprache: Pipeline in asyncio.create_task ausgelagert - concurrent_updates(True) zusaetzlich
This commit is contained in:
parent
32c6b97a02
commit
e08c820f15
1 changed files with 139 additions and 80 deletions
|
|
@ -419,6 +419,130 @@ async def cmd_memory(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||
await update.message.reply_text(text[:4000], reply_markup=build_reply_keyboard(str(update.effective_chat.id)))
|
||||
|
||||
|
||||
async def _run_freitext_llm_pipeline(
|
||||
update: Update,
|
||||
text: str,
|
||||
work_text: str,
|
||||
session_id: str,
|
||||
document_mode: bool,
|
||||
channel_key: str,
|
||||
) -> None:
|
||||
"""LLM im Hintergrund: Polling blockiert nicht, Bot bleibt fuer neue Updates erreichbar."""
|
||||
chat_id = update.effective_chat.id
|
||||
try:
|
||||
context.last_suggest_result = {"type": None}
|
||||
context.set_source_type("telegram_text")
|
||||
handlers = action_guard.wrap_handlers(
|
||||
context.get_tool_handlers(session_id=session_id), channel_key
|
||||
)
|
||||
llm_task = asyncio.create_task(
|
||||
asyncio.to_thread(
|
||||
llm.ask_with_tools,
|
||||
work_text,
|
||||
handlers,
|
||||
session_id=session_id,
|
||||
document_mode=document_mode,
|
||||
)
|
||||
)
|
||||
waited = 0
|
||||
while not llm_task.done():
|
||||
await asyncio.sleep(10)
|
||||
waited += 10
|
||||
if not llm_task.done():
|
||||
try:
|
||||
await update.message.reply_text(
|
||||
"⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
|
||||
)
|
||||
except Exception as te:
|
||||
log.warning("Fortschritts-Nachricht fehlgeschlagen: %s", te)
|
||||
answer = await llm_task
|
||||
|
||||
if session_id:
|
||||
memory_client.log_message(session_id, "user", text)
|
||||
memory_client.log_message(session_id, "assistant", answer)
|
||||
|
||||
suggest = context.last_suggest_result
|
||||
log.info("suggest_result: type=%s", suggest.get("type"))
|
||||
|
||||
await _reply_text_chunked(
|
||||
update,
|
||||
answer,
|
||||
reply_markup=build_reply_keyboard(str(update.effective_chat.id)),
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
log.info("Freitext-Lauf abgebrochen")
|
||||
raise
|
||||
except Exception as e:
|
||||
log.exception("Fehler bei Freitext")
|
||||
try:
|
||||
await update.message.reply_text(f"Fehler: {e}")
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
ACTIVE_LLM_TASKS.pop(chat_id, None)
|
||||
|
||||
|
||||
async def _run_voice_llm_pipeline(
|
||||
update: Update,
|
||||
text: str,
|
||||
work_text: str,
|
||||
session_id: str,
|
||||
document_mode: bool,
|
||||
) -> None:
|
||||
chat_id = update.effective_chat.id
|
||||
try:
|
||||
context.last_suggest_result = {"type": None}
|
||||
context.set_source_type("telegram_voice")
|
||||
handlers = context.get_tool_handlers(session_id=session_id)
|
||||
llm_task = asyncio.create_task(
|
||||
asyncio.to_thread(
|
||||
llm.ask_with_tools,
|
||||
work_text,
|
||||
handlers,
|
||||
session_id=session_id,
|
||||
document_mode=document_mode,
|
||||
)
|
||||
)
|
||||
waited = 0
|
||||
while not llm_task.done():
|
||||
await asyncio.sleep(10)
|
||||
waited += 10
|
||||
if not llm_task.done():
|
||||
try:
|
||||
await update.message.reply_text(
|
||||
"⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
|
||||
)
|
||||
except Exception as te:
|
||||
log.warning("Fortschritts-Nachricht fehlgeschlagen: %s", te)
|
||||
answer = await llm_task
|
||||
|
||||
if session_id:
|
||||
memory_client.log_message(session_id, "user", text)
|
||||
memory_client.log_message(session_id, "assistant", answer)
|
||||
|
||||
await _reply_text_chunked(
|
||||
update,
|
||||
answer,
|
||||
reply_markup=build_reply_keyboard(str(update.effective_chat.id)),
|
||||
)
|
||||
|
||||
audio_out = voice.synthesize(answer[:4000])
|
||||
if audio_out:
|
||||
import io as _io
|
||||
await update.message.reply_voice(voice=_io.BytesIO(audio_out))
|
||||
else:
|
||||
log.warning("TTS fehlgeschlagen — nur Text gesendet")
|
||||
except asyncio.CancelledError:
|
||||
log.info("Voice-LLM abgebrochen")
|
||||
raise
|
||||
except Exception as e:
|
||||
log.exception("Fehler bei Voice-LLM")
|
||||
try:
|
||||
await update.message.reply_text(f"Fehler: {e}")
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
ACTIVE_LLM_TASKS.pop(chat_id, None)
|
||||
|
||||
|
||||
async def handle_voice(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
|
|
@ -468,44 +592,12 @@ async def handle_voice(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||
|
||||
session_id = memory_client.get_or_create_session(channel_key, source="telegram")
|
||||
|
||||
context.last_suggest_result = {"type": None}
|
||||
context.set_source_type("telegram_voice")
|
||||
handlers = context.get_tool_handlers(session_id=session_id)
|
||||
llm_task = asyncio.create_task(
|
||||
asyncio.to_thread(
|
||||
llm.ask_with_tools,
|
||||
work_text,
|
||||
handlers,
|
||||
session_id=session_id,
|
||||
document_mode=document_mode,
|
||||
job = asyncio.create_task(
|
||||
_run_voice_llm_pipeline(
|
||||
update, text, work_text, session_id, document_mode
|
||||
)
|
||||
)
|
||||
ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task
|
||||
|
||||
waited = 0
|
||||
# Kurze Intervalle: bei Save.TV/LLM wirkt 30s ohne Nachricht wie „Haenger“
|
||||
while not llm_task.done():
|
||||
await asyncio.sleep(10)
|
||||
waited += 10
|
||||
if not llm_task.done():
|
||||
await update.message.reply_text(
|
||||
"⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
|
||||
)
|
||||
|
||||
answer = await llm_task
|
||||
|
||||
if session_id:
|
||||
memory_client.log_message(session_id, "user", text)
|
||||
memory_client.log_message(session_id, "assistant", answer)
|
||||
|
||||
await _reply_text_chunked(update, answer, reply_markup=build_reply_keyboard(str(update.effective_chat.id)))
|
||||
|
||||
audio_out = voice.synthesize(answer[:4000])
|
||||
if audio_out:
|
||||
import io as _io
|
||||
await update.message.reply_voice(voice=_io.BytesIO(audio_out))
|
||||
else:
|
||||
log.warning("TTS fehlgeschlagen — nur Text gesendet")
|
||||
ACTIVE_LLM_TASKS[update.effective_chat.id] = job
|
||||
except Exception as e:
|
||||
log.exception("Fehler bei Voice-Nachricht")
|
||||
await update.message.reply_text(f"Fehler: {e}")
|
||||
|
|
@ -859,50 +951,17 @@ async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
|||
await update.message.reply_text("🤔 Denke nach...")
|
||||
if _likely_deep_research_request(work_text):
|
||||
await update.message.reply_text("🔎 Deep Research gestartet. Das dauert meist 2-5 Minuten.")
|
||||
try:
|
||||
context.last_suggest_result = {"type": None}
|
||||
context.set_source_type("telegram_text")
|
||||
handlers = action_guard.wrap_handlers(
|
||||
context.get_tool_handlers(session_id=session_id), channel_key
|
||||
job = asyncio.create_task(
|
||||
_run_freitext_llm_pipeline(
|
||||
update,
|
||||
text,
|
||||
work_text,
|
||||
session_id,
|
||||
document_mode,
|
||||
channel_key,
|
||||
)
|
||||
llm_task = asyncio.create_task(
|
||||
asyncio.to_thread(
|
||||
llm.ask_with_tools,
|
||||
work_text,
|
||||
handlers,
|
||||
session_id=session_id,
|
||||
document_mode=document_mode,
|
||||
)
|
||||
)
|
||||
ACTIVE_LLM_TASKS[update.effective_chat.id] = llm_task
|
||||
|
||||
waited = 0
|
||||
while not llm_task.done():
|
||||
await asyncio.sleep(10)
|
||||
waited += 10
|
||||
if not llm_task.done():
|
||||
await update.message.reply_text(
|
||||
"⏳ Noch dran (" + str(waited) + "s) — Save.TV/Modell kann etwas brauchen…"
|
||||
)
|
||||
|
||||
answer = await llm_task
|
||||
|
||||
if session_id:
|
||||
memory_client.log_message(session_id, "user", text)
|
||||
memory_client.log_message(session_id, "assistant", answer)
|
||||
|
||||
suggest = context.last_suggest_result
|
||||
log.info("suggest_result: type=%s", suggest.get("type"))
|
||||
|
||||
await _reply_text_chunked(update, answer, reply_markup=build_reply_keyboard(str(update.effective_chat.id)))
|
||||
except asyncio.CancelledError:
|
||||
log.info("Freitext-Lauf abgebrochen")
|
||||
return
|
||||
except Exception as e:
|
||||
log.exception("Fehler bei Freitext")
|
||||
await update.message.reply_text(f"Fehler: {e}")
|
||||
finally:
|
||||
ACTIVE_LLM_TASKS.pop(update.effective_chat.id, None)
|
||||
)
|
||||
ACTIVE_LLM_TASKS[update.effective_chat.id] = job
|
||||
|
||||
|
||||
async def handle_callback(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
|
||||
|
|
@ -1042,7 +1101,7 @@ def main():
|
|||
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
|
||||
|
||||
log.info("Starte Orbitalo Hausmeister-Bot...")
|
||||
app = Application.builder().token(token).build()
|
||||
app = Application.builder().token(token).concurrent_updates(True).build()
|
||||
|
||||
app.add_handler(CommandHandler("start", cmd_start))
|
||||
app.add_handler(CommandHandler("status", cmd_status))
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue