homelab-brain/homelab-ai-bot/tools/deep_research.py
Cursor 36d708bee1 refactor(llm): Local-First Routing mit Sonar-Websuche
- Basis: 981118f9 (lokales Qwen3 30B) wiederhergestellt
- Drei Pfade: lokal (qwen3:30b-a3b), Vision (qwen3-vl:32b), Sonar (perplexity/sonar)
- _route_model() fuer sauberes Routing (Web-Keywords -> Sonar, Rest -> lokal)
- /no_think fuer Ollama, Timeout-Fallback auf qwen2.5:14b
- Passthrough-Tools fuer Grafana-Daten
- deep_research TOOLS wieder aktiviert
- Preis-Spaghetti-Logik entfernt
2026-03-21 12:06:00 +01:00

222 lines
7.5 KiB
Python

"""Deep Research Tool — Open Deep Research (CT 121) via LangGraph API."""
import logging
import re
import time
import requests
log = logging.getLogger("deep_research")
DEEP_RESEARCH_URL = "http://10.10.10.121:2024"
ASSISTANT_ID = "e9a5370f-7a53-55a8-ada8-6ab9ef15bb5b"
RESEARCH_MODEL = "openai/gpt-4o-mini"
POLL_INTERVAL = 10
MAX_WAIT = 600
SYSTEM_PROMPT_EXTRA = """DEEP RESEARCH:
Du hast Zugriff auf deep_research — eine KI-gestuetzte Tiefenrecherche die 20-30 Quellen durchsucht.
Nutze es NUR wenn der User explizit "deep research" oder "tiefenrecherche" sagt. Fuer alles andere: web_search.
NICHT fuer einfache Fakten oder Homelab-Fragen.
WICHTIG: deep_research dauert 2-5 Minuten. Das ist normal. Warte auf das Ergebnis.
Das Ergebnis ist ein ausfuehrlicher Report. Fasse ihn fuer Telegram zusammen (max ~3000 Zeichen).
QUALITAET BEI PREISFRAGEN:
- Liefere konkrete Zahlen statt allgemeiner Markttexte.
- Zeige Zeitraum, Preis damals/heute, Delta in % und Quellen.
- Wenn keine belastbaren Daten vorhanden sind, sage es explizit."""
TOOLS = [
{
"type": "function",
"function": {
"name": "deep_research",
"description": "KI-gestuetzte Tiefenrecherche (20-30 Quellen, 2-5 Min). NUR wenn User explizit deep research oder tiefenrecherche sagt.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Die Recherche-Frage"
}
},
"required": ["query"]
},
},
},
]
def _create_thread():
r = requests.post(f"{DEEP_RESEARCH_URL}/threads", json={}, timeout=10)
r.raise_for_status()
return r.json()["thread_id"]
def _start_run(thread_id, query):
payload = {
"assistant_id": ASSISTANT_ID,
"input": {"messages": [{"role": "user", "content": query}]},
"config": {
"configurable": {
"summarization_model": f"openai:{RESEARCH_MODEL}",
"research_model": f"openai:{RESEARCH_MODEL}",
"compression_model": f"openai:{RESEARCH_MODEL}",
"final_report_model": f"openai:{RESEARCH_MODEL}",
"allow_clarification": False,
}
},
}
r = requests.post(
f"{DEEP_RESEARCH_URL}/threads/{thread_id}/runs", json=payload, timeout=30
)
r.raise_for_status()
return r.json()["run_id"]
def _poll_run(thread_id, run_id):
elapsed = 0
while elapsed < MAX_WAIT:
time.sleep(POLL_INTERVAL)
elapsed += POLL_INTERVAL
try:
r = requests.get(
f"{DEEP_RESEARCH_URL}/threads/{thread_id}/runs/{run_id}", timeout=10
)
r.raise_for_status()
data = r.json()
status = data.get("status", "unknown")
log.info("Poll %ds: status=%s", elapsed, status)
if status == "success":
return True, None
if status in ("error", "failed"):
err = data.get("error", "Unbekannter Fehler")
log.error("Run failed: %s", err)
return False, err
if status == "interrupted":
return False, "Research wurde unterbrochen"
except Exception as e:
log.warning("Poll error at %ds: %s", elapsed, e)
return False, f"Timeout nach {MAX_WAIT}s"
def _get_result(thread_id):
r = requests.get(f"{DEEP_RESEARCH_URL}/threads/{thread_id}/state", timeout=30)
r.raise_for_status()
state = r.json()
messages = state.get("values", {}).get("messages", [])
log.info("Messages in result: %d", len(messages))
for i, msg in enumerate(messages):
content = msg.get("content", "")
clen = len(content) if isinstance(content, str) else 0
log.info(" msg[%d] type=%s len=%d", i, msg.get("type", "?"), clen)
for msg in reversed(messages):
content = msg.get("content", "")
if isinstance(content, str) and len(content) > 100:
return content
return "Kein Report generiert."
def _is_price_query(query: str) -> bool:
q = (query or "").lower()
needles = [
"preis",
"preise",
"kosten",
"teuer",
"guenstig",
"ram",
"ddr4",
"ddr5",
"entwicklung",
]
return any(n in q for n in needles)
def _price_report_quality(report: str):
text = report or ""
links = re.findall(r"https?://\S+", text)
has_percent = bool(re.search(r"[-+]?\d+[\.,]?\d*\s*%", text))
has_currency = bool(re.search(r"(?:\d+[\.,]?\d*\s?(?:€|eur|\$))|(?:€\s?\d+)", text, re.I))
has_comparison = bool(
re.search(r"(damals|heute|vor\s+\d+\s+(?:monaten|wochen)|aktuell|delta)", text, re.I)
)
missing = []
if len(links) < 3:
missing.append("mindestens 3 konkrete Quellen-Links")
if not has_percent:
missing.append("Delta in %")
if not has_currency:
missing.append("konkrete Preise mit Waehrung")
if not has_comparison:
missing.append("Preisvergleich damals/heute")
return len(missing) == 0, missing
def _run_research(query: str):
thread_id = _create_thread()
log.info("Thread erstellt: %s", thread_id)
run_id = _start_run(thread_id, query)
log.info("Run gestartet: %s", run_id)
ok, error = _poll_run(thread_id, run_id)
if not ok:
return False, f"Deep Research fehlgeschlagen: {error}"
report = _get_result(thread_id)
log.info("Report erhalten: %d Zeichen", len(report))
return True, report
def handle_deep_research(query: str, **kw):
log.info("deep_research gestartet: %s", query[:120])
try:
ok, report = _run_research(query)
if not ok:
return report
# Harte Qualitaetspruefung fuer Preisfragen
if _is_price_query(query):
good, missing = _price_report_quality(report)
if not good:
log.warning("Preisreport zu schwach, starte Retry. Missing: %s", ", ".join(missing))
stricter_query = (
query
+ "\n\nLIEFERE NUR belastbare Preisdaten im Format:\n"
+ "1) Zeitraum (exakt)\n"
+ "2) Preis damals -> Preis heute (EUR)\n"
+ "3) Delta in %\n"
+ "4) 3-5 konkrete Quellen-Links (keine Startseiten).\n"
+ "Wenn unklar: explizit keine belastbaren Preisdaten gefunden."
)
ok2, report2 = _run_research(stricter_query)
if ok2:
good2, missing2 = _price_report_quality(report2)
if good2:
report = report2
else:
return (
"keine belastbaren Preisdaten gefunden. "
"Es fehlen: "
+ ", ".join(missing2)
+ ". Bitte Anfrage enger formulieren (Produktklasse + Region + Zeitraum)."
)
else:
return report2
if len(report) > 6000:
report = report[:6000] + "\n\n[... Report gekuerzt]"
return report
except requests.ConnectionError:
log.error("CT 121 nicht erreichbar")
return "Deep Research (CT 121) nicht erreichbar. Service laeuft moeglicherweise nicht."
except Exception as e:
log.exception("Deep Research Fehler")
return f"Deep Research Fehler: {e}"
HANDLERS = {"deep_research": handle_deep_research}