501 lines
17 KiB
Python
501 lines
17 KiB
Python
"""RAG Dokumentensuche — Elasticsearch direkt (Hybrid: kNN + deutscher Text).
|
|
|
|
RAGFlow bleibt Ingestion; Suche geht direkt an ES (Issue #51).
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import re
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
log = logging.getLogger("tools.rag")
|
|
|
|
ES_BASE = "http://100.109.101.12:1200"
|
|
ES_USER = "elastic"
|
|
ES_PASS = "infini_rag_flow"
|
|
ES_INDEX = "ragflow_61f51c8c279011f1a174bd19863ba33e"
|
|
KB_ID = "dc24edda27a311f19fe7fb811de6f016"
|
|
OLLAMA_EMBED_URL = "http://100.84.255.83:11434/api/embeddings"
|
|
EMBED_MODEL = "nomic-embed-text"
|
|
|
|
# Cross-Encoder Reranking (CT 123, pve-hetzner LAN)
|
|
RERANKER_URL = "http://10.10.10.123:8099"
|
|
RERANK_CANDIDATES = 15
|
|
RERANK_TIMEOUT = 45
|
|
RERANK_SNIPPET_CHARS = 512
|
|
|
|
MIN_TOP_K = 5
|
|
# Breite Übersichten: mehr ES-Runden, mehr distinct Treffer (pro vollem Pfad docnm_kwd)
|
|
MAX_TOP_K_NORMAL = 25
|
|
MAX_TOP_K_WIDE = 60
|
|
ES_SIZE_CAP = 200
|
|
|
|
TOOLS = [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "rag_search",
|
|
"description": (
|
|
"Durchsucht die private Dokumenten-Wissensbasis (>21.000 Dokumente: "
|
|
"Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, "
|
|
"Anleitungen, Buecher, persoenliche Unterlagen). "
|
|
"Nutze dieses Tool wenn der User nach einem bestimmten Dokument, "
|
|
"Vertrag, Brief oder persoenlicher Information fragt. "
|
|
"Bei breiten Fragen ('welche Versicherungen', Jahreskosten, Listen) "
|
|
"top_k=15 oder hoeher setzen."
|
|
),
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"query": {
|
|
"type": "string",
|
|
"description": (
|
|
"Suchanfrage: Dokumentname, Thema oder Inhalt. Kurz und praezise, "
|
|
"z.B. 'Familienbuch Opa Oma' oder 'Grundsteuer Erklaerung 2024'"
|
|
),
|
|
},
|
|
"top_k": {
|
|
"type": "integer",
|
|
"description": "Anzahl Ergebnisse (5-25, Standard 10)",
|
|
"default": 10,
|
|
},
|
|
},
|
|
"required": ["query"],
|
|
},
|
|
},
|
|
},
|
|
]
|
|
|
|
SYSTEM_PROMPT_EXTRA = """RAG DOKUMENTENSUCHE — PFLICHT-REGELN:
|
|
Du hast Zugriff auf eine private Wissensbasis mit >21.000 Dokumenten (Vertraege, Versicherungen, Rente, Finanzamt, Familiendokumente, Anleitungen, Buecher, persoenliche Unterlagen, Arbeitsvertraege, Kindergeld, Reisepass, Personalausweis, KFZ, Mietvertraege, Bausparvertraege, Rechnungen).
|
|
|
|
WANN rag_search AUFRUFEN — IMMER bei diesen Fragen:
|
|
- "habe ich..." / "gibt es..." / "wo ist..." / "finde..." / "zeig mir..." + Dokument/Vertrag/Versicherung/Bescheid
|
|
- Jede Frage nach persoenlichen Unterlagen, Vertraegen, Versicherungen, Rechnungen, Bescheiden
|
|
- AUCH wenn du glaubst die Antwort zu kennen — das Gedaechtnis ist NICHT die Wissensbasis!
|
|
- AUCH wenn das Thema im Gedaechtnis steht — trotzdem rag_search aufrufen fuer vollstaendige Antwort
|
|
|
|
WANN NICHT: Nur bei reinen Homelab/IT-Fragen, Smalltalk, oder wenn der User explizit NICHT nach Dokumenten fragt.
|
|
|
|
SUCHANFRAGE: Kurze Keywords, KEINE ganzen Saetze. Beispiele:
|
|
- "Familienbuch" / "Grundsteuer Erklaerung" / "Haftpflicht" / "Kindergeld" / "Mietvertrag" / "Arbeitsvertrag" / "Reisepass"
|
|
|
|
ERGEBNISSE AUSWERTEN:
|
|
- Bei breiten Fragen ("welche Versicherungen", Jahreskosten, Listen): top_k=15-25, ALLE Treffer aus der Tool-Antwort abarbeiten
|
|
- Liste die gefundenen Dokumente mit Ordner und kurzem Inhalt auf
|
|
- ERFINDE KEINE Details die nicht im Ergebnis stehen
|
|
- Der Ordnerpfad (vor dem Dateinamen, getrennt durch __) zeigt die Kategorie
|
|
- Wenn rag_search Treffer liefert: IMMER auflisten, auch wenn Inhalt unvollstaendig
|
|
- Mehrere Treffer zur gleichen Versicherung/Gesellschaft: jede Sparte/Dokumentart separat nennen (Kfz, Rechtsschutz, Haftpflicht, Sach, Ausland, Kranken), mit Dateiname/Ordner
|
|
- Antworte NIEMALS "keine gefunden" oder "nicht gespeichert" OHNE vorher rag_search aufgerufen zu haben"""
|
|
|
|
|
|
def _basic_auth_header() -> str:
|
|
token = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode()
|
|
return f"Basic {token}"
|
|
|
|
|
|
def _ollama_embed(text: str) -> list | None:
|
|
body = json.dumps({"model": EMBED_MODEL, "prompt": text}).encode()
|
|
req = urllib.request.Request(
|
|
OLLAMA_EMBED_URL,
|
|
data=body,
|
|
method="POST",
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=120) as resp:
|
|
data = json.load(resp)
|
|
emb = data.get("embedding")
|
|
if not emb:
|
|
return None
|
|
if len(emb) != 768:
|
|
log.warning("Unexpected embedding dimension %s", len(emb))
|
|
return emb
|
|
except Exception as e:
|
|
log.error("Ollama embed error: %s", e)
|
|
return None
|
|
|
|
|
|
def _ocr_note(text: str) -> str:
|
|
if not text or len(text) < 40:
|
|
return ""
|
|
non_alnum = sum(1 for c in text if not c.isalnum() and not c.isspace())
|
|
ratio = non_alnum / max(len(text), 1)
|
|
words = re.findall(r"\w+", text, re.UNICODE)
|
|
avg_len = (sum(len(w) for w in words) / len(words)) if words else 0.0
|
|
if ratio > 0.15 or avg_len < 2.0:
|
|
return " [OCR vermutlich schlecht]"
|
|
return ""
|
|
|
|
|
|
def _folder_from_docname(name: str) -> str:
|
|
"""Extrahiert den Ordnerpfad aus docnm_kwd (__ = Trenner)."""
|
|
parts = name.rsplit("__", 1)
|
|
if len(parts) == 2:
|
|
return parts[0].replace("__", " > ").replace("_", " ")
|
|
return ""
|
|
|
|
|
|
def _dedup_key(name: str) -> str:
|
|
"""Normalisiert Dokumentnamen fuer Deduplizierung.
|
|
|
|
Extrahiert nur den Dateinamen (nach letztem __), ignoriert
|
|
Dateiendung und Kopie-Marker wie (1), (2).
|
|
'Ordner__Foo(1).pdf' und 'Anderer__Foo.txt' werden als gleich behandelt.
|
|
"""
|
|
fname = name.rsplit("__", 1)[-1] if "__" in name else name
|
|
key = re.sub(r"\.(pdf|txt|docx?|xlsx?|csv|png|jpg|jpeg)$", "", fname, flags=re.IGNORECASE)
|
|
key = re.sub(r"\s*\(\d+\)\s*$", "", key).rstrip()
|
|
return key.lower()
|
|
|
|
|
|
def _dedup_key_full_doc(name: str) -> str:
|
|
"""Ein Chunk pro vollem docnm_kwd — gleicher Dateiname in verschiedenen Ordnern bleibt getrennt."""
|
|
return re.sub(r"\s+", " ", (name or "").strip().lower())
|
|
|
|
|
|
def _es_hybrid_search(query: str, es_size: int) -> dict:
|
|
qvec = _ollama_embed(query)
|
|
if not qvec:
|
|
return {"_error": "Embedding fehlgeschlagen (Ollama nicht erreichbar?)."}
|
|
|
|
es_size = min(ES_SIZE_CAP, max(es_size, 20))
|
|
kb_filter = {"term": {"kb_id": KB_ID}}
|
|
body = {
|
|
"size": es_size,
|
|
"knn": {
|
|
"field": "q_768_vec",
|
|
"query_vector": qvec,
|
|
"k": es_size,
|
|
"num_candidates": min(500, max(es_size * 5, 150)),
|
|
"filter": [kb_filter],
|
|
},
|
|
"query": {
|
|
"bool": {
|
|
"filter": [kb_filter],
|
|
"should": [
|
|
{"match": {"content_de": {"query": query, "boost": 2.0}}},
|
|
{"match": {"content_ltks": {"query": query.lower(), "boost": 0.4}}},
|
|
{"match": {"docnm_kwd": {"query": query, "boost": 3.0}}},
|
|
],
|
|
"minimum_should_match": 0,
|
|
}
|
|
},
|
|
}
|
|
url = f"{ES_BASE}/{ES_INDEX}/_search"
|
|
req = urllib.request.Request(
|
|
url,
|
|
data=json.dumps(body).encode(),
|
|
method="POST",
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"Authorization": _basic_auth_header(),
|
|
},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=120) as resp:
|
|
return json.load(resp)
|
|
except urllib.error.HTTPError as e:
|
|
err = e.read().decode(errors="replace")[:800]
|
|
log.error("ES HTTP %s: %s", e.code, err)
|
|
return {"_error": f"ES HTTP {e.code}: {err}"}
|
|
except Exception as e:
|
|
log.error("ES search error: %s", e)
|
|
return {"_error": str(e)}
|
|
|
|
|
|
def _snippet_for_rerank(src: dict) -> str:
|
|
doc_name = src.get("docnm_kwd") or ""
|
|
raw = src.get("content_with_weight") or src.get("content_de") or ""
|
|
prefix = doc_name[:120] + "\n" if doc_name else ""
|
|
return prefix + raw[:RERANK_SNIPPET_CHARS]
|
|
|
|
|
|
def _rerank_hits(query: str, hits: list) -> tuple[list, bool]:
|
|
"""Rerankt mit Cross-Encoder, kombiniert Score mit ES-Rang (RRF)."""
|
|
if not hits or not RERANKER_URL:
|
|
return hits, False
|
|
to_score = hits[:RERANK_CANDIDATES]
|
|
docs = []
|
|
for h in to_score:
|
|
src = h.get("_source") or {}
|
|
docs.append(_snippet_for_rerank(src))
|
|
if not any((d or "").strip() for d in docs):
|
|
return hits, False
|
|
body = json.dumps({"query": query, "documents": docs}).encode()
|
|
url = f"{RERANKER_URL.rstrip('/')}/rerank"
|
|
req = urllib.request.Request(
|
|
url,
|
|
data=body,
|
|
method="POST",
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=RERANK_TIMEOUT) as resp:
|
|
data = json.load(resp)
|
|
scores = data.get("scores") or []
|
|
if len(scores) != len(to_score):
|
|
log.warning(
|
|
"rerank score count mismatch: %s vs %s",
|
|
len(scores),
|
|
len(to_score),
|
|
)
|
|
return hits, False
|
|
|
|
k = 60
|
|
rr_ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
|
|
rr_rank_map = {i: rank + 1 for rank, i in enumerate(rr_ranked)}
|
|
|
|
combined: list[tuple[float, int]] = []
|
|
for idx in range(len(to_score)):
|
|
es_rank = idx + 1
|
|
rr_rank = rr_rank_map[idx]
|
|
rrf = 1.0 / (k + es_rank) + 1.0 / (k + rr_rank)
|
|
combined.append((rrf, idx))
|
|
combined.sort(key=lambda x: x[0], reverse=True)
|
|
|
|
new_order: list = []
|
|
for rrf, idx in combined:
|
|
h = to_score[idx]
|
|
h["_rerank_score"] = float(scores[idx])
|
|
h["_rrf_score"] = float(rrf)
|
|
new_order.append(h)
|
|
rest = hits[RERANK_CANDIDATES:]
|
|
return new_order + rest, True
|
|
except Exception as e:
|
|
log.warning("rerank failed (fallback to ES): %s", e)
|
|
return hits, False
|
|
|
|
|
|
def _is_wide_recall_query(q: str) -> bool:
|
|
"""Übersichts-/Listen-/Kostenfragen: mehrfach suchen und mergen."""
|
|
ql = (q or "").lower()
|
|
if any(x in ql for x in ("welche versicherung", "alle versicherung", "versicherungen habe")):
|
|
return True
|
|
if "versicherung" in ql and any(
|
|
x in ql
|
|
for x in (
|
|
"welche",
|
|
"alle",
|
|
"liste",
|
|
"überblick",
|
|
"ueberblick",
|
|
"kosten",
|
|
"beitrag",
|
|
"jähr",
|
|
"jaehr",
|
|
"jahres",
|
|
"gesamt",
|
|
"summe",
|
|
"übersicht",
|
|
"uebersicht",
|
|
)
|
|
):
|
|
return True
|
|
costish = any(
|
|
x in ql
|
|
for x in (
|
|
"kosten",
|
|
"kostet",
|
|
"wie viel",
|
|
"wieviel",
|
|
"beitrag",
|
|
"beiträge",
|
|
"beitraege",
|
|
"eur",
|
|
"euro",
|
|
"jähr",
|
|
"jaehr",
|
|
"jahreskosten",
|
|
"prämie",
|
|
"praemie",
|
|
)
|
|
)
|
|
broad = any(
|
|
x in ql
|
|
for x in (
|
|
"liste",
|
|
"übersicht",
|
|
"uebersicht",
|
|
"alle",
|
|
"gesamt",
|
|
"summe",
|
|
"jährlich",
|
|
"jaehrlich",
|
|
)
|
|
)
|
|
return costish and broad
|
|
|
|
|
|
# Zusatzanfragen decken Sparten + Gesellschaften ab (Recall)
|
|
_WIDE_SUBQUERIES = [
|
|
"Versicherung Beitragsrechnung Jahresbeitrag",
|
|
"Wohngebäudeversicherung Gebäude Beitrag",
|
|
"Hausratversicherung Beitrag Ergo",
|
|
"Haftpflichtversicherung Beitrag GARANTA",
|
|
"Kfz Kasko Haftpflicht Beitragsrechnung",
|
|
"Rechtsschutzversicherung Beitrag",
|
|
"Lebensversicherung Beitrag",
|
|
"Krankenversicherung PKV Beitrag",
|
|
"Sachversicherung LVM Beitrag",
|
|
"LVM AutoPlus Versicherungsschein",
|
|
"Allianz Versicherung Police",
|
|
"Nürnberger Versicherung Beitrag",
|
|
"Ergo Versicherung Police",
|
|
"Unfallversicherung Berufsunfähigkeit",
|
|
"Bausparvertrag Bauspar",
|
|
"Ford Transit Nutzfahrzeug Versicherung",
|
|
"Kfz Versicherungsschein Beitrag jährlich",
|
|
]
|
|
|
|
|
|
def _merge_hits_from_queries(
|
|
queries: list[str],
|
|
es_size: int,
|
|
pool_cap: int,
|
|
*,
|
|
full_path_dedup: bool = False,
|
|
) -> tuple[list, str | None]:
|
|
"""Führt mehrere Hybrid-Suchen aus; pro Dedup-Key höchster Score."""
|
|
best: dict[str, dict] = {}
|
|
last_err: str | None = None
|
|
|
|
def dkey(dn: str) -> str:
|
|
return _dedup_key_full_doc(dn) if full_path_dedup else _dedup_key(dn)
|
|
|
|
def absorb(hits: list) -> None:
|
|
for h in hits:
|
|
src = h.get("_source") or {}
|
|
dn = src.get("docnm_kwd") or "?"
|
|
dk = dkey(dn)
|
|
sc = float(h.get("_score") or 0.0)
|
|
old = best.get(dk)
|
|
if old is None or sc > float(old.get("_score") or 0.0):
|
|
best[dk] = h
|
|
|
|
for q in queries:
|
|
q = (q or "").strip()
|
|
if not q:
|
|
continue
|
|
data = _es_hybrid_search(q, es_size)
|
|
if "_error" in data:
|
|
last_err = str(data["_error"])
|
|
log.warning("wide_recall subquery fail %s: %s", q[:40], last_err)
|
|
continue
|
|
absorb((data.get("hits") or {}).get("hits") or [])
|
|
|
|
merged = sorted(best.values(), key=lambda h: float(h.get("_score") or 0.0), reverse=True)
|
|
return merged[:pool_cap], last_err
|
|
|
|
|
|
def handle_rag_search(query: str, top_k: int = 8, **kw):
|
|
if not query or not query.strip():
|
|
return "rag_search: query fehlt."
|
|
|
|
qstrip = query.strip()
|
|
wide = _is_wide_recall_query(qstrip)
|
|
cap = MAX_TOP_K_WIDE if wide else MAX_TOP_K_NORMAL
|
|
top_k = max(MIN_TOP_K, min(int(top_k or 10), cap))
|
|
es_size = min(ES_SIZE_CAP, max(top_k * 10, 70))
|
|
|
|
if wide:
|
|
subqs = [qstrip]
|
|
for sq in _WIDE_SUBQUERIES:
|
|
if sq.lower() not in qstrip.lower():
|
|
subqs.append(sq)
|
|
pool_cap = max(top_k * 5, 120)
|
|
hits, err = _merge_hits_from_queries(
|
|
subqs[:22],
|
|
es_size,
|
|
pool_cap=pool_cap,
|
|
full_path_dedup=True,
|
|
)
|
|
if err and not hits:
|
|
return f"Fehler bei der Dokumentensuche: {err}"
|
|
header = (
|
|
f"**Breitensuche ({len(subqs[:22])} Anfragen, Dedup=voller Pfad) '{qstrip}' — "
|
|
f"{len(hits)} Kandidaten, zeige bis {top_k}:**\n"
|
|
)
|
|
snip_len = 400
|
|
else:
|
|
data = _es_hybrid_search(qstrip, es_size)
|
|
if "_error" in data:
|
|
return f"Fehler bei der Dokumentensuche: {data['_error']}"
|
|
hits = (data.get("hits") or {}).get("hits") or []
|
|
header = f"**Dokumente fuer '{qstrip}' (bis {top_k}):**\n"
|
|
snip_len = 650
|
|
|
|
if not hits:
|
|
return f"Keine Ergebnisse fuer '{qstrip}' in der Wissensbasis gefunden."
|
|
|
|
hits, reranked = _rerank_hits(qstrip, hits)
|
|
|
|
seen_docs: set[str] = set()
|
|
lines: list[str] = []
|
|
count = 0
|
|
|
|
def out_dkey(doc_name: str) -> str:
|
|
return _dedup_key_full_doc(doc_name) if wide else _dedup_key(doc_name)
|
|
|
|
for h in hits:
|
|
if count >= top_k:
|
|
break
|
|
src = h.get("_source") or {}
|
|
doc_name = src.get("docnm_kwd") or "?"
|
|
dk = out_dkey(doc_name)
|
|
if dk in seen_docs:
|
|
continue
|
|
seen_docs.add(dk)
|
|
|
|
if "_rrf_score" in h:
|
|
score = float(h["_rrf_score"])
|
|
score_label = "RRF"
|
|
elif "_rerank_score" in h:
|
|
score = float(h["_rerank_score"])
|
|
score_label = "Rerank"
|
|
else:
|
|
score = float(h.get("_score") or 0.0)
|
|
score_label = "ES"
|
|
raw = src.get("content_with_weight") or src.get("content_de") or ""
|
|
content = raw[:snip_len].strip()
|
|
ocr = _ocr_note(raw)
|
|
folder = _folder_from_docname(doc_name)
|
|
filename = doc_name.rsplit("__", 1)[-1] if "__" in doc_name else doc_name
|
|
folder_line = f" Ordner: {folder}" if folder else ""
|
|
|
|
lines.append(
|
|
f"---\n**{count + 1}. {filename}** ({score_label}: {score:.3f}){ocr}"
|
|
)
|
|
if folder_line:
|
|
lines.append(folder_line)
|
|
if content:
|
|
lines.append(f"```\n{content}\n```")
|
|
count += 1
|
|
|
|
if count == 0:
|
|
return f"Keine Dokumente fuer '{qstrip}' gefunden."
|
|
|
|
hdr = header.rstrip() + (
|
|
" _(Cross-Encoder reranked)_" if reranked else ""
|
|
) + "\n"
|
|
lines.insert(0, hdr)
|
|
tail = (
|
|
"\n---\n(Ende der Ergebnisse. Nur diese Dokumente in dieser Runde. "
|
|
+ (
|
|
"Bei Summen/Zahlen: alle Treffer prüfen; OCR kann unvollständig sein."
|
|
if wide
|
|
else ""
|
|
)
|
|
+ ")"
|
|
)
|
|
lines.append(tail)
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
HANDLERS = {
|
|
"rag_search": handle_rag_search,
|
|
}
|