"""RAG Dokumentensuche — Elasticsearch direkt (Hybrid: kNN + deutscher Text). RAGFlow bleibt Ingestion; Suche geht direkt an ES (Issue #51). """ import base64 import json import logging import re import urllib.error import urllib.request log = logging.getLogger("tools.rag") ES_BASE = "http://100.109.101.12:1200" ES_USER = "elastic" ES_PASS = "infini_rag_flow" ES_INDEX = "ragflow_61f51c8c279011f1a174bd19863ba33e" KB_ID = "dc24edda27a311f19fe7fb811de6f016" OLLAMA_EMBED_URL = "http://100.84.255.83:11434/api/embeddings" EMBED_MODEL = "nomic-embed-text" # Cross-Encoder Reranking (CT 123, pve-hetzner LAN) RERANKER_URL = "http://10.10.10.123:8099" RERANK_CANDIDATES = 15 RERANK_TIMEOUT = 30 RERANK_SNIPPET_CHARS = 512 MIN_TOP_K = 5 # Breite Übersichten: mehr ES-Runden, mehr distinct Treffer (pro vollem Pfad docnm_kwd) MAX_TOP_K_NORMAL = 25 MAX_TOP_K_WIDE = 60 ES_SIZE_CAP = 200 TOOLS = [ { "type": "function", "function": { "name": "rag_search", "description": ( "Durchsucht die private Dokumenten-Wissensbasis (>21.000 Dokumente: " "Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, " "Anleitungen, Buecher, persoenliche Unterlagen). " "Nutze dieses Tool wenn der User nach einem bestimmten Dokument, " "Vertrag, Brief oder persoenlicher Information fragt. " "Bei breiten Fragen ('welche Versicherungen', Jahreskosten, Listen) " "top_k=15 oder hoeher setzen." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": ( "Suchanfrage: Dokumentname, Thema oder Inhalt. Kurz und praezise, " "z.B. 'Familienbuch Opa Oma' oder 'Grundsteuer Erklaerung 2024'" ), }, "top_k": { "type": "integer", "description": "Anzahl Ergebnisse (5-25, Standard 10)", "default": 10, }, }, "required": ["query"], }, }, }, ] SYSTEM_PROMPT_EXTRA = """RAG DOKUMENTENSUCHE — PFLICHT-REGELN: Du hast Zugriff auf eine private Wissensbasis mit >21.000 Dokumenten (Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, Anleitungen, Buecher, persoenliche Unterlagen, Arbeitsvertraege, Kindergeld, Reisepass, Personalausweis, KFZ, Mietvertraege, Bausparvertraege, Rechnungen). WANN rag_search AUFRUFEN — IMMER bei diesen Fragen: - "habe ich..." / "gibt es..." / "wo ist..." / "finde..." / "zeig mir..." + Dokument/Vertrag/Versicherung/Bescheid - Jede Frage nach persoenlichen Unterlagen, Vertraegen, Versicherungen, Rechnungen, Bescheiden - AUCH wenn du glaubst die Antwort zu kennen — das Gedaechtnis ist NICHT die Wissensbasis! - AUCH wenn das Thema im Gedaechtnis steht — trotzdem rag_search aufrufen fuer vollstaendige Antwort WANN NICHT: Nur bei reinen Homelab/IT-Fragen, Smalltalk, oder wenn der User explizit NICHT nach Dokumenten fragt. SUCHANFRAGE: Kurze Keywords, KEINE ganzen Saetze. Beispiele: - "Familienbuch" / "Grundsteuer Erklaerung" / "Haftpflicht" / "Kindergeld" / "Mietvertrag" / "Arbeitsvertrag" / "Reisepass" ERGEBNISSE AUSWERTEN: - Bei breiten Fragen ("welche Versicherungen", Jahreskosten, Listen): top_k=15-25, ALLE Treffer aus der Tool-Antwort abarbeiten - Liste die gefundenen Dokumente mit Ordner und kurzem Inhalt auf - ERFINDE KEINE Details die nicht im Ergebnis stehen - Der Ordnerpfad (vor dem Dateinamen, getrennt durch __) zeigt die Kategorie - Wenn rag_search Treffer liefert: IMMER auflisten, auch wenn Inhalt unvollstaendig - Mehrere Treffer zur gleichen Versicherung/Gesellschaft: jede Sparte/Dokumentart separat nennen (Kfz, Rechtsschutz, Haftpflicht, Sach, Ausland, Kranken), mit Dateiname/Ordner - Antworte NIEMALS "keine gefunden" oder "nicht gespeichert" OHNE vorher rag_search aufgerufen zu haben""" def _basic_auth_header() -> str: token = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode() return f"Basic {token}" def _ollama_embed(text: str) -> list | None: body = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode() req = urllib.request.Request( OLLAMA_EMBED_URL, data=body, method="POST", headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req, timeout=120) as resp: data = json.load(resp) emb = data.get("embedding") if not emb: return None if len(emb) != 768: log.warning("Unexpected embedding dimension %s", len(emb)) return emb except Exception as e: log.error("Ollama embed error: %s", e) return None def _ocr_note(text: str) -> str: if not text or len(text) < 40: return "" non_alnum = sum(1 for c in text if not c.isalnum() and not c.isspace()) ratio = non_alnum / max(len(text), 1) words = re.findall(r"\w+", text, re.UNICODE) avg_len = (sum(len(w) for w in words) / len(words)) if words else 0.0 if ratio > 0.15 or avg_len < 2.0: return " [OCR vermutlich schlecht]" return "" def _folder_from_docname(name: str) -> str: """Extrahiert den Ordnerpfad aus docnm_kwd (__ = Trenner).""" parts = name.rsplit("__", 1) if len(parts) == 2: return parts[0].replace("__", " > ").replace("_", " ") return "" def _dedup_key(name: str) -> str: """Normalisiert Dokumentnamen fuer Deduplizierung. Extrahiert nur den Dateinamen (nach letztem __), ignoriert Dateiendung und Kopie-Marker wie (1), (2). 'Ordner__Foo(1).pdf' und 'Anderer__Foo.txt' werden als gleich behandelt. """ fname = name.rsplit("__", 1)[-1] if "__" in name else name key = re.sub(r"\.(pdf|txt|docx?|xlsx?|csv|png|jpg|jpeg)$", "", fname, flags=re.IGNORECASE) key = re.sub(r"\s*\(\d+\)\s*$", "", key).rstrip() return key.lower() def _dedup_key_full_doc(name: str) -> str: """Ein Chunk pro vollem docnm_kwd — gleicher Dateiname in verschiedenen Ordnern bleibt getrennt.""" return re.sub(r"\s+", " ", (name or "").strip().lower()) def _es_hybrid_search(query: str, es_size: int) -> dict: qvec = _ollama_embed(query) if not qvec: return {"_error": "Embedding fehlgeschlagen (Ollama nicht erreichbar?)."} es_size = min(ES_SIZE_CAP, max(es_size, 20)) kb_filter = {"term": {"kb_id": KB_ID}} body = { "size": es_size, "knn": { "field": "q_768_vec", "query_vector": qvec, "k": es_size, "num_candidates": min(500, max(es_size * 5, 150)), "filter": [kb_filter], }, "query": { "bool": { "filter": [kb_filter], "should": [ {"match": {"content_de": {"query": query, "boost": 2.0}}}, {"match": {"content_ltks": {"query": query.lower(), "boost": 0.4}}}, {"match": {"docnm_kwd": {"query": query, "boost": 3.0}}}, ], "minimum_should_match": 0, } }, } url = f"{ES_BASE}/{ES_INDEX}/_search" req = urllib.request.Request( url, data=json.dumps(body).encode(), method="POST", headers={ "Content-Type": "application/json", "Authorization": _basic_auth_header(), }, ) try: with urllib.request.urlopen(req, timeout=120) as resp: return json.load(resp) except urllib.error.HTTPError as e: err = e.read().decode(errors="replace")[:800] log.error("ES HTTP %s: %s", e.code, err) return {"_error": f"ES HTTP {e.code}: {err}"} except Exception as e: log.error("ES search error: %s", e) return {"_error": str(e)} def _snippet_for_rerank(src: dict) -> str: doc_name = src.get("docnm_kwd") or "" raw = src.get("content_with_weight") or src.get("content_de") or "" prefix = doc_name[:120] + "\n" if doc_name else "" return prefix + raw[:RERANK_SNIPPET_CHARS] def _rerank_hits(query: str, hits: list) -> tuple[list, bool]: """Rerankt mit Cross-Encoder, kombiniert Score mit ES-Rang (RRF).""" if not hits or not RERANKER_URL: return hits, False to_score = hits[:RERANK_CANDIDATES] docs = [] for h in to_score: src = h.get("_source") or {} docs.append(_snippet_for_rerank(src)) if not any((d or "").strip() for d in docs): return hits, False body = json.dumps({"query": query, "documents": docs}).encode() url = f"{RERANKER_URL.rstrip('/')}/rerank" req = urllib.request.Request( url, data=body, method="POST", headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req, timeout=RERANK_TIMEOUT) as resp: data = json.load(resp) scores = data.get("scores") or [] if len(scores) != len(to_score): log.warning( "rerank score count mismatch: %s vs %s", len(scores), len(to_score), ) return hits, False k = 60 combined: list[tuple[float, int]] = [] for idx, (h, rr_score) in enumerate(zip(to_score, scores)): es_rank = idx + 1 rr_sorted = sorted(scores, reverse=True) rr_rank = rr_sorted.index(rr_score) + 1 rrf = 1.0 / (k + es_rank) + 1.0 / (k + rr_rank) combined.append((rrf, idx)) combined.sort(key=lambda x: x[0], reverse=True) new_order: list = [] for rrf, idx in combined: h = to_score[idx] h["_rerank_score"] = float(scores[idx]) h["_rrf_score"] = float(rrf) new_order.append(h) rest = hits[RERANK_CANDIDATES:] return new_order + rest, True except Exception as e: log.warning("rerank failed (fallback to ES): %s", e) return hits, False def _is_wide_recall_query(q: str) -> bool: """Übersichts-/Listen-/Kostenfragen: mehrfach suchen und mergen.""" ql = (q or "").lower() if any(x in ql for x in ("welche versicherung", "alle versicherung", "versicherungen habe")): return True if "versicherung" in ql and any( x in ql for x in ( "welche", "alle", "liste", "überblick", "ueberblick", "kosten", "beitrag", "jähr", "jaehr", "jahres", "gesamt", "summe", "übersicht", "uebersicht", ) ): return True costish = any( x in ql for x in ( "kosten", "kostet", "wie viel", "wieviel", "beitrag", "beiträge", "beitraege", "eur", "euro", "jähr", "jaehr", "jahreskosten", "prämie", "praemie", ) ) broad = any( x in ql for x in ( "liste", "übersicht", "uebersicht", "alle", "gesamt", "summe", "jährlich", "jaehrlich", ) ) return costish and broad # Zusatzanfragen decken Sparten + Gesellschaften ab (Recall) _WIDE_SUBQUERIES = [ "Versicherung Beitragsrechnung Jahresbeitrag", "Wohngebäudeversicherung Gebäude Beitrag", "Hausratversicherung Beitrag Ergo", "Haftpflichtversicherung Beitrag GARANTA", "Kfz Kasko Haftpflicht Beitragsrechnung", "Rechtsschutzversicherung Beitrag", "Lebensversicherung Beitrag", "Krankenversicherung PKV Beitrag", "Sachversicherung LVM Beitrag", "LVM AutoPlus Versicherungsschein", "Allianz Versicherung Police", "Nürnberger Versicherung Beitrag", "Ergo Versicherung Police", "Unfallversicherung Berufsunfähigkeit", "Bausparvertrag Bauspar", "Ford Transit Nutzfahrzeug Versicherung", "Kfz Versicherungsschein Beitrag jährlich", ] def _merge_hits_from_queries( queries: list[str], es_size: int, pool_cap: int, *, full_path_dedup: bool = False, ) -> tuple[list, str | None]: """Führt mehrere Hybrid-Suchen aus; pro Dedup-Key höchster Score.""" best: dict[str, dict] = {} last_err: str | None = None def dkey(dn: str) -> str: return _dedup_key_full_doc(dn) if full_path_dedup else _dedup_key(dn) def absorb(hits: list) -> None: for h in hits: src = h.get("_source") or {} dn = src.get("docnm_kwd") or "?" dk = dkey(dn) sc = float(h.get("_score") or 0.0) old = best.get(dk) if old is None or sc > float(old.get("_score") or 0.0): best[dk] = h for q in queries: q = (q or "").strip() if not q: continue data = _es_hybrid_search(q, es_size) if "_error" in data: last_err = str(data["_error"]) log.warning("wide_recall subquery fail %s: %s", q[:40], last_err) continue absorb((data.get("hits") or {}).get("hits") or []) merged = sorted(best.values(), key=lambda h: float(h.get("_score") or 0.0), reverse=True) return merged[:pool_cap], last_err def handle_rag_search(query: str, top_k: int = 8, **kw): if not query or not query.strip(): return "rag_search: query fehlt." qstrip = query.strip() wide = _is_wide_recall_query(qstrip) cap = MAX_TOP_K_WIDE if wide else MAX_TOP_K_NORMAL top_k = max(MIN_TOP_K, min(int(top_k or 10), cap)) es_size = min(ES_SIZE_CAP, max(top_k * 10, 70)) if wide: subqs = [qstrip] for sq in _WIDE_SUBQUERIES: if sq.lower() not in qstrip.lower(): subqs.append(sq) pool_cap = max(top_k * 5, 120) hits, err = _merge_hits_from_queries( subqs[:22], es_size, pool_cap=pool_cap, full_path_dedup=True, ) if err and not hits: return f"Fehler bei der Dokumentensuche: {err}" header = ( f"**Breitensuche ({len(subqs[:22])} Anfragen, Dedup=voller Pfad) '{qstrip}' — " f"{len(hits)} Kandidaten, zeige bis {top_k}:**\n" ) snip_len = 400 else: data = _es_hybrid_search(qstrip, es_size) if "_error" in data: return f"Fehler bei der Dokumentensuche: {data['_error']}" hits = (data.get("hits") or {}).get("hits") or [] header = f"**Dokumente fuer '{qstrip}' (bis {top_k}):**\n" snip_len = 650 if not hits: return f"Keine Ergebnisse fuer '{qstrip}' in der Wissensbasis gefunden." hits, reranked = _rerank_hits(qstrip, hits) seen_docs: set[str] = set() lines: list[str] = [] count = 0 def out_dkey(doc_name: str) -> str: return _dedup_key_full_doc(doc_name) if wide else _dedup_key(doc_name) for h in hits: if count >= top_k: break src = h.get("_source") or {} doc_name = src.get("docnm_kwd") or "?" dk = out_dkey(doc_name) if dk in seen_docs: continue seen_docs.add(dk) if "_rrf_score" in h: score = float(h["_rrf_score"]) score_label = "RRF" elif "_rerank_score" in h: score = float(h["_rerank_score"]) score_label = "Rerank" else: score = float(h.get("_score") or 0.0) score_label = "ES" raw = src.get("content_with_weight") or src.get("content_de") or "" content = raw[:snip_len].strip() ocr = _ocr_note(raw) folder = _folder_from_docname(doc_name) filename = doc_name.rsplit("__", 1)[-1] if "__" in doc_name else doc_name folder_line = f" Ordner: {folder}" if folder else "" lines.append( f"---\n**{count + 1}. {filename}** ({score_label}: {score:.3f}){ocr}" ) if folder_line: lines.append(folder_line) if content: lines.append(f"```\n{content}\n```") count += 1 if count == 0: return f"Keine Dokumente fuer '{qstrip}' gefunden." hdr = header.rstrip() + ( " _(Cross-Encoder reranked)_" if reranked else "" ) + "\n" lines.insert(0, hdr) tail = ( "\n---\n(Ende der Ergebnisse. Nur diese Dokumente in dieser Runde. " + ( "Bei Summen/Zahlen: alle Treffer prüfen; OCR kann unvollständig sein." if wide else "" ) + ")" ) lines.append(tail) return "\n".join(lines) HANDLERS = { "rag_search": handle_rag_search, }