From b091649e6a4f611269ee828dd51c8a01157c442f Mon Sep 17 00:00:00 2001 From: Orbitalo Date: Sat, 11 Apr 2026 05:56:01 +0000 Subject: [PATCH] fix: vision_preis_lokal von Ollama (qwen3-vl:32b) auf OpenRouter (gpt-4o-mini) umgestellt GPU auf KI-Server wird fuer Coding-Agent (Qwen3-Coder-Next) benoetigt. Roundtrip-Erkennung und Sidebar-Artefakt-Filter bleiben funktional. Inkl. bisher uncommittete Aenderungen (Vision-Pipeline, Flex-Scans, Worker-Updates). Ref: Issue #75 Phase 1 --- hub/src/scheduler.py | 212 ++++++++++++++++++++++++++++++++++++++++++- hub/src/web.py | 20 +++- node/src/agent.py | 2 +- node/src/worker.py | 170 +++++++++++++++++++++++++++++----- 4 files changed, 376 insertions(+), 28 deletions(-) diff --git a/hub/src/scheduler.py b/hub/src/scheduler.py index 743d147..e191a03 100644 --- a/hub/src/scheduler.py +++ b/hub/src/scheduler.py @@ -6,6 +6,7 @@ import threading import requests import schedule from datetime import datetime, timedelta +from typing import Optional from db import (init_db, get_conn, log, source_health_update, source_health_ist_pausiert, source_health_reset_daily, source_health_get_all, scan_result_save) @@ -17,6 +18,8 @@ _vision_client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=os.environ.get("OPENROUTER_API_KEY") ) +OLLAMA_VISION_URL = "http://100.84.255.83:11434" + # ── Telegram ────────────────────────────────────────────────────────────────── TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") @@ -194,6 +197,129 @@ def klassifiziere_screenshot(screenshot_b64: str) -> str: return "Unbekannt" + +def vision_preis_lokal(screenshot_b64: str) -> float | None: + """Vision-KI (gpt-4o-mini via OpenRouter) liest guenstigsten Roundtrip-Preis aus Screenshot. + Frueher lokal (qwen3-vl:32b), jetzt Cloud — GPU frei fuer Coding-Agent.""" + if not screenshot_b64: + return None + try: + prompt = ( + "Look at this flight search screenshot. " + "I need the cheapest ROUNDTRIP (Hin- und Rueckflug) price in EUR from the search results. " + "IMPORTANT: Ignore one-way (Hinflug only) prices. Ignore sidebar filters. Ignore ads. " + "If the page shows roundtrip results: answer with the cheapest roundtrip price as a number only, e.g.: 872 " + "If the page shows only one-way results or no roundtrip prices: answer 0" + ) + response = _vision_client.chat.completions.create( + model="openai/gpt-4o-mini", + max_tokens=30, + temperature=0, + messages=[{ + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + {"type": "image_url", "image_url": { + "url": f"data:image/jpeg;base64,{screenshot_b64}" + }} + ] + }] + ) + txt = response.choices[0].message.content.strip() + m = re.search(r'\d{3,5}', txt) + if m: + v = float(m.group(0)) + if 600 <= v <= 2500: + log(f"Vision-Preis: {v:.0f}\u20ac erkannt (OpenRouter)") + return v + if v == 0: + return None + except Exception as e: + log(f"Vision-Preis Fehler: {e}", "WARN") + return None + +def vision_verifiziere_preise(screenshot_b64: str, screenshot_id: int) -> Optional[float]: + """Verifiziert gespeicherte Preise via lokaler Vision-KI (qwen3-vl:32b). + + Ablauf: + - KI liest guenstigsten Preis aus Screenshot + - Stimmt mit gespeichertem Preis ueberein (<=20% Abweichung): ki_verified=1 setzen + - Weicht >20% ab (Sidebar-Artefakt): original plausibel=0 UND neuer Eintrag mit + scanner='[scanner]_ki' und ki-Preis angelegt → erscheint als eigener Eintrag im UI + """ + if not screenshot_b64 or not screenshot_id: + return None + ki_preis = vision_preis_lokal(screenshot_b64) + if ki_preis is None: + return None + try: + from datetime import datetime as _dt + conn = get_conn() + # Alle Preise dieses Screenshots holen (mit Metadaten fuer neuen Eintrag) + rows = conn.execute(""" + SELECT id, preis, scanner, node, job_id, waehrung, airline, abflug, ankunft, + von, nach, booking_url, kabine_erkannt, preis_korrigiert, korrektur_grund + FROM prices WHERE screenshot_id=? + """, (screenshot_id,)).fetchall() + + if not rows: + conn.close() + log(f"Vision-Check screenshot {screenshot_id}: keine Preiszeilen zum Abgleich") + return None + + now_str = _dt.now().isoformat() + korrigiert = 0 + bestaetigt = 0 + + for row in rows: + (price_id, raw_preis, scanner, node, job_id, waehrung, airline, + abflug, ankunft, von, nach, booking_url, kabine, preis_korr, korr_grund) = row + + diff_pct = abs(raw_preis - ki_preis) / ki_preis if ki_preis > 0 else 1 + + if diff_pct > 0.15: + # Original als Artefakt markieren + conn.execute(""" + UPDATE prices SET ki_preis_visual=?, ki_verified=0, ki_verified_at=NULL, + plausibel=0, + plausi_grund='Vision-KI: ' || ROUND(?) || 'EUR vs scraped ' || ROUND(?) || 'EUR (Artefakt — nicht verifiziert)' + WHERE id=? + """, (ki_preis, ki_preis, raw_preis, price_id)) + + # Neuer verifizierter Eintrag (scanner + '_ki') + conn.execute(""" + INSERT OR IGNORE INTO prices + (job_id, scanner, node, preis, waehrung, airline, abflug, ankunft, + von, nach, booking_url, screenshot_id, kabine_erkannt, + plausibel, plausi_grund, preis_korrigiert, korrektur_grund, + ki_preis_visual, ki_verified, ki_verified_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, + 'Vision-KI verifiziert', ?, ?, ?, 1, ?) + """, ( + job_id, scanner + '_ki', node, ki_preis, waehrung or 'EUR', + airline, abflug, ankunft, von, nach, booking_url, screenshot_id, + kabine, preis_korr, korr_grund, ki_preis, now_str, + )) + conn.commit() + log(f"Vision-Check: {raw_preis:.0f}€ → Artefakt; neuer Eintrag {ki_preis:.0f}€ ({scanner}_ki)") + korrigiert += 1 + else: + # Bestaetigt: nur Felder updaten, kein neuer Eintrag noetig + conn.execute(""" + UPDATE prices SET ki_preis_visual=?, ki_verified=1, ki_verified_at=? + WHERE id=? + """, (ki_preis, now_str, price_id)) + conn.commit() + bestaetigt += 1 + + conn.close() + log(f"Vision-Check screenshot {screenshot_id}: {bestaetigt} bestaetigt, {korrigiert} korrigiert (KI: {ki_preis:.0f}€)") + return ki_preis + except Exception as e: + log(f"Vision-Verifizierung Fehler: {e}", "WARN") + return None + + # ── Cleanup ─────────────────────────────────────────────────────────────────── def cleanup_alte_screenshots(tage=30): """Löscht Screenshots die älter als `tage` Tage sind.""" @@ -375,9 +501,29 @@ def dispatch_job(node, job, tage_override=None): log(f"👁 KI-Fallback: {dropped} Preise verworfen (außerhalb {KI_FALLBACK_MIN}-{KI_FALLBACK_MAX}€ — vermutlich One-Way)") try: - pruefe_preis_alert(results, job) - pruefe_preisanstieg(results, job) speichere_preise(results, node["name"], job, screenshot_id, kabine_erkannt) + ki_vis = vision_verifiziere_preise(screenshot_b64, screenshot_id) + # Telegram erst NACH Vision-Abgleich — Zahl = was die KI im Screenshot liest + if screenshot_id and screenshot_b64: + alert_rows = _alert_results_aus_db(screenshot_id, job_id) + if alert_rows and _vision_hat_preise_verifiziert(screenshot_id): + pruefe_preis_alert(alert_rows, job) + pruefe_preisanstieg(alert_rows, job) + elif alert_rows and ki_vis is None: + log( + f"📵 Preis-Alert übersprungen — Vision lieferte keinen Preis " + f"({node['name']}/{job['scanner']})", + "WARN", + ) + elif alert_rows and not _vision_hat_preise_verifiziert(screenshot_id): + log( + f"📵 Preis-Alert übersprungen — keine ki_verified-Zeilen " + f"(Screenshot {screenshot_id})", + "WARN", + ) + else: + pruefe_preis_alert(results, job) + pruefe_preisanstieg(results, job) except Exception as e: log(f"Speicher-Fehler {node['name']}/{job['scanner']}: {e}", "ERROR") return True @@ -408,6 +554,49 @@ def speichere_screenshot(screenshot_b64, node_name, job): return None + +def _vision_hat_preise_verifiziert(screenshot_id: int) -> bool: + """Mind. eine Zeile dieses Screenshots wurde mit Vision abgeglichen (ki_verified=1).""" + conn = get_conn() + row = conn.execute( + "SELECT COUNT(*) AS c FROM prices WHERE screenshot_id=? AND ki_verified=1", + (screenshot_id,), + ).fetchone() + conn.close() + return bool(row and row["c"] and row["c"] > 0) + + +def _alert_results_aus_db(screenshot_id: int, job_id: int) -> list: + """Preise fuer Telegram-Alert nach Vision: nur plausibel (NULL/1), mit Korrigierung.""" + conn = get_conn() + rows = conn.execute( + """ + SELECT preis, preis_korrigiert, abflug, ankunft, booking_url, scanner + FROM prices + WHERE screenshot_id = ? AND job_id = ? + AND (plausibel IS NULL OR plausibel = 1) + ORDER BY COALESCE(preis_korrigiert, preis) ASC + """, + (screenshot_id, job_id), + ).fetchall() + conn.close() + out = [] + for r in rows: + raw = float(r["preis"] or 0) + pk = r["preis_korrigiert"] + eff = float(pk) if pk is not None else raw + if eff <= 0: + continue + out.append({ + "preis": eff, + "abflug": r["abflug"] or "", + "ankunft": r["ankunft"] or "", + "booking_url": r["booking_url"] or "", + "scanner": r["scanner"] or "", + }) + return out + + ALERT_SCHWELLE_EUR = 900 # Telegram-Alert wenn CX unter diesen Preis fällt def pruefe_preis_alert(results, job): @@ -427,6 +616,23 @@ def pruefe_preis_alert(results, job): f"⚠️ Sofort auf Buchungsseite prüfen — Preise ändern sich schnell." ) log(f"💰 PREIS-ALERT: {preis:.0f}EUR {scanner} — Telegram gesendet") + # memory_service_event + try: + import requests as _rq, json as _json + _rq.post("http://100.121.192.94:8400/events", json={ + "source": "flugscanner", + "event_type": "price_alert", + "object_key": f"{scanner}_{abflug}_{preis:.0f}", + "payload_json": _json.dumps({ + "scanner": scanner, + "preis_eur": preis, + "abflug": abflug, + "booking_url": url, + "schwelle": ALERT_SCHWELLE_EUR, + }, ensure_ascii=False), + }, headers={"Authorization": "Bearer Ai8eeQibV6Z1RWc7oNPim4PXB4vILU1nRW2-XgRcX2M"}, timeout=3) + except Exception: + pass break @@ -495,7 +701,7 @@ def speichere_preise(results, node_name, job, screenshot_id=None, kabine_erkannt continue conn.execute(""" - INSERT INTO prices + INSERT OR IGNORE INTO prices (job_id, scanner, node, preis, waehrung, airline, abflug, ankunft, von, nach, booking_url, screenshot_id, kabine_erkannt, plausibel, plausi_grund, preis_korrigiert, korrektur_grund) diff --git a/hub/src/web.py b/hub/src/web.py index d02e8ff..77c9614 100644 --- a/hub/src/web.py +++ b/hub/src/web.py @@ -338,14 +338,28 @@ async function ladeUebersicht() { const buchBtn = p.booking_url ? `Öffnen ↗` : '—'; + // KI-Verifizierung: entweder _ki-Eintrag (korrigiert) oder ki_verified=1 (bestaetigt) + const isKiKorrigiert = p.scanner && p.scanner.endsWith('_ki'); + const isKiBestaetigt = !isKiKorrigiert && p.ki_verified == 1; + const isKiVerified = isKiKorrigiert || isKiBestaetigt; + const scannerBase = isKiKorrigiert ? p.scanner.slice(0, -3) : p.scanner; + const kiLabel = isKiKorrigiert + ? `
👁 KI-korrigiert` + : isKiBestaetigt + ? `
👁 KI ✓` + : ''; const scannerLabel = isMulticity ? `🇭🇰 HKG Stopover
+~${HOTEL_HKG}€ Hotel` - : p.scanner; + : (scannerBase + kiLabel); const verdaechtig = (ps === 0); - const preisFarbe = verdaechtig ? '#ef4444' : (isMulticity ? '#a78bfa' : '#34d399'); + const preisFarbe = verdaechtig ? '#ef4444' : (isMulticity ? '#a78bfa' : isKiKorrigiert ? '#60a5fa' : '#34d399'); + // Bei bestaetigten Eintraegen: ki_preis_visual anzeigen wenn vorhanden und abweichend + const kiPreisHinweis = (isKiBestaetigt && p.ki_preis_visual && Math.abs(p.ki_preis_visual - p.preis) > 5) + ? `
KI sieht: ${p.ki_preis_visual}€` + : ''; const gesamtHtml = isMulticity ? `${p.preis} €
∑ ~${Math.round(p.preis)+HOTEL_HKG} € inkl. Hotel` - : `${p.preis} €`; + : `${p.preis} €${kiPreisHinweis}`; const ssBtn = p.screenshot_id ? `