homelab-brain/homelab-ai-bot/tools/rag.py
Homelab Cursor ae6a50d182 fix(rag+llm): 60 wide treffer, Pfad-Dedup, 100k tool payload
(Nachtrag: vorheriger Commit enthielt nur telegram_bot.)
2026-03-26 17:15:48 +01:00

412 lines
14 KiB
Python

"""RAG Dokumentensuche — Elasticsearch direkt (Hybrid: kNN + deutscher Text).
RAGFlow bleibt Ingestion; Suche geht direkt an ES (Issue #51).
"""
import base64
import json
import logging
import re
import urllib.error
import urllib.request
log = logging.getLogger("tools.rag")
ES_BASE = "http://100.109.101.12:1200"
ES_USER = "elastic"
ES_PASS = "infini_rag_flow"
ES_INDEX = "ragflow_61f51c8c279011f1a174bd19863ba33e"
KB_ID = "dc24edda27a311f19fe7fb811de6f016"
OLLAMA_EMBED_URL = "http://100.84.255.83:11434/api/embeddings"
EMBED_MODEL = "nomic-embed-text"
MIN_TOP_K = 5
# Breite Übersichten: mehr ES-Runden, mehr distinct Treffer (pro vollem Pfad docnm_kwd)
MAX_TOP_K_NORMAL = 25
MAX_TOP_K_WIDE = 60
ES_SIZE_CAP = 200
TOOLS = [
{
"type": "function",
"function": {
"name": "rag_search",
"description": (
"Durchsucht die private Dokumenten-Wissensbasis (>21.000 Dokumente: "
"Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, "
"Anleitungen, Buecher, persoenliche Unterlagen). "
"Nutze dieses Tool wenn der User nach einem bestimmten Dokument, "
"Vertrag, Brief oder persoenlicher Information fragt. "
"Bei breiten Fragen ('welche Versicherungen', Jahreskosten, Listen) "
"top_k=15 oder hoeher setzen."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Suchanfrage: Dokumentname, Thema oder Inhalt. Kurz und praezise, "
"z.B. 'Familienbuch Opa Oma' oder 'Grundsteuer Erklaerung 2024'"
),
},
"top_k": {
"type": "integer",
"description": "Anzahl Ergebnisse (5-25, Standard 10)",
"default": 10,
},
},
"required": ["query"],
},
},
},
]
SYSTEM_PROMPT_EXTRA = """RAG DOKUMENTENSUCHE — PFLICHT-REGELN:
Du hast Zugriff auf eine private Wissensbasis mit >21.000 Dokumenten (Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, Anleitungen, Buecher, persoenliche Unterlagen, Arbeitsvertraege, Kindergeld, Reisepass, Personalausweis, KFZ, Mietvertraege, Bausparvertraege, Rechnungen).
WANN rag_search AUFRUFEN — IMMER bei diesen Fragen:
- "habe ich..." / "gibt es..." / "wo ist..." / "finde..." / "zeig mir..." + Dokument/Vertrag/Versicherung/Bescheid
- Jede Frage nach persoenlichen Unterlagen, Vertraegen, Versicherungen, Rechnungen, Bescheiden
- AUCH wenn du glaubst die Antwort zu kennen — das Gedaechtnis ist NICHT die Wissensbasis!
- AUCH wenn das Thema im Gedaechtnis steht — trotzdem rag_search aufrufen fuer vollstaendige Antwort
WANN NICHT: Nur bei reinen Homelab/IT-Fragen, Smalltalk, oder wenn der User explizit NICHT nach Dokumenten fragt.
SUCHANFRAGE: Kurze Keywords, KEINE ganzen Saetze. Beispiele:
- "Familienbuch" / "Grundsteuer Erklaerung" / "Haftpflicht" / "Kindergeld" / "Mietvertrag" / "Arbeitsvertrag" / "Reisepass"
ERGEBNISSE AUSWERTEN:
- Bei breiten Fragen ("welche Versicherungen", Jahreskosten, Listen): top_k=15-25, ALLE Treffer aus der Tool-Antwort abarbeiten
- Liste die gefundenen Dokumente mit Ordner und kurzem Inhalt auf
- ERFINDE KEINE Details die nicht im Ergebnis stehen
- Der Ordnerpfad (vor dem Dateinamen, getrennt durch __) zeigt die Kategorie
- Wenn rag_search Treffer liefert: IMMER auflisten, auch wenn Inhalt unvollstaendig
- Mehrere Treffer zur gleichen Versicherung/Gesellschaft: jede Sparte/Dokumentart separat nennen (Kfz, Rechtsschutz, Haftpflicht, Sach, Ausland, Kranken), mit Dateiname/Ordner
- Antworte NIEMALS "keine gefunden" oder "nicht gespeichert" OHNE vorher rag_search aufgerufen zu haben"""
def _basic_auth_header() -> str:
token = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode()
return f"Basic {token}"
def _ollama_embed(text: str) -> list | None:
body = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode()
req = urllib.request.Request(
OLLAMA_EMBED_URL,
data=body,
method="POST",
headers={"Content-Type": "application/json"},
)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
data = json.load(resp)
emb = data.get("embedding")
if not emb:
return None
if len(emb) != 768:
log.warning("Unexpected embedding dimension %s", len(emb))
return emb
except Exception as e:
log.error("Ollama embed error: %s", e)
return None
def _ocr_note(text: str) -> str:
if not text or len(text) < 40:
return ""
non_alnum = sum(1 for c in text if not c.isalnum() and not c.isspace())
ratio = non_alnum / max(len(text), 1)
words = re.findall(r"\w+", text, re.UNICODE)
avg_len = (sum(len(w) for w in words) / len(words)) if words else 0.0
if ratio > 0.15 or avg_len < 2.0:
return " [OCR vermutlich schlecht]"
return ""
def _folder_from_docname(name: str) -> str:
"""Extrahiert den Ordnerpfad aus docnm_kwd (__ = Trenner)."""
parts = name.rsplit("__", 1)
if len(parts) == 2:
return parts[0].replace("__", " > ").replace("_", " ")
return ""
def _dedup_key(name: str) -> str:
"""Normalisiert Dokumentnamen fuer Deduplizierung.
Extrahiert nur den Dateinamen (nach letztem __), ignoriert
Dateiendung und Kopie-Marker wie (1), (2).
'Ordner__Foo(1).pdf' und 'Anderer__Foo.txt' werden als gleich behandelt.
"""
fname = name.rsplit("__", 1)[-1] if "__" in name else name
key = re.sub(r"\.(pdf|txt|docx?|xlsx?|csv|png|jpg|jpeg)$", "", fname, flags=re.IGNORECASE)
key = re.sub(r"\s*\(\d+\)\s*$", "", key).rstrip()
return key.lower()
def _dedup_key_full_doc(name: str) -> str:
"""Ein Chunk pro vollem docnm_kwd — gleicher Dateiname in verschiedenen Ordnern bleibt getrennt."""
return re.sub(r"\s+", " ", (name or "").strip().lower())
def _es_hybrid_search(query: str, es_size: int) -> dict:
qvec = _ollama_embed(query)
if not qvec:
return {"_error": "Embedding fehlgeschlagen (Ollama nicht erreichbar?)."}
es_size = min(ES_SIZE_CAP, max(es_size, 20))
kb_filter = {"term": {"kb_id": KB_ID}}
body = {
"size": es_size,
"knn": {
"field": "q_768_vec",
"query_vector": qvec,
"k": es_size,
"num_candidates": min(500, max(es_size * 5, 150)),
"filter": [kb_filter],
},
"query": {
"bool": {
"filter": [kb_filter],
"should": [
{"match": {"content_de": {"query": query, "boost": 2.0}}},
{"match": {"content_ltks": {"query": query.lower(), "boost": 0.4}}},
{"match": {"docnm_kwd": {"query": query, "boost": 3.0}}},
],
"minimum_should_match": 0,
}
},
}
url = f"{ES_BASE}/{ES_INDEX}/_search"
req = urllib.request.Request(
url,
data=json.dumps(body).encode(),
method="POST",
headers={
"Content-Type": "application/json",
"Authorization": _basic_auth_header(),
},
)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
return json.load(resp)
except urllib.error.HTTPError as e:
err = e.read().decode(errors="replace")[:800]
log.error("ES HTTP %s: %s", e.code, err)
return {"_error": f"ES HTTP {e.code}: {err}"}
except Exception as e:
log.error("ES search error: %s", e)
return {"_error": str(e)}
def _is_wide_recall_query(q: str) -> bool:
"""Übersichts-/Listen-/Kostenfragen: mehrfach suchen und mergen."""
ql = (q or "").lower()
if any(x in ql for x in ("welche versicherung", "alle versicherung", "versicherungen habe")):
return True
if "versicherung" in ql and any(
x in ql
for x in (
"welche",
"alle",
"liste",
"überblick",
"ueberblick",
"kosten",
"beitrag",
"jähr",
"jaehr",
"jahres",
"gesamt",
"summe",
"übersicht",
"uebersicht",
)
):
return True
costish = any(
x in ql
for x in (
"kosten",
"beitrag",
"beiträge",
"beitraege",
"eur",
"euro",
"jähr",
"jaehr",
"jahreskosten",
"prämie",
"praemie",
)
)
broad = any(
x in ql
for x in (
"liste",
"übersicht",
"uebersicht",
"alle",
"gesamt",
"summe",
"jährlich",
"jaehrlich",
)
)
return costish and broad
# Zusatzanfragen decken Sparten + Gesellschaften ab (Recall)
_WIDE_SUBQUERIES = [
"Versicherung Beitragsrechnung Jahresbeitrag",
"Wohngebäudeversicherung Gebäude Beitrag",
"Hausratversicherung Beitrag Ergo",
"Haftpflichtversicherung Beitrag GARANTA",
"Kfz Kasko Haftpflicht Beitragsrechnung",
"Rechtsschutzversicherung Beitrag",
"Lebensversicherung Beitrag",
"Krankenversicherung PKV Beitrag",
"Sachversicherung LVM Beitrag",
"LVM AutoPlus Versicherungsschein",
"Allianz Versicherung Police",
"Nürnberger Versicherung Beitrag",
"Ergo Versicherung Police",
"Unfallversicherung Berufsunfähigkeit",
"Bausparvertrag Bauspar",
]
def _merge_hits_from_queries(
queries: list[str],
es_size: int,
pool_cap: int,
*,
full_path_dedup: bool = False,
) -> tuple[list, str | None]:
"""Führt mehrere Hybrid-Suchen aus; pro Dedup-Key höchster Score."""
best: dict[str, dict] = {}
last_err: str | None = None
def dkey(dn: str) -> str:
return _dedup_key_full_doc(dn) if full_path_dedup else _dedup_key(dn)
def absorb(hits: list) -> None:
for h in hits:
src = h.get("_source") or {}
dn = src.get("docnm_kwd") or "?"
dk = dkey(dn)
sc = float(h.get("_score") or 0.0)
old = best.get(dk)
if old is None or sc > float(old.get("_score") or 0.0):
best[dk] = h
for q in queries:
q = (q or "").strip()
if not q:
continue
data = _es_hybrid_search(q, es_size)
if "_error" in data:
last_err = str(data["_error"])
log.warning("wide_recall subquery fail %s: %s", q[:40], last_err)
continue
absorb((data.get("hits") or {}).get("hits") or [])
merged = sorted(best.values(), key=lambda h: float(h.get("_score") or 0.0), reverse=True)
return merged[:pool_cap], last_err
def handle_rag_search(query: str, top_k: int = 8, **kw):
if not query or not query.strip():
return "rag_search: query fehlt."
qstrip = query.strip()
wide = _is_wide_recall_query(qstrip)
cap = MAX_TOP_K_WIDE if wide else MAX_TOP_K_NORMAL
top_k = max(MIN_TOP_K, min(int(top_k or 10), cap))
es_size = min(ES_SIZE_CAP, max(top_k * 10, 70))
if wide:
subqs = [qstrip]
for sq in _WIDE_SUBQUERIES:
if sq.lower() not in qstrip.lower():
subqs.append(sq)
pool_cap = max(top_k * 5, 120)
hits, err = _merge_hits_from_queries(
subqs[:22],
es_size,
pool_cap=pool_cap,
full_path_dedup=True,
)
if err and not hits:
return f"Fehler bei der Dokumentensuche: {err}"
header = (
f"**Breitensuche ({len(subqs[:22])} Anfragen, Dedup=voller Pfad) '{qstrip}'"
f"{len(hits)} Kandidaten, zeige bis {top_k}:**\n"
)
snip_len = 400
else:
data = _es_hybrid_search(qstrip, es_size)
if "_error" in data:
return f"Fehler bei der Dokumentensuche: {data['_error']}"
hits = (data.get("hits") or {}).get("hits") or []
header = f"**Dokumente fuer '{qstrip}' (bis {top_k}):**\n"
snip_len = 650
if not hits:
return f"Keine Ergebnisse fuer '{qstrip}' in der Wissensbasis gefunden."
seen_docs: set[str] = set()
lines: list[str] = []
count = 0
def out_dkey(doc_name: str) -> str:
return _dedup_key_full_doc(doc_name) if wide else _dedup_key(doc_name)
for h in hits:
if count >= top_k:
break
src = h.get("_source") or {}
doc_name = src.get("docnm_kwd") or "?"
dk = out_dkey(doc_name)
if dk in seen_docs:
continue
seen_docs.add(dk)
score = h.get("_score") or 0.0
raw = src.get("content_with_weight") or src.get("content_de") or ""
content = raw[:snip_len].strip()
ocr = _ocr_note(raw)
folder = _folder_from_docname(doc_name)
filename = doc_name.rsplit("__", 1)[-1] if "__" in doc_name else doc_name
folder_line = f" Ordner: {folder}" if folder else ""
lines.append(f"---\n**{count + 1}. {filename}** (Score: {score:.1f}){ocr}")
if folder_line:
lines.append(folder_line)
if content:
lines.append(f"```\n{content}\n```")
count += 1
if count == 0:
return f"Keine Dokumente fuer '{qstrip}' gefunden."
lines.insert(0, header)
tail = (
"\n---\n(Ende der Ergebnisse. Nur diese Dokumente in dieser Runde. "
+ (
"Bei Summen/Zahlen: alle Treffer prüfen; OCR kann unvollständig sein."
if wide
else ""
)
+ ")"
)
lines.append(tail)
return "\n".join(lines)
HANDLERS = {
"rag_search": handle_rag_search,
}