diff --git a/hub/src/scheduler.py b/hub/src/scheduler.py
index 743d147..e191a03 100644
--- a/hub/src/scheduler.py
+++ b/hub/src/scheduler.py
@@ -6,6 +6,7 @@ import threading
import requests
import schedule
from datetime import datetime, timedelta
+from typing import Optional
from db import (init_db, get_conn, log, source_health_update,
source_health_ist_pausiert, source_health_reset_daily,
source_health_get_all, scan_result_save)
@@ -17,6 +18,8 @@ _vision_client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=os.environ.get("OPENROUTER_API_KEY")
)
+OLLAMA_VISION_URL = "http://100.84.255.83:11434"
+
# ── Telegram ──────────────────────────────────────────────────────────────────
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
@@ -194,6 +197,129 @@ def klassifiziere_screenshot(screenshot_b64: str) -> str:
return "Unbekannt"
+
+def vision_preis_lokal(screenshot_b64: str) -> float | None:
+ """Vision-KI (gpt-4o-mini via OpenRouter) liest guenstigsten Roundtrip-Preis aus Screenshot.
+ Frueher lokal (qwen3-vl:32b), jetzt Cloud — GPU frei fuer Coding-Agent."""
+ if not screenshot_b64:
+ return None
+ try:
+ prompt = (
+ "Look at this flight search screenshot. "
+ "I need the cheapest ROUNDTRIP (Hin- und Rueckflug) price in EUR from the search results. "
+ "IMPORTANT: Ignore one-way (Hinflug only) prices. Ignore sidebar filters. Ignore ads. "
+ "If the page shows roundtrip results: answer with the cheapest roundtrip price as a number only, e.g.: 872 "
+ "If the page shows only one-way results or no roundtrip prices: answer 0"
+ )
+ response = _vision_client.chat.completions.create(
+ model="openai/gpt-4o-mini",
+ max_tokens=30,
+ temperature=0,
+ messages=[{
+ "role": "user",
+ "content": [
+ {"type": "text", "text": prompt},
+ {"type": "image_url", "image_url": {
+ "url": f"data:image/jpeg;base64,{screenshot_b64}"
+ }}
+ ]
+ }]
+ )
+ txt = response.choices[0].message.content.strip()
+ m = re.search(r'\d{3,5}', txt)
+ if m:
+ v = float(m.group(0))
+ if 600 <= v <= 2500:
+ log(f"Vision-Preis: {v:.0f}\u20ac erkannt (OpenRouter)")
+ return v
+ if v == 0:
+ return None
+ except Exception as e:
+ log(f"Vision-Preis Fehler: {e}", "WARN")
+ return None
+
+def vision_verifiziere_preise(screenshot_b64: str, screenshot_id: int) -> Optional[float]:
+ """Verifiziert gespeicherte Preise via lokaler Vision-KI (qwen3-vl:32b).
+
+ Ablauf:
+ - KI liest guenstigsten Preis aus Screenshot
+ - Stimmt mit gespeichertem Preis ueberein (<=20% Abweichung): ki_verified=1 setzen
+ - Weicht >20% ab (Sidebar-Artefakt): original plausibel=0 UND neuer Eintrag mit
+ scanner='[scanner]_ki' und ki-Preis angelegt → erscheint als eigener Eintrag im UI
+ """
+ if not screenshot_b64 or not screenshot_id:
+ return None
+ ki_preis = vision_preis_lokal(screenshot_b64)
+ if ki_preis is None:
+ return None
+ try:
+ from datetime import datetime as _dt
+ conn = get_conn()
+ # Alle Preise dieses Screenshots holen (mit Metadaten fuer neuen Eintrag)
+ rows = conn.execute("""
+ SELECT id, preis, scanner, node, job_id, waehrung, airline, abflug, ankunft,
+ von, nach, booking_url, kabine_erkannt, preis_korrigiert, korrektur_grund
+ FROM prices WHERE screenshot_id=?
+ """, (screenshot_id,)).fetchall()
+
+ if not rows:
+ conn.close()
+ log(f"Vision-Check screenshot {screenshot_id}: keine Preiszeilen zum Abgleich")
+ return None
+
+ now_str = _dt.now().isoformat()
+ korrigiert = 0
+ bestaetigt = 0
+
+ for row in rows:
+ (price_id, raw_preis, scanner, node, job_id, waehrung, airline,
+ abflug, ankunft, von, nach, booking_url, kabine, preis_korr, korr_grund) = row
+
+ diff_pct = abs(raw_preis - ki_preis) / ki_preis if ki_preis > 0 else 1
+
+ if diff_pct > 0.15:
+ # Original als Artefakt markieren
+ conn.execute("""
+ UPDATE prices SET ki_preis_visual=?, ki_verified=0, ki_verified_at=NULL,
+ plausibel=0,
+ plausi_grund='Vision-KI: ' || ROUND(?) || 'EUR vs scraped ' || ROUND(?) || 'EUR (Artefakt — nicht verifiziert)'
+ WHERE id=?
+ """, (ki_preis, ki_preis, raw_preis, price_id))
+
+ # Neuer verifizierter Eintrag (scanner + '_ki')
+ conn.execute("""
+ INSERT OR IGNORE INTO prices
+ (job_id, scanner, node, preis, waehrung, airline, abflug, ankunft,
+ von, nach, booking_url, screenshot_id, kabine_erkannt,
+ plausibel, plausi_grund, preis_korrigiert, korrektur_grund,
+ ki_preis_visual, ki_verified, ki_verified_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1,
+ 'Vision-KI verifiziert', ?, ?, ?, 1, ?)
+ """, (
+ job_id, scanner + '_ki', node, ki_preis, waehrung or 'EUR',
+ airline, abflug, ankunft, von, nach, booking_url, screenshot_id,
+ kabine, preis_korr, korr_grund, ki_preis, now_str,
+ ))
+ conn.commit()
+ log(f"Vision-Check: {raw_preis:.0f}€ → Artefakt; neuer Eintrag {ki_preis:.0f}€ ({scanner}_ki)")
+ korrigiert += 1
+ else:
+ # Bestaetigt: nur Felder updaten, kein neuer Eintrag noetig
+ conn.execute("""
+ UPDATE prices SET ki_preis_visual=?, ki_verified=1, ki_verified_at=?
+ WHERE id=?
+ """, (ki_preis, now_str, price_id))
+ conn.commit()
+ bestaetigt += 1
+
+ conn.close()
+ log(f"Vision-Check screenshot {screenshot_id}: {bestaetigt} bestaetigt, {korrigiert} korrigiert (KI: {ki_preis:.0f}€)")
+ return ki_preis
+ except Exception as e:
+ log(f"Vision-Verifizierung Fehler: {e}", "WARN")
+ return None
+
+
# ── Cleanup ───────────────────────────────────────────────────────────────────
def cleanup_alte_screenshots(tage=30):
"""Löscht Screenshots die älter als `tage` Tage sind."""
@@ -375,9 +501,29 @@ def dispatch_job(node, job, tage_override=None):
log(f"👁 KI-Fallback: {dropped} Preise verworfen (außerhalb {KI_FALLBACK_MIN}-{KI_FALLBACK_MAX}€ — vermutlich One-Way)")
try:
- pruefe_preis_alert(results, job)
- pruefe_preisanstieg(results, job)
speichere_preise(results, node["name"], job, screenshot_id, kabine_erkannt)
+ ki_vis = vision_verifiziere_preise(screenshot_b64, screenshot_id)
+ # Telegram erst NACH Vision-Abgleich — Zahl = was die KI im Screenshot liest
+ if screenshot_id and screenshot_b64:
+ alert_rows = _alert_results_aus_db(screenshot_id, job_id)
+ if alert_rows and _vision_hat_preise_verifiziert(screenshot_id):
+ pruefe_preis_alert(alert_rows, job)
+ pruefe_preisanstieg(alert_rows, job)
+ elif alert_rows and ki_vis is None:
+ log(
+ f"📵 Preis-Alert übersprungen — Vision lieferte keinen Preis "
+ f"({node['name']}/{job['scanner']})",
+ "WARN",
+ )
+ elif alert_rows and not _vision_hat_preise_verifiziert(screenshot_id):
+ log(
+ f"📵 Preis-Alert übersprungen — keine ki_verified-Zeilen "
+ f"(Screenshot {screenshot_id})",
+ "WARN",
+ )
+ else:
+ pruefe_preis_alert(results, job)
+ pruefe_preisanstieg(results, job)
except Exception as e:
log(f"Speicher-Fehler {node['name']}/{job['scanner']}: {e}", "ERROR")
return True
@@ -408,6 +554,49 @@ def speichere_screenshot(screenshot_b64, node_name, job):
return None
+
+def _vision_hat_preise_verifiziert(screenshot_id: int) -> bool:
+ """Mind. eine Zeile dieses Screenshots wurde mit Vision abgeglichen (ki_verified=1)."""
+ conn = get_conn()
+ row = conn.execute(
+ "SELECT COUNT(*) AS c FROM prices WHERE screenshot_id=? AND ki_verified=1",
+ (screenshot_id,),
+ ).fetchone()
+ conn.close()
+ return bool(row and row["c"] and row["c"] > 0)
+
+
+def _alert_results_aus_db(screenshot_id: int, job_id: int) -> list:
+ """Preise fuer Telegram-Alert nach Vision: nur plausibel (NULL/1), mit Korrigierung."""
+ conn = get_conn()
+ rows = conn.execute(
+ """
+ SELECT preis, preis_korrigiert, abflug, ankunft, booking_url, scanner
+ FROM prices
+ WHERE screenshot_id = ? AND job_id = ?
+ AND (plausibel IS NULL OR plausibel = 1)
+ ORDER BY COALESCE(preis_korrigiert, preis) ASC
+ """,
+ (screenshot_id, job_id),
+ ).fetchall()
+ conn.close()
+ out = []
+ for r in rows:
+ raw = float(r["preis"] or 0)
+ pk = r["preis_korrigiert"]
+ eff = float(pk) if pk is not None else raw
+ if eff <= 0:
+ continue
+ out.append({
+ "preis": eff,
+ "abflug": r["abflug"] or "",
+ "ankunft": r["ankunft"] or "",
+ "booking_url": r["booking_url"] or "",
+ "scanner": r["scanner"] or "",
+ })
+ return out
+
+
ALERT_SCHWELLE_EUR = 900 # Telegram-Alert wenn CX unter diesen Preis fällt
def pruefe_preis_alert(results, job):
@@ -427,6 +616,23 @@ def pruefe_preis_alert(results, job):
f"⚠️ Sofort auf Buchungsseite prüfen — Preise ändern sich schnell."
)
log(f"💰 PREIS-ALERT: {preis:.0f}EUR {scanner} — Telegram gesendet")
+ # memory_service_event
+ try:
+ import requests as _rq, json as _json
+ _rq.post("http://100.121.192.94:8400/events", json={
+ "source": "flugscanner",
+ "event_type": "price_alert",
+ "object_key": f"{scanner}_{abflug}_{preis:.0f}",
+ "payload_json": _json.dumps({
+ "scanner": scanner,
+ "preis_eur": preis,
+ "abflug": abflug,
+ "booking_url": url,
+ "schwelle": ALERT_SCHWELLE_EUR,
+ }, ensure_ascii=False),
+ }, headers={"Authorization": "Bearer Ai8eeQibV6Z1RWc7oNPim4PXB4vILU1nRW2-XgRcX2M"}, timeout=3)
+ except Exception:
+ pass
break
@@ -495,7 +701,7 @@ def speichere_preise(results, node_name, job, screenshot_id=None, kabine_erkannt
continue
conn.execute("""
- INSERT INTO prices
+ INSERT OR IGNORE INTO prices
(job_id, scanner, node, preis, waehrung, airline, abflug, ankunft,
von, nach, booking_url, screenshot_id, kabine_erkannt,
plausibel, plausi_grund, preis_korrigiert, korrektur_grund)
diff --git a/hub/src/web.py b/hub/src/web.py
index d02e8ff..77c9614 100644
--- a/hub/src/web.py
+++ b/hub/src/web.py
@@ -338,14 +338,28 @@ async function ladeUebersicht() {
const buchBtn = p.booking_url
? `Öffnen ↗`
: '—';
+ // KI-Verifizierung: entweder _ki-Eintrag (korrigiert) oder ki_verified=1 (bestaetigt)
+ const isKiKorrigiert = p.scanner && p.scanner.endsWith('_ki');
+ const isKiBestaetigt = !isKiKorrigiert && p.ki_verified == 1;
+ const isKiVerified = isKiKorrigiert || isKiBestaetigt;
+ const scannerBase = isKiKorrigiert ? p.scanner.slice(0, -3) : p.scanner;
+ const kiLabel = isKiKorrigiert
+ ? `
👁 KI-korrigiert`
+ : isKiBestaetigt
+ ? `
👁 KI ✓`
+ : '';
const scannerLabel = isMulticity
? `🇭🇰 HKG Stopover
+~${HOTEL_HKG}€ Hotel`
- : p.scanner;
+ : (scannerBase + kiLabel);
const verdaechtig = (ps === 0);
- const preisFarbe = verdaechtig ? '#ef4444' : (isMulticity ? '#a78bfa' : '#34d399');
+ const preisFarbe = verdaechtig ? '#ef4444' : (isMulticity ? '#a78bfa' : isKiKorrigiert ? '#60a5fa' : '#34d399');
+ // Bei bestaetigten Eintraegen: ki_preis_visual anzeigen wenn vorhanden und abweichend
+ const kiPreisHinweis = (isKiBestaetigt && p.ki_preis_visual && Math.abs(p.ki_preis_visual - p.preis) > 5)
+ ? `
KI sieht: ${p.ki_preis_visual}€`
+ : '';
const gesamtHtml = isMulticity
? `${p.preis} €
∑ ~${Math.round(p.preis)+HOTEL_HKG} € inkl. Hotel`
- : `${p.preis} €`;
+ : `${p.preis} €${kiPreisHinweis}`;
const ssBtn = p.screenshot_id
? `