"""Deep Research Tool — Open Deep Research (CT 121) via LangGraph API.""" import logging import re import time import requests log = logging.getLogger("deep_research") DEEP_RESEARCH_URL = "http://10.10.10.121:2024" ASSISTANT_ID = "e9a5370f-7a53-55a8-ada8-6ab9ef15bb5b" RESEARCH_MODEL = "openai/gpt-4o-mini" POLL_INTERVAL = 10 MAX_WAIT = 600 SYSTEM_PROMPT_EXTRA = """DEEP RESEARCH: Du hast Zugriff auf deep_research — eine KI-gestuetzte Tiefenrecherche die 20-30 Quellen durchsucht. Nutze es NUR wenn der User explizit "deep research" oder "tiefenrecherche" sagt. Fuer alles andere: web_search. NICHT fuer einfache Fakten oder Homelab-Fragen. WICHTIG: deep_research dauert 2-5 Minuten. Das ist normal. Warte auf das Ergebnis. Das Ergebnis ist ein ausfuehrlicher Report. Fasse ihn fuer Telegram zusammen (max ~3000 Zeichen). QUALITAET BEI PREISFRAGEN: - Liefere konkrete Zahlen statt allgemeiner Markttexte. - Zeige Zeitraum, Preis damals/heute, Delta in % und Quellen. - Wenn keine belastbaren Daten vorhanden sind, sage es explizit.""" TOOLS = [] # removed from auto-discovery; use HANDLERS directly def _create_thread(): r = requests.post(f"{DEEP_RESEARCH_URL}/threads", json={}, timeout=10) r.raise_for_status() return r.json()["thread_id"] def _start_run(thread_id, query): payload = { "assistant_id": ASSISTANT_ID, "input": {"messages": [{"role": "user", "content": query}]}, "config": { "configurable": { "summarization_model": f"openai:{RESEARCH_MODEL}", "research_model": f"openai:{RESEARCH_MODEL}", "compression_model": f"openai:{RESEARCH_MODEL}", "final_report_model": f"openai:{RESEARCH_MODEL}", "allow_clarification": False, } }, } r = requests.post( f"{DEEP_RESEARCH_URL}/threads/{thread_id}/runs", json=payload, timeout=30 ) r.raise_for_status() return r.json()["run_id"] def _poll_run(thread_id, run_id): elapsed = 0 while elapsed < MAX_WAIT: time.sleep(POLL_INTERVAL) elapsed += POLL_INTERVAL try: r = requests.get( f"{DEEP_RESEARCH_URL}/threads/{thread_id}/runs/{run_id}", timeout=10 ) r.raise_for_status() data = r.json() status = data.get("status", "unknown") log.info("Poll %ds: status=%s", elapsed, status) if status == "success": return True, None if status in ("error", "failed"): err = data.get("error", "Unbekannter Fehler") log.error("Run failed: %s", err) return False, err if status == "interrupted": return False, "Research wurde unterbrochen" except Exception as e: log.warning("Poll error at %ds: %s", elapsed, e) return False, f"Timeout nach {MAX_WAIT}s" def _get_result(thread_id): r = requests.get(f"{DEEP_RESEARCH_URL}/threads/{thread_id}/state", timeout=30) r.raise_for_status() state = r.json() messages = state.get("values", {}).get("messages", []) log.info("Messages in result: %d", len(messages)) for i, msg in enumerate(messages): content = msg.get("content", "") clen = len(content) if isinstance(content, str) else 0 log.info(" msg[%d] type=%s len=%d", i, msg.get("type", "?"), clen) for msg in reversed(messages): content = msg.get("content", "") if isinstance(content, str) and len(content) > 100: return content return "Kein Report generiert." def _is_price_query(query: str) -> bool: q = (query or "").lower() needles = [ "preis", "preise", "kosten", "teuer", "guenstig", "ram", "ddr4", "ddr5", "entwicklung", ] return any(n in q for n in needles) def _price_report_quality(report: str): text = report or "" links = re.findall(r"https?://\S+", text) has_percent = bool(re.search(r"[-+]?\d+[\.,]?\d*\s*%", text)) has_currency = bool(re.search(r"(?:\d+[\.,]?\d*\s?(?:€|eur|\$))|(?:€\s?\d+)", text, re.I)) has_comparison = bool( re.search(r"(damals|heute|vor\s+\d+\s+(?:monaten|wochen)|aktuell|delta)", text, re.I) ) missing = [] if len(links) < 3: missing.append("mindestens 3 konkrete Quellen-Links") if not has_percent: missing.append("Delta in %") if not has_currency: missing.append("konkrete Preise mit Waehrung") if not has_comparison: missing.append("Preisvergleich damals/heute") return len(missing) == 0, missing def _run_research(query: str): thread_id = _create_thread() log.info("Thread erstellt: %s", thread_id) run_id = _start_run(thread_id, query) log.info("Run gestartet: %s", run_id) ok, error = _poll_run(thread_id, run_id) if not ok: return False, f"Deep Research fehlgeschlagen: {error}" report = _get_result(thread_id) log.info("Report erhalten: %d Zeichen", len(report)) return True, report def handle_deep_research(query: str, **kw): log.info("deep_research gestartet: %s", query[:120]) try: ok, report = _run_research(query) if not ok: return report # Harte Qualitaetspruefung fuer Preisfragen if _is_price_query(query): good, missing = _price_report_quality(report) if not good: log.warning("Preisreport zu schwach, starte Retry. Missing: %s", ", ".join(missing)) stricter_query = ( query + "\n\nLIEFERE NUR belastbare Preisdaten im Format:\n" + "1) Zeitraum (exakt)\n" + "2) Preis damals -> Preis heute (EUR)\n" + "3) Delta in %\n" + "4) 3-5 konkrete Quellen-Links (keine Startseiten).\n" + "Wenn unklar: explizit keine belastbaren Preisdaten gefunden." ) ok2, report2 = _run_research(stricter_query) if ok2: good2, missing2 = _price_report_quality(report2) if good2: report = report2 else: return ( "keine belastbaren Preisdaten gefunden. " "Es fehlen: " + ", ".join(missing2) + ". Bitte Anfrage enger formulieren (Produktklasse + Region + Zeitraum)." ) else: return report2 if len(report) > 6000: report = report[:6000] + "\n\n[... Report gekuerzt]" return report except requests.ConnectionError: log.error("CT 121 nicht erreichbar") return "Deep Research (CT 121) nicht erreichbar. Service laeuft moeglicherweise nicht." except Exception as e: log.exception("Deep Research Fehler") return f"Deep Research Fehler: {e}" HANDLERS = {"deep_research": handle_deep_research}