"""RAG Dokumentensuche — Elasticsearch direkt (Hybrid: kNN + deutscher Text). RAGFlow bleibt Ingestion; Suche geht direkt an ES (Issue #51). """ import base64 import json import logging import re import urllib.error import urllib.request log = logging.getLogger("tools.rag") ES_BASE = "http://100.109.101.12:1200" ES_USER = "elastic" ES_PASS = "infini_rag_flow" ES_INDEX = "ragflow_61f51c8c279011f1a174bd19863ba33e" KB_ID = "dc24edda27a311f19fe7fb811de6f016" OLLAMA_EMBED_URL = "http://100.84.255.83:11434/api/embeddings" EMBED_MODEL = "nomic-embed-text" MIN_TOP_K = 5 TOOLS = [ { "type": "function", "function": { "name": "rag_search", "description": ( "Durchsucht die private Dokumenten-Wissensbasis (>21.000 Dokumente: " "Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, " "Anleitungen, Buecher, persoenliche Unterlagen). " "Nutze dieses Tool wenn der User nach einem bestimmten Dokument, " "Vertrag, Brief oder persoenlicher Information fragt. " "Bei breiten Fragen ('welche Versicherungen', 'alle Vertraege') " "immer top_k=10 verwenden." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": ( "Suchanfrage: Dokumentname, Thema oder Inhalt. Kurz und praezise, " "z.B. 'Familienbuch Opa Oma' oder 'Grundsteuer Erklaerung 2024'" ), }, "top_k": { "type": "integer", "description": "Anzahl Ergebnisse (5-10, Standard 8)", "default": 8, }, }, "required": ["query"], }, }, }, ] SYSTEM_PROMPT_EXTRA = """RAG DOKUMENTENSUCHE — PFLICHT-REGELN: Du hast Zugriff auf eine private Wissensbasis mit >21.000 Dokumenten (Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, Anleitungen, Buecher, persoenliche Unterlagen, Arbeitsvertraege, Kindergeld, Reisepass, Personalausweis, KFZ, Mietvertraege, Bausparvertraege, Rechnungen). WANN rag_search AUFRUFEN — IMMER bei diesen Fragen: - "habe ich..." / "gibt es..." / "wo ist..." / "finde..." / "zeig mir..." + Dokument/Vertrag/Versicherung/Bescheid - Jede Frage nach persoenlichen Unterlagen, Vertraegen, Versicherungen, Rechnungen, Bescheiden - AUCH wenn du glaubst die Antwort zu kennen — das Gedaechtnis ist NICHT die Wissensbasis! - AUCH wenn das Thema im Gedaechtnis steht — trotzdem rag_search aufrufen fuer vollstaendige Antwort WANN NICHT: Nur bei reinen Homelab/IT-Fragen, Smalltalk, oder wenn der User explizit NICHT nach Dokumenten fragt. SUCHANFRAGE: Kurze Keywords, KEINE ganzen Saetze. Beispiele: - "Familienbuch" / "Grundsteuer Erklaerung" / "Haftpflicht" / "Kindergeld" / "Mietvertrag" / "Arbeitsvertrag" / "Reisepass" ERGEBNISSE AUSWERTEN: - Bei breiten Fragen ("welche Versicherungen", "alle Vertraege"): top_k=10 - Liste die gefundenen Dokumente mit Ordner und kurzem Inhalt auf - ERFINDE KEINE Details die nicht im Ergebnis stehen - Der Ordnerpfad (vor dem Dateinamen, getrennt durch __) zeigt die Kategorie - Wenn rag_search Treffer liefert: IMMER auflisten, auch wenn Inhalt unvollstaendig - Mehrere Treffer zur gleichen Versicherung/Gesellschaft: jede Sparte/Dokumentart separat nennen (Kfz, Rechtsschutz, Haftpflicht, Sach, Ausland, Kranken), mit Dateiname/Ordner - Antworte NIEMALS "keine gefunden" oder "nicht gespeichert" OHNE vorher rag_search aufgerufen zu haben""" def _basic_auth_header() -> str: token = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode() return f"Basic {token}" def _ollama_embed(text: str) -> list | None: body = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode() req = urllib.request.Request( OLLAMA_EMBED_URL, data=body, method="POST", headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req, timeout=120) as resp: data = json.load(resp) emb = data.get("embedding") if not emb: return None if len(emb) != 768: log.warning("Unexpected embedding dimension %s", len(emb)) return emb except Exception as e: log.error("Ollama embed error: %s", e) return None def _ocr_note(text: str) -> str: if not text or len(text) < 40: return "" non_alnum = sum(1 for c in text if not c.isalnum() and not c.isspace()) ratio = non_alnum / max(len(text), 1) words = re.findall(r"\w+", text, re.UNICODE) avg_len = (sum(len(w) for w in words) / len(words)) if words else 0.0 if ratio > 0.15 or avg_len < 2.0: return " [OCR vermutlich schlecht]" return "" def _folder_from_docname(name: str) -> str: """Extrahiert den Ordnerpfad aus docnm_kwd (__ = Trenner).""" parts = name.rsplit("__", 1) if len(parts) == 2: return parts[0].replace("__", " > ").replace("_", " ") return "" def _dedup_key(name: str) -> str: """Normalisiert Dokumentnamen fuer Deduplizierung. Extrahiert nur den Dateinamen (nach letztem __), ignoriert Dateiendung und Kopie-Marker wie (1), (2). 'Ordner__Foo(1).pdf' und 'Anderer__Foo.txt' werden als gleich behandelt. """ fname = name.rsplit("__", 1)[-1] if "__" in name else name key = re.sub(r"\.(pdf|txt|docx?|xlsx?|csv|png|jpg|jpeg)$", "", fname, flags=re.IGNORECASE) key = re.sub(r"\s*\(\d+\)\s*$", "", key).rstrip() return key.lower() def _es_hybrid_search(query: str, es_size: int) -> dict: qvec = _ollama_embed(query) if not qvec: return {"_error": "Embedding fehlgeschlagen (Ollama nicht erreichbar?)."} kb_filter = {"term": {"kb_id": KB_ID}} body = { "size": es_size, "knn": { "field": "q_768_vec", "query_vector": qvec, "k": es_size, "num_candidates": min(500, max(es_size * 5, 120)), "filter": [kb_filter], }, "query": { "bool": { "filter": [kb_filter], "should": [ {"match": {"content_de": {"query": query, "boost": 2.0}}}, {"match": {"content_ltks": {"query": query.lower(), "boost": 0.4}}}, {"match": {"docnm_kwd": {"query": query, "boost": 3.0}}}, ], "minimum_should_match": 0, } }, } url = f"{ES_BASE}/{ES_INDEX}/_search" req = urllib.request.Request( url, data=json.dumps(body).encode(), method="POST", headers={ "Content-Type": "application/json", "Authorization": _basic_auth_header(), }, ) try: with urllib.request.urlopen(req, timeout=120) as resp: return json.load(resp) except urllib.error.HTTPError as e: err = e.read().decode(errors="replace")[:800] log.error("ES HTTP %s: %s", e.code, err) return {"_error": f"ES HTTP {e.code}: {err}"} except Exception as e: log.error("ES search error: %s", e) return {"_error": str(e)} def handle_rag_search(query: str, top_k: int = 8, **kw): if not query or not query.strip(): return "rag_search: query fehlt." top_k = max(MIN_TOP_K, min(int(top_k or 8), 10)) es_size = min(120, max(top_k * 12, 50)) data = _es_hybrid_search(query.strip(), es_size) if "_error" in data: return f"Fehler bei der Dokumentensuche: {data['_error']}" hits = (data.get("hits") or {}).get("hits") or [] if not hits: return f"Keine Ergebnisse fuer '{query}' in der Wissensbasis gefunden." seen_docs: set[str] = set() lines: list[str] = [] count = 0 for h in hits: if count >= top_k: break src = h.get("_source") or {} doc_name = src.get("docnm_kwd") or "?" dk = _dedup_key(doc_name) if dk in seen_docs: continue seen_docs.add(dk) score = h.get("_score") or 0.0 raw = src.get("content_with_weight") or src.get("content_de") or "" content = raw[:600].strip() ocr = _ocr_note(raw) folder = _folder_from_docname(doc_name) filename = doc_name.rsplit("__", 1)[-1] if "__" in doc_name else doc_name folder_line = f" Ordner: {folder}" if folder else "" lines.append(f"---\n**{count + 1}. {filename}** (Score: {score:.1f}){ocr}") if folder_line: lines.append(folder_line) if content: lines.append(f"```\n{content}\n```") count += 1 if count == 0: return f"Keine Dokumente fuer '{query}' gefunden." lines.insert(0, f"**{count} verschiedene Dokumente fuer '{query}':**\n") lines.append("\n---\n(Ende der Ergebnisse. Nur diese Dokumente wurden gefunden.)") return "\n".join(lines) HANDLERS = { "rag_search": handle_rag_search, }