"""RAG Dokumentensuche — Elasticsearch direkt (Hybrid: kNN + deutscher Text). RAGFlow bleibt Ingestion; Suche geht direkt an ES (Issue #51). """ import base64 import json import logging import re import urllib.error import urllib.request log = logging.getLogger("tools.rag") ES_BASE = "http://100.109.101.12:1200" ES_USER = "elastic" ES_PASS = "infini_rag_flow" ES_INDEX = "ragflow_61f51c8c279011f1a174bd19863ba33e" KB_ID = "dc24edda27a311f19fe7fb811de6f016" OLLAMA_EMBED_URL = "http://100.84.255.83:11434/api/embeddings" EMBED_MODEL = "nomic-embed-text" MIN_TOP_K = 5 # Breite Übersichten: mehr ES-Runden, mehr distinct Treffer (pro vollem Pfad docnm_kwd) MAX_TOP_K_NORMAL = 25 MAX_TOP_K_WIDE = 60 ES_SIZE_CAP = 200 TOOLS = [ { "type": "function", "function": { "name": "rag_search", "description": ( "Durchsucht die private Dokumenten-Wissensbasis (>21.000 Dokumente: " "Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, " "Anleitungen, Buecher, persoenliche Unterlagen). " "Nutze dieses Tool wenn der User nach einem bestimmten Dokument, " "Vertrag, Brief oder persoenlicher Information fragt. " "Bei breiten Fragen ('welche Versicherungen', Jahreskosten, Listen) " "top_k=15 oder hoeher setzen." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": ( "Suchanfrage: Dokumentname, Thema oder Inhalt. Kurz und praezise, " "z.B. 'Familienbuch Opa Oma' oder 'Grundsteuer Erklaerung 2024'" ), }, "top_k": { "type": "integer", "description": "Anzahl Ergebnisse (5-25, Standard 10)", "default": 10, }, }, "required": ["query"], }, }, }, ] SYSTEM_PROMPT_EXTRA = """RAG DOKUMENTENSUCHE — PFLICHT-REGELN: Du hast Zugriff auf eine private Wissensbasis mit >21.000 Dokumenten (Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, Anleitungen, Buecher, persoenliche Unterlagen, Arbeitsvertraege, Kindergeld, Reisepass, Personalausweis, KFZ, Mietvertraege, Bausparvertraege, Rechnungen). WANN rag_search AUFRUFEN — IMMER bei diesen Fragen: - "habe ich..." / "gibt es..." / "wo ist..." / "finde..." / "zeig mir..." + Dokument/Vertrag/Versicherung/Bescheid - Jede Frage nach persoenlichen Unterlagen, Vertraegen, Versicherungen, Rechnungen, Bescheiden - Wohnungen, Immobilien, Grundstuecke, Mietobjekte, Auslands-Objekte (z.B. Kambodscha): immer rag_search — auch wenn das Gedaechtnis schon einen Wohnort nennt - AUCH wenn du glaubst die Antwort zu kennen — das Gedaechtnis ist NICHT die Wissensbasis! - AUCH wenn das Thema im Gedaechtnis steht — trotzdem rag_search aufrufen fuer vollstaendige Antwort WANN NICHT: Nur bei reinen Homelab/IT-Fragen, Smalltalk, oder wenn der User explizit NICHT nach Dokumenten fragt. SUCHANFRAGE: Kurze Keywords, KEINE ganzen Saetze. Beispiele: - "Familienbuch" / "Grundsteuer Erklaerung" / "Haftpflicht" / "Kindergeld" / "Mietvertrag" / "Arbeitsvertrag" / "Reisepass" - "Wohnung Kambodscha" / "Immobilie" / "Condo" ERGEBNISSE AUSWERTEN: - Bei breiten Fragen ("welche Versicherungen", Jahreskosten, Listen): top_k=15-25, ALLE Treffer aus der Tool-Antwort abarbeiten - Liste die gefundenen Dokumente mit Ordner und kurzem Inhalt auf - ERFINDE KEINE Details die nicht im Ergebnis stehen - Der Ordnerpfad (vor dem Dateinamen, getrennt durch __) zeigt die Kategorie - Wenn rag_search Treffer liefert: IMMER auflisten, auch wenn Inhalt unvollstaendig - Mehrere Treffer zur gleichen Versicherung/Gesellschaft: jede Sparte/Dokumentart separat nennen (Kfz, Rechtsschutz, Haftpflicht, Sach, Ausland, Kranken), mit Dateiname/Ordner - Antworte NIEMALS "keine gefunden" oder "nicht gespeichert" OHNE vorher rag_search aufgerufen zu haben""" def _basic_auth_header() -> str: token = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode() return f"Basic {token}" def _ollama_embed(text: str) -> list | None: body = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode() req = urllib.request.Request( OLLAMA_EMBED_URL, data=body, method="POST", headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req, timeout=120) as resp: data = json.load(resp) emb = data.get("embedding") if not emb: return None if len(emb) != 768: log.warning("Unexpected embedding dimension %s", len(emb)) return emb except Exception as e: log.error("Ollama embed error: %s", e) return None def _ocr_note(text: str) -> str: if not text or len(text) < 40: return "" non_alnum = sum(1 for c in text if not c.isalnum() and not c.isspace()) ratio = non_alnum / max(len(text), 1) words = re.findall(r"\w+", text, re.UNICODE) avg_len = (sum(len(w) for w in words) / len(words)) if words else 0.0 if ratio > 0.15 or avg_len < 2.0: return " [OCR vermutlich schlecht]" return "" def _folder_from_docname(name: str) -> str: """Extrahiert den Ordnerpfad aus docnm_kwd (__ = Trenner).""" parts = name.rsplit("__", 1) if len(parts) == 2: return parts[0].replace("__", " > ").replace("_", " ") return "" def _dedup_key(name: str) -> str: """Normalisiert Dokumentnamen fuer Deduplizierung. Extrahiert nur den Dateinamen (nach letztem __), ignoriert Dateiendung und Kopie-Marker wie (1), (2). 'Ordner__Foo(1).pdf' und 'Anderer__Foo.txt' werden als gleich behandelt. """ fname = name.rsplit("__", 1)[-1] if "__" in name else name key = re.sub(r"\.(pdf|txt|docx?|xlsx?|csv|png|jpg|jpeg)$", "", fname, flags=re.IGNORECASE) key = re.sub(r"\s*\(\d+\)\s*$", "", key).rstrip() return key.lower() def _dedup_key_full_doc(name: str) -> str: """Ein Chunk pro vollem docnm_kwd — gleicher Dateiname in verschiedenen Ordnern bleibt getrennt.""" return re.sub(r"\s+", " ", (name or "").strip().lower()) def _es_hybrid_search(query: str, es_size: int) -> dict: qvec = _ollama_embed(query) if not qvec: return {"_error": "Embedding fehlgeschlagen (Ollama nicht erreichbar?)."} es_size = min(ES_SIZE_CAP, max(es_size, 20)) kb_filter = {"term": {"kb_id": KB_ID}} body = { "size": es_size, "knn": { "field": "q_768_vec", "query_vector": qvec, "k": es_size, "num_candidates": min(500, max(es_size * 5, 150)), "filter": [kb_filter], }, "query": { "bool": { "filter": [kb_filter], "should": [ {"match": {"content_de": {"query": query, "boost": 2.0}}}, {"match": {"content_ltks": {"query": query.lower(), "boost": 0.4}}}, {"match": {"docnm_kwd": {"query": query, "boost": 3.0}}}, ], "minimum_should_match": 0, } }, } url = f"{ES_BASE}/{ES_INDEX}/_search" req = urllib.request.Request( url, data=json.dumps(body).encode(), method="POST", headers={ "Content-Type": "application/json", "Authorization": _basic_auth_header(), }, ) try: with urllib.request.urlopen(req, timeout=120) as resp: return json.load(resp) except urllib.error.HTTPError as e: err = e.read().decode(errors="replace")[:800] log.error("ES HTTP %s: %s", e.code, err) return {"_error": f"ES HTTP {e.code}: {err}"} except Exception as e: log.error("ES search error: %s", e) return {"_error": str(e)} def _is_wide_recall_query(q: str) -> bool: """Übersichts-/Listen-/Kostenfragen: mehrfach suchen und mergen.""" ql = (q or "").lower() if any(x in ql for x in ("welche versicherung", "alle versicherung", "versicherungen habe")): return True if "versicherung" in ql and any( x in ql for x in ( "welche", "alle", "liste", "überblick", "ueberblick", "kosten", "beitrag", "jähr", "jaehr", "jahres", "gesamt", "summe", "übersicht", "uebersicht", ) ): return True costish = any( x in ql for x in ( "kosten", "kostet", "wie viel", "wieviel", "beitrag", "beiträge", "beitraege", "eur", "euro", "jähr", "jaehr", "jahreskosten", "prämie", "praemie", ) ) broad = any( x in ql for x in ( "liste", "übersicht", "uebersicht", "alle", "gesamt", "summe", "jährlich", "jaehrlich", ) ) return costish and broad # Immobilien / Wohnungen / Kambodscha if any(x in ql for x in ("wohnung", "immobilie", "condo", "apartment", "grundstück", "grundstueck")): if any(x in ql for x in ("welche", "alle", "liste", "habe ich", "übersicht", "uebersicht", "wie viele")): return True if any(x in ql for x in ("kambodscha", "cambodia", "takeo", "phnom", "sihanouk")): if any(x in ql for x in ("welche", "alle", "wohnung", "immobilie", "haus", "condo", "apartment", "mietvertrag")): return True return False # Zusatzanfragen decken Sparten + Gesellschaften ab (Recall) _WIDE_SUBQUERIES = [ "Versicherung Beitragsrechnung Jahresbeitrag", "Wohngebäudeversicherung Gebäude Beitrag", "Hausratversicherung Beitrag Ergo", "Haftpflichtversicherung Beitrag GARANTA", "Kfz Kasko Haftpflicht Beitragsrechnung", "Rechtsschutzversicherung Beitrag", "Lebensversicherung Beitrag", "Krankenversicherung PKV Beitrag", "Sachversicherung LVM Beitrag", "LVM AutoPlus Versicherungsschein", "Allianz Versicherung Police", "Nürnberger Versicherung Beitrag", "Ergo Versicherung Police", "Unfallversicherung Berufsunfähigkeit", "Bausparvertrag Bauspar", "Ford Transit Nutzfahrzeug Versicherung", "Kfz Versicherungsschein Beitrag jährlich", ] _WIDE_SUBQUERIES_IMMOBILIEN = [ "Arakawa Wohnung Mietvertrag", "Arakawa Wohnung D1603", "Arakawa Wohnung G2010", "Arakawa Wohnung-2", "Kambodscha Arakawa Kaufvertrag", "Kambodscha Arakawa Vollmacht", "Kambodscha Arakawa Überweisung", "Wohnung Mietvertrag Kambodscha", "Condo Apartment Cambodia", "Hard Title Wohnung", "Wohnungen Kurtzübersicht", "Mietvertrag Ramirez Antonio", "Mietvertrag Cheng Qiu", "Kambodscha Rechnungen Strom Miete", ] def _merge_hits_from_queries( queries: list[str], es_size: int, pool_cap: int, *, full_path_dedup: bool = False, ) -> tuple[list, str | None]: """Führt mehrere Hybrid-Suchen aus; pro Dedup-Key höchster Score.""" best: dict[str, dict] = {} last_err: str | None = None def dkey(dn: str) -> str: return _dedup_key_full_doc(dn) if full_path_dedup else _dedup_key(dn) def absorb(hits: list) -> None: for h in hits: src = h.get("_source") or {} dn = src.get("docnm_kwd") or "?" dk = dkey(dn) sc = float(h.get("_score") or 0.0) old = best.get(dk) if old is None or sc > float(old.get("_score") or 0.0): best[dk] = h for q in queries: q = (q or "").strip() if not q: continue data = _es_hybrid_search(q, es_size) if "_error" in data: last_err = str(data["_error"]) log.warning("wide_recall subquery fail %s: %s", q[:40], last_err) continue absorb((data.get("hits") or {}).get("hits") or []) merged = sorted(best.values(), key=lambda h: float(h.get("_score") or 0.0), reverse=True) return merged[:pool_cap], last_err def handle_rag_search(query: str, top_k: int = 8, **kw): if not query or not query.strip(): return "rag_search: query fehlt." qstrip = query.strip() wide = _is_wide_recall_query(qstrip) cap = MAX_TOP_K_WIDE if wide else MAX_TOP_K_NORMAL top_k = max(MIN_TOP_K, min(int(top_k or 10), cap)) es_size = min(ES_SIZE_CAP, max(top_k * 10, 70)) if wide: ql = qstrip.lower() _immo_wide = any( x in ql for x in ("wohnung", "immobilie", "condo", "apartment", "grundstück", "grundstueck", "kambodscha", "cambodia", "arakawa") ) _sq_pool = _WIDE_SUBQUERIES_IMMOBILIEN if _immo_wide else _WIDE_SUBQUERIES subqs = [qstrip] for sq in _sq_pool: if sq.lower() not in ql: subqs.append(sq) pool_cap = max(top_k * 5, 120) hits, err = _merge_hits_from_queries( subqs[:22], es_size, pool_cap=pool_cap, full_path_dedup=True, ) if err and not hits: return f"Fehler bei der Dokumentensuche: {err}" header = ( f"**Breitensuche ({len(subqs[:22])} Anfragen, Dedup=voller Pfad) '{qstrip}' — " f"{len(hits)} Kandidaten, zeige bis {top_k}:**\n" ) snip_len = 400 else: data = _es_hybrid_search(qstrip, es_size) if "_error" in data: return f"Fehler bei der Dokumentensuche: {data['_error']}" hits = (data.get("hits") or {}).get("hits") or [] header = f"**Dokumente fuer '{qstrip}' (bis {top_k}):**\n" snip_len = 650 if not hits: return f"Keine Ergebnisse fuer '{qstrip}' in der Wissensbasis gefunden." seen_docs: set[str] = set() lines: list[str] = [] count = 0 def out_dkey(doc_name: str) -> str: return _dedup_key_full_doc(doc_name) if wide else _dedup_key(doc_name) for h in hits: if count >= top_k: break src = h.get("_source") or {} doc_name = src.get("docnm_kwd") or "?" dk = out_dkey(doc_name) if dk in seen_docs: continue seen_docs.add(dk) score = h.get("_score") or 0.0 raw = src.get("content_with_weight") or src.get("content_de") or "" content = raw[:snip_len].strip() ocr = _ocr_note(raw) folder = _folder_from_docname(doc_name) filename = doc_name.rsplit("__", 1)[-1] if "__" in doc_name else doc_name folder_line = f" Ordner: {folder}" if folder else "" lines.append(f"---\n**{count + 1}. {filename}** (Score: {score:.1f}){ocr}") if folder_line: lines.append(folder_line) if content: lines.append(f"```\n{content}\n```") count += 1 if count == 0: return f"Keine Dokumente fuer '{qstrip}' gefunden." lines.insert(0, header) tail = ( "\n---\n(Ende der Ergebnisse. Nur diese Dokumente in dieser Runde. " + ( "Bei Summen/Zahlen: alle Treffer prüfen; OCR kann unvollständig sein." if wide else "" ) + ")" ) lines.append(tail) return "\n".join(lines) HANDLERS = { "rag_search": handle_rag_search, }