homelab-brain/homelab-ai-bot/tools/rag.py
Homelab Cursor f9b69ad283 rag: Elasticsearch direkt (Hybrid kNN + deutsch) statt RAGFlow API
- ES 100.109.101.12:1200, Filter kb_id, knn auf q_768_vec
- Query-Embedding via Ollama nomic-embed-text
- Text: content_de, content_ltks, docnm_kwd
- OCR-Heuristik, Deduplizierung nach docnm_kwd
- Ref: Issue #51
2026-03-26 14:34:40 +01:00

204 lines
6.7 KiB
Python

"""RAG Dokumentensuche — Elasticsearch direkt (Hybrid: kNN + deutscher Text).
RAGFlow bleibt Ingestion; Suche geht direkt an ES (Issue #51).
"""
import base64
import json
import logging
import re
import urllib.error
import urllib.request
log = logging.getLogger("tools.rag")
ES_BASE = "http://100.109.101.12:1200"
ES_USER = "elastic"
ES_PASS = "infini_rag_flow"
ES_INDEX = "ragflow_61f51c8c279011f1a174bd19863ba33e"
KB_ID = "dc24edda27a311f19fe7fb811de6f016"
OLLAMA_EMBED_URL = "http://100.84.255.83:11434/api/embeddings"
EMBED_MODEL = "nomic-embed-text"
TOOLS = [
{
"type": "function",
"function": {
"name": "rag_search",
"description": (
"Durchsucht die private Dokumenten-Wissensbasis (>21.000 Dokumente: "
"Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, "
"Anleitungen, Buecher, persoenliche Unterlagen). "
"Nutze dieses Tool wenn der User nach einem bestimmten Dokument, "
"Vertrag, Brief oder persoenlicher Information fragt."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Suchanfrage: Dokumentname, Thema oder Inhalt. Kurz und praezise, "
"z.B. 'Familienbuch Opa Oma' oder 'Grundsteuer Erklaerung 2024'"
),
},
"top_k": {
"type": "integer",
"description": "Anzahl Ergebnisse (1-10)",
"default": 5,
},
},
"required": ["query"],
},
},
},
]
SYSTEM_PROMPT_EXTRA = """RAG DOKUMENTENSUCHE:
Du hast Zugriff auf eine private Wissensbasis mit >21.000 Dokumenten (Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, Anleitungen, Buecher, persoenliche Unterlagen).
Nutze rag_search wenn der User nach Dokumenten, Vertraegen, persoenlichen Unterlagen oder Informationen aus seinen Dateien fragt.
Die Suchanfrage sollte kurze Keywords sein, KEINE ganzen Saetze. Beispiele:
- "Familienbuch Opa Oma"
- "Grundsteuer Erklaerung"
- "Nuernberger Versicherung"
- "Allianz Beitraege"
Bei schlechten Ergebnissen: andere Keywords versuchen oder Dokumentnamen direkt suchen."""
def _basic_auth_header() -> str:
token = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode()
return f"Basic {token}"
def _ollama_embed(text: str) -> list | None:
body = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode()
req = urllib.request.Request(
OLLAMA_EMBED_URL,
data=body,
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
data = json.load(resp)
emb = data.get("embedding")
if not emb:
return None
if len(emb) != 768:
log.warning("Unexpected embedding dimension %s", len(emb))
return emb
except Exception as e:
log.error("Ollama embed error: %s", e)
return None
def _ocr_note(text: str) -> str:
if not text or len(text) < 40:
return ""
non_alnum = sum(1 for c in text if not c.isalnum() and not c.isspace())
ratio = non_alnum / max(len(text), 1)
words = re.findall(r"\w+", text, re.UNICODE)
avg_len = (sum(len(w) for w in words) / len(words)) if words else 0.0
if ratio > 0.15 or avg_len < 2.0:
return " [OCR vermutlich schlecht]"
return ""
def _es_hybrid_search(query: str, es_size: int) -> dict:
qvec = _ollama_embed(query)
if not qvec:
return {"_error": "Embedding fehlgeschlagen (Ollama nicht erreichbar?)."}
kb_filter = {"term": {"kb_id": KB_ID}}
body = {
"size": es_size,
"knn": {
"field": "q_768_vec",
"query_vector": qvec,
"k": es_size,
"num_candidates": min(500, max(es_size * 5, 120)),
"filter": [kb_filter],
},
"query": {
"bool": {
"filter": [kb_filter],
"should": [
{"match": {"content_de": {"query": query, "boost": 2.0}}},
{"match": {"content_ltks": {"query": query.lower(), "boost": 0.4}}},
{"match": {"docnm_kwd": {"query": query, "boost": 1.5}}},
],
"minimum_should_match": 0,
}
},
}
url = f"{ES_BASE}/{ES_INDEX}/_search"
req = urllib.request.Request(
url,
data=json.dumps(body).encode(),
method="POST",
headers={
"Content-Type": "application/json",
"Authorization": _basic_auth_header(),
},
)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
return json.load(resp)
except urllib.error.HTTPError as e:
err = e.read().decode(errors="replace")[:800]
log.error("ES HTTP %s: %s", e.code, err)
return {"_error": f"ES HTTP {e.code}: {err}"}
except Exception as e:
log.error("ES search error: %s", e)
return {"_error": str(e)}
def handle_rag_search(query: str, top_k: int = 5, **kw):
if not query or not query.strip():
return "rag_search: query fehlt."
top_k = max(1, min(int(top_k or 5), 10))
es_size = min(100, max(top_k * 12, 35))
data = _es_hybrid_search(query.strip(), es_size)
if "_error" in data:
return f"Fehler bei der Dokumentensuche: {data['_error']}"
hits = (data.get("hits") or {}).get("hits") or []
if not hits:
return f"Keine Ergebnisse fuer '{query}' in der Wissensbasis gefunden."
seen_docs: set[str] = set()
lines: list[str] = []
lines.append(f"**{len(hits)} Roh-Treffer fuer '{query}'** (Top {top_k} Dokumente):\n")
count = 0
for h in hits:
if count >= top_k:
break
src = h.get("_source") or {}
doc_name = src.get("docnm_kwd") or "?"
doc_key = str(doc_name)
if doc_key in seen_docs:
continue
seen_docs.add(doc_key)
score = h.get("_score") or 0.0
raw = src.get("content_with_weight") or src.get("content_de") or ""
content = raw[:400].strip()
ocr = _ocr_note(raw)
lines.append(f"---\n**{count + 1}. {doc_name}** (Score: {score:.3f}){ocr}")
if content:
lines.append(f"```\n{content}\n```")
count += 1
if count == 0:
return f"Keine eindeutigen Dokumente fuer '{query}' (nach Deduplizierung)."
return "\n".join(lines)
HANDLERS = {
"rag_search": handle_rag_search,
}