From a90a73b7cbd2e0b8528911d8fdee2737cff3fbe4 Mon Sep 17 00:00:00 2001 From: Orbitalo Date: Sun, 1 Mar 2026 03:25:01 +0000 Subject: [PATCH] fix: KI-Augen blockiert keine Scraper-Ergebnisse mehr + Economy komplett - scheduler.py: Scraper-Preise haben Vorrang vor KI-Screenshot-Analyse - worker.py: Cabin-Check dynamisch (economy statt hardcoded PE) - cathay_pacific Job deaktiviert (nicht auf Nodes implementiert) - Doppelte momondo/trip Jobs bereinigt - Economy QC-Filter 600-1400 EUR, Roundtrip 50-95 Tage - Gepaeckzuschlag Economy Light +140 EUR Roundtrip - Vision-AI Kabinen+Preis-Klassifizierung - KI-Plausi in Batches, Telegram Bot, Source-Health - Scan-Limits 3/Tag/Quelle, Geo-Skip Asia - .gitignore hinzugefuegt --- .gitignore | 23 +- hub/src/db.py | 149 +++++++++++- hub/src/ki.py | 311 +++++++++++++++--------- hub/src/scheduler.py | 556 ++++++++++++++++++++++++++++++++++--------- node/src/worker.py | 353 +++++++++++++-------------- 5 files changed, 983 insertions(+), 409 deletions(-) diff --git a/.gitignore b/.gitignore index 2282db3..d71c527 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,22 @@ -.env -*.env -.env.* +# Data +hub/data/ +node/data/ + +# Python __pycache__/ *.pyc +*.pyo + +# Backups & Temp *.bak -data/ +*.bak2 +hub/src/check*.py +hub/src/test_*.py +hub/src/final*.py +hub/src/bilanz*.py +hub/src/patch_*.py +hub/src/setup_*.py +node/src/*.bak + +# Environment +.env diff --git a/hub/src/db.py b/hub/src/db.py index 47966be..601c55b 100644 --- a/hub/src/db.py +++ b/hub/src/db.py @@ -1,12 +1,15 @@ import sqlite3 import os +from datetime import datetime, timedelta DB_PATH = os.environ.get("DB_PATH", "/data/flugscanner.db") def get_conn(): - conn = sqlite3.connect(DB_PATH) + conn = sqlite3.connect(DB_PATH, timeout=30) conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") return conn @@ -92,6 +95,8 @@ def init_db(): "ALTER TABLE prices ADD COLUMN screenshot_id INTEGER", "ALTER TABLE prices ADD COLUMN plausibel INTEGER", "ALTER TABLE prices ADD COLUMN plausi_grund TEXT DEFAULT ''", + "ALTER TABLE prices ADD COLUMN preis_korrigiert REAL", + "ALTER TABLE prices ADD COLUMN korrektur_grund TEXT DEFAULT ''", ]: try: c.execute(col_sql) @@ -140,6 +145,39 @@ def init_db(): ) """) + # Quell-Gesundheit pro Node+Scanner — Gedächtnis des Systems + c.execute(""" + CREATE TABLE IF NOT EXISTS source_health ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + node TEXT NOT NULL, + scanner TEXT NOT NULL, + status TEXT DEFAULT 'unknown', + erfolge_heute INTEGER DEFAULT 0, + fehler_heute INTEGER DEFAULT 0, + letzter_erfolg TEXT, + letzter_fehler TEXT, + fehler_typ TEXT DEFAULT '', + pausiert_bis TEXT, + updated_at TEXT DEFAULT (datetime('now')), + UNIQUE(node, scanner) + ) + """) + + # Scan-Ergebnisse: was die KI auf jedem Screenshot gesehen hat + c.execute(""" + CREATE TABLE IF NOT EXISTS scan_results ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + node TEXT NOT NULL, + scanner TEXT NOT NULL, + screenshot_id INTEGER, + ki_status TEXT NOT NULL, + ki_details TEXT DEFAULT '', + preise_gefunden INTEGER DEFAULT 0, + aktion TEXT DEFAULT '', + created_at TEXT DEFAULT (datetime('now')) + ) + """) + # Standard-Prompt PROMPT_TEXT = """Du bist ein Flugpreis-Analyst. Analysiere Preisdaten fuer folgenden Flug: @@ -206,6 +244,16 @@ HKG_STOPOVER: [Vergleich: Direktverbindung vs. FRA-HKG-KTI Multi-City — lohnt ('kayak_multicity','FRA','KTI',30,60,'multicity', 'premium_economy','1koffer+handgepaeck','', 120,300,22,2,'HKG',20,30,'daily') """) + # Cathay Pacific direkt — immer hinzufügen wenn noch nicht vorhanden + c.execute(""" + INSERT INTO jobs + (scanner, von, nach, tage, aufenthalt_tage, trip_type, kabine, gepaeck, + airline_filter, layover_min, layover_max, max_flugzeit_h, max_stops, + via, stopover_min_h, stopover_max_h, intervall) + SELECT 'cathay_pacific','FRA','KTI',90,75,'roundtrip','economy','1koffer+handgepaeck','CX',120,300,22,2,'',0,0,'daily' + WHERE NOT EXISTS (SELECT 1 FROM jobs WHERE scanner='cathay_pacific') + """) + conn.commit() conn.close() @@ -219,3 +267,102 @@ def log(message, level="INFO"): conn.commit() conn.close() print(f"[{level}] {message}") + + +def source_health_update(node: str, scanner: str, erfolg: bool, fehler_typ: str = ""): + """Aktualisiert die Gesundheit einer Quelle nach einem Scan-Versuch.""" + conn = get_conn() + now = datetime.now().isoformat() + + existing = conn.execute( + "SELECT id FROM source_health WHERE node=? AND scanner=?", + (node, scanner) + ).fetchone() + + if existing: + if erfolg: + conn.execute(""" + UPDATE source_health + SET status='healthy', erfolge_heute=erfolge_heute+1, + letzter_erfolg=?, fehler_typ='', updated_at=? + WHERE node=? AND scanner=? + """, (now, now, node, scanner)) + else: + conn.execute(""" + UPDATE source_health + SET status='unhealthy', fehler_heute=fehler_heute+1, + letzter_fehler=?, fehler_typ=?, updated_at=? + WHERE node=? AND scanner=? + """, (now, fehler_typ, now, node, scanner)) + else: + status = 'healthy' if erfolg else 'unhealthy' + conn.execute(""" + INSERT INTO source_health (node, scanner, status, erfolge_heute, fehler_heute, + letzter_erfolg, letzter_fehler, fehler_typ, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (node, scanner, status, + 1 if erfolg else 0, 0 if erfolg else 1, + now if erfolg else None, now if not erfolg else None, + fehler_typ, now)) + + conn.commit() + conn.close() + + +def source_health_pause(node: str, scanner: str, stunden: int = 24): + """Pausiert eine Quelle für N Stunden (z.B. nach CAPTCHA).""" + conn = get_conn() + bis = (datetime.now() + timedelta(hours=stunden)).isoformat() + conn.execute(""" + UPDATE source_health SET pausiert_bis=?, status='paused', updated_at=datetime('now') + WHERE node=? AND scanner=? + """, (bis, node, scanner)) + conn.commit() + conn.close() + + +def source_health_ist_pausiert(node: str, scanner: str) -> bool: + """Prüft ob eine Quelle gerade pausiert ist.""" + conn = get_conn() + row = conn.execute( + "SELECT pausiert_bis FROM source_health WHERE node=? AND scanner=?", + (node, scanner) + ).fetchone() + conn.close() + if not row or not row["pausiert_bis"]: + return False + return datetime.fromisoformat(row["pausiert_bis"]) > datetime.now() + + +def source_health_reset_daily(): + """Täglicher Reset der Tageszähler (erfolge_heute, fehler_heute).""" + conn = get_conn() + conn.execute("UPDATE source_health SET erfolge_heute=0, fehler_heute=0") + conn.commit() + conn.close() + + +def source_health_get_all() -> list: + """Gibt alle Source-Health-Einträge zurück, sortiert nach Erfolgen.""" + conn = get_conn() + rows = conn.execute(""" + SELECT node, scanner, status, erfolge_heute, fehler_heute, + letzter_erfolg, letzter_fehler, fehler_typ, pausiert_bis + FROM source_health + ORDER BY erfolge_heute DESC, fehler_heute ASC + """).fetchall() + conn.close() + return [dict(r) for r in rows] + + +def scan_result_save(node: str, scanner: str, screenshot_id: int, + ki_status: str, ki_details: str, preise: int, aktion: str): + """Speichert was die KI auf einem Screenshot gesehen hat.""" + conn = get_conn() + conn.execute(""" + INSERT INTO scan_results (node, scanner, screenshot_id, ki_status, + ki_details, preise_gefunden, aktion) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (node, scanner, screenshot_id, ki_status, ki_details, preise, aktion)) + conn.commit() + conn.close() diff --git a/hub/src/ki.py b/hub/src/ki.py index 3bbb8fa..93d2c7f 100644 --- a/hub/src/ki.py +++ b/hub/src/ki.py @@ -1,8 +1,7 @@ import os import json -import requests from openai import OpenAI -from db import get_conn, log +from db import get_conn, log, scan_result_save, source_health_update, source_health_pause client = OpenAI( base_url="https://openrouter.ai/api/v1", @@ -11,39 +10,159 @@ client = OpenAI( MODEL = os.environ.get("AI_MODEL", "openai/gpt-4o-mini") -PLAUSI_PROMPT = """Du bist ein Flugpreis-Experte. Prüfe jeden der folgenden Preise auf Plausibilität. +PLAUSI_PROMPT = """Du bist ein Flugpreis-Experte. Pruefe jeden der folgenden Preise auf Plausibilitaet. KONTEXT: -- Strecke: Roundtrip Frankfurt (FRA) → Phnom Penh/Siem Reap (KTI), ca. 2 Monate Aufenthalt -- Kabinenklasse: ECONOMY (normales Economy mit Gepäck) -- Gepäck: 1 großer Koffer + Handgepäck muss inklusive sein -- Ziel-Airlines: Cathay Pacific (CX), Singapore Airlines (SQ), Emirates (EK), Qatar Airways (QR) +- Strecke: Roundtrip Frankfurt (FRA) → Phnom Penh (KTI), ca. 2 Monate Aufenthalt, via Hong Kong (HKG) +- Kabinenklasse: ECONOMY (Cathay Pacific CX) — NICHT Economy Light, NICHT Premium Economy! +- Gepaeck: 1 Koffer + Handgepaeck inklusive +- NUR Cathay Pacific (CX) relevant -PREISREFERENZ für Economy Roundtrip FRA-KTI mit Gepäck: -- Sehr günstig: 700-900 EUR (seltene Deals, plausibel wenn bekannte Airline) -- Normal: 900-1200 EUR -- Teuer: 1200-1600 EUR -- Über 1600 EUR: möglicherweise falsche Kabine oder Business -- Unter 500 EUR: fast sicher Economy Light (ohne Gepäck) — NICHT PLAUSIBEL -- 500-700 EUR: verdächtig, wahrscheinlich ohne Gepäck +PREISREFERENZ fuer CX Economy Roundtrip FRA-KTI via HKG: +- Sehr guenstig: 700-850 EUR (Deals) +- Normal: 850-1100 EUR +- Obergrenze: 1200 EUR (darueber verdaechtig) +- UNTER 600 EUR: nicht plausibel (Economy Light, One-Way, Fehler) +- UEBER 1400 EUR: verdaechtig (andere Airline, andere Kabine) -PRÜFREGELN: -1. Preis unter 500 EUR → NICHT PLAUSIBEL (Economy Light ohne Gepäck) -2. Preis 500-700 EUR → VERDÄCHTIG (prüfen ob ohne Gepäck) -3. Preis 700-1600 EUR mit bekannter Airline → PLAUSIBEL -4. Preis über 1600 EUR → VERDÄCHTIG (möglicherweise Business oder falsche Kabine) -5. kayak_multicity (HKG Stopover): 50-150 EUR teurer als Direkt ist normal -6. Wenn ein Scanner deutlich günstiger als alle anderen: VERDÄCHTIG +PRUEFREGELN: +1. Preis unter 600 EUR → NICHT PLAUSIBEL +2. Preis 600-700 EUR → VERDAECHTIG (moeglicherweise Economy Light) +3. Preis 700-1200 EUR → PLAUSIBEL fuer CX Economy +4. Preis ueber 1200 EUR → VERDAECHTIG +5. Nur CX-Relevanz: Andere Airlines koennen ignoriert werden -PREISE ZU PRÜFEN: +PREISE ZU PRUEFEN: {preise_liste} -Antworte NUR mit gültigem JSON-Array. Für jeden Preis: -{{"id": , "plausibel": true/false, "grund": ""}}""" +Antworte NUR mit gueltigem JSON-Array. Fuer jeden Preis: +{{"id": , "plausibel": true/false, "grund": ""}} + +Beispiel: +[ + {{"id": 123, "plausibel": true, "grund": "1350 EUR fuer CX PE Roundtrip ist marktgerecht"}}, + {{"id": 124, "plausibel": false, "grund": "436 EUR ist Economy-Preis, nicht PE mit Gepaeck"}} +]""" + + +# ── Screenshot-Analyse: "Was siehst du?" ───────────────────────────────────── + +SCREENSHOT_ANALYSE_PROMPT = """Du siehst einen Screenshot einer Flugsuche-Website. + +Analysiere das Bild und antworte NUR mit gueltigem JSON: + +{{ + "status": "", + "preise": [], + "airlines": [], + "details": "" +}} + +Moegliche Status-Werte: +- "PRICES_FOUND" — Flugpreise sind sichtbar auf der Seite +- "COOKIE_BANNER" — Ein Cookie/Consent-Banner verdeckt die Ergebnisse +- "EMPTY_PAGE" — Seite geladen aber keine Ergebnisse (Skeleton, Ladeanimation, "Keine Ergebnisse") +- "CAPTCHA" — Bot-Erkennung, Captcha, oder "Bitte bestaetigen Sie"-Seite +- "ERROR_PAGE" — Fehlerseite, 404, Timeout-Meldung +- "UNKNOWN" — Kann die Seite nicht einordnen + +WICHTIG: +- Preise NUR in EUR angeben. USD-Preise mit 0.92 umrechnen. +- Nur Preise die DEUTLICH LESBAR sind. Keine gerundeten oder geschaetzten Preise. +- Bei "EMPTY_PAGE": Skelett-Platzhalter (graue Kaesten ohne Zahlen) = leer. +- Bei Preisen die sichtbar sind aber durch Banner teilweise verdeckt: "COOKIE_BANNER". +""" + + +def analyse_screenshot(screenshot_b64: str, node: str, scanner: str, + screenshot_id: int = None) -> dict: + """Analysiert einen Screenshot via Vision AI. + + Returns dict mit: + status: PRICES_FOUND|COOKIE_BANNER|EMPTY_PAGE|CAPTCHA|ERROR_PAGE|UNKNOWN + preise: Liste von Preisen (float) + airlines: Liste von Airlines + details: Kurze Beschreibung + aktion: PROCESS|RETRY_COOKIES|SKIP|PAUSE_NODE + """ + fallback = { + "status": "UNKNOWN", "preise": [], "airlines": [], + "details": "KI-Analyse fehlgeschlagen", "aktion": "SKIP" + } + + if not screenshot_b64: + return fallback + + try: + response = client.chat.completions.create( + model=MODEL, + messages=[{ + "role": "user", + "content": [ + {"type": "text", "text": SCREENSHOT_ANALYSE_PROMPT}, + {"type": "image_url", "image_url": { + "url": f"data:image/jpeg;base64,{screenshot_b64}" + }} + ] + }], + max_tokens=500, + temperature=0.1, + ) + antwort = response.choices[0].message.content.strip() + + if "```" in antwort: + antwort = antwort.split("```")[1] + if antwort.startswith("json"): + antwort = antwort[4:] + + result = json.loads(antwort) + status = result.get("status", "UNKNOWN") + preise = result.get("preise", []) + airlines = result.get("airlines", []) + details = result.get("details", "")[:200] + + # Aktion ableiten aus Status + aktion_map = { + "PRICES_FOUND": "PROCESS", + "COOKIE_BANNER": "RETRY_COOKIES", + "EMPTY_PAGE": "SKIP", + "CAPTCHA": "PAUSE_NODE", + "ERROR_PAGE": "SKIP", + "UNKNOWN": "SKIP", + } + aktion = aktion_map.get(status, "SKIP") + + # Source-Health aktualisieren + if status == "PRICES_FOUND": + source_health_update(node, scanner, erfolg=True) + elif status == "CAPTCHA": + source_health_update(node, scanner, erfolg=False, fehler_typ="captcha") + source_health_pause(node, scanner, stunden=24) + elif status in ("EMPTY_PAGE", "ERROR_PAGE"): + source_health_update(node, scanner, erfolg=False, fehler_typ=status.lower()) + elif status == "COOKIE_BANNER": + source_health_update(node, scanner, erfolg=False, fehler_typ="cookie_banner") + + ergebnis = { + "status": status, "preise": preise, "airlines": airlines, + "details": details, "aktion": aktion, + } + + scan_result_save(node, scanner, screenshot_id, status, details, len(preise), aktion) + log(f"KI-Augen {node}/{scanner}: {status} — {len(preise)} Preise — → {aktion}") + + return ergebnis + + except json.JSONDecodeError as e: + log(f"KI-Augen JSON-Fehler: {e}", "WARN") + return fallback + except Exception as e: + log(f"KI-Augen Fehler: {e}", "WARN") + return fallback def plausibilitaetspruefung(von="FRA", nach="KTI"): - """Prüft alle ungeprüften Economy-Preise des aktuellen Laufs via KI.""" + """Prüft alle ungeprüften Preise des aktuellen Laufs via KI.""" log("KI-Plausibilitätsprüfung gestartet") conn = get_conn() @@ -53,29 +172,28 @@ def plausibilitaetspruefung(von="FRA", nach="KTI"): WHERE von=? AND nach=? AND plausibel IS NULL AND date(scraped_at) = date('now') - AND kabine_erkannt IN ('Economy', 'Economy Light', 'Unbekannt') - OR (von=? AND nach=? AND plausibel IS NULL - AND date(scraped_at) = date('now') - AND kabine_erkannt IS NULL) ORDER BY preis ASC - """, (von, nach, von, nach)).fetchall() + """, (von, nach)).fetchall() if not ungepruefte: - log("Keine ungeprüften Economy-Preise — Plausibilitätsprüfung übersprungen") + log("Keine ungeprüften Preise — Plausibilitätsprüfung übersprungen") conn.close() return + # In Batches aufteilen (max 25 Preise pro KI-Call) BATCH_SIZE = 25 batches = [ungepruefte[i:i+BATCH_SIZE] for i in range(0, len(ungepruefte), BATCH_SIZE)] - plausibel_total = verdaechtig_total = 0 + plausibel_total = 0 + verdaechtig_total = 0 for batch_nr, batch in enumerate(batches): preise_liste = "\n".join([ f" ID {p['id']}: {p['preis']:.0f} EUR — Scanner: {p['scanner']} — " - f"Airline: {p['airline'] or 'k.A.'} — Abflug: {p['abflug']}" + f"Node: {p['node']} — Airline: {p['airline'] or 'k.A.'} — Abflug: {p['abflug']}" for p in batch ]) + prompt = PLAUSI_PROMPT.format(preise_liste=preise_liste) try: @@ -86,22 +204,28 @@ def plausibilitaetspruefung(von="FRA", nach="KTI"): temperature=0.1, ) antwort = response.choices[0].message.content.strip() + if "```" in antwort: antwort = antwort.split("```")[1] if antwort.startswith("json"): antwort = antwort[4:] ergebnisse = json.loads(antwort) + for e in ergebnisse: - pid = e.get("id") + pid = e.get("id") ist_plausibel = 1 if e.get("plausibel") else 0 - grund = e.get("grund", "")[:200] + grund = e.get("grund", "")[:200] + conn.execute( "UPDATE prices SET plausibel=?, plausi_grund=? WHERE id=?", (ist_plausibel, grund, pid) ) - if ist_plausibel: plausibel_total += 1 - else: verdaechtig_total += 1 + if ist_plausibel: + plausibel_total += 1 + else: + verdaechtig_total += 1 + conn.commit() except json.JSONDecodeError as e: @@ -117,52 +241,25 @@ def plausibilitaetspruefung(von="FRA", nach="KTI"): def _regelbasierte_plausi(conn, preise): - """Fallback wenn KI nicht erreichbar: regelbasiert für Economy.""" - log("Regelbasierte Plausibilitätsprüfung (Economy) als Fallback") + """Fallback wenn KI nicht erreichbar: Economy CX 700-1200 EUR.""" + log("Regelbasierte Plausibilitätsprüfung (CX Economy) als Fallback") for p in preise: preis = p["preis"] - if preis < 500: + if preis < 600: conn.execute("UPDATE prices SET plausibel=0, plausi_grund=? WHERE id=?", - ("Unter 500€ — wahrscheinlich Economy Light ohne Gepäck", p["id"])) + ("Unter 600€ — vermutlich Economy Light oder Fehler", p["id"])) elif preis < 700: conn.execute("UPDATE prices SET plausibel=0, plausi_grund=? WHERE id=?", - ("500-700€ — verdächtig, wahrscheinlich ohne Gepäck", p["id"])) - elif preis > 1800: + ("600-700€ — verdächtig, wahrscheinlich Economy Light", p["id"])) + elif preis > 1400: conn.execute("UPDATE prices SET plausibel=0, plausi_grund=? WHERE id=?", - ("Über 1800€ — möglicherweise Business Class", p["id"])) + ("Über 1400€ — vermutlich andere Kabine/Airline", p["id"])) else: conn.execute("UPDATE prices SET plausibel=1, plausi_grund=? WHERE id=?", - ("Preis im Economy-Roundtrip-Bereich", p["id"])) + ("Preis im CX Economy-Bereich", p["id"])) conn.commit() -def get_openrouter_guthaben() -> dict: - """Fragt OpenRouter-Guthaben ab.""" - api_key = os.environ.get("OPENROUTER_API_KEY", "") - if not api_key: - return {"fehler": "Kein API-Key konfiguriert"} - try: - r = requests.get( - "https://openrouter.ai/api/v1/auth/key", - headers={"Authorization": f"Bearer {api_key}"}, - timeout=10 - ) - if r.status_code == 200: - d = r.json().get("data", {}) - limit = d.get("limit") - usage = d.get("usage", 0) - verbleibend = round((limit - usage), 4) if limit else None - return { - "limit": limit, - "usage": round(usage, 4), - "verbleibend": verbleibend, - "is_free": d.get("is_free_tier", False), - } - return {"fehler": f"HTTP {r.status_code}"} - except Exception as e: - return {"fehler": str(e)} - - def get_prompt(): conn = get_conn() row = conn.execute( @@ -176,34 +273,20 @@ def auswerten(von="FRA", nach="KTI"): log("KI-Auswertung gestartet") conn = get_conn() - # Nur Economy-Preise die plausibel sind preise_heute = conn.execute(""" - SELECT scanner, node, preis, airline, abflug, kabine_erkannt + SELECT scanner, node, preis, airline, abflug FROM prices WHERE von=? AND nach=? - AND date(scraped_at) = date('now') - AND plausibel = 1 - AND kabine_erkannt IN ('Economy', 'Economy Light', 'Unbekannt') + AND date(scraped_at) = date('now') + AND (plausibel = 1 OR plausibel IS NULL) ORDER BY preis ASC """, (von, nach)).fetchall() - qualitaet = conn.execute(""" - SELECT - COUNT(*) as gesamt, - SUM(CASE WHEN kabine_erkannt='Economy' THEN 1 ELSE 0 END) as eco, - SUM(CASE WHEN kabine_erkannt='Economy Light' THEN 1 ELSE 0 END) as light, - SUM(CASE WHEN kabine_erkannt='Premium Economy' THEN 1 ELSE 0 END) as pe - FROM prices - WHERE von=? AND nach=? AND date(scraped_at) = date('now') - """, (von, nach)).fetchone() - preisverlauf = conn.execute(""" SELECT date(scraped_at) as tag, MIN(preis) as min_preis, AVG(preis) as avg_preis FROM prices WHERE von=? AND nach=? - AND scraped_at >= datetime('now', '-30 days') - AND kabine_erkannt IN ('Economy', 'Economy Light', 'Unbekannt') - AND plausibel = 1 + AND scraped_at >= datetime('now', '-30 days') GROUP BY date(scraped_at) ORDER BY tag """, (von, nach)).fetchall() @@ -212,44 +295,31 @@ def auswerten(von="FRA", nach="KTI"): SELECT AVG(preis) as avg, MIN(preis) as min, MAX(preis) as max FROM prices WHERE von=? AND nach=? - AND scraped_at >= datetime('now', '-30 days') - AND kabine_erkannt IN ('Economy', 'Economy Light', 'Unbekannt') - AND plausibel = 1 + AND scraped_at >= datetime('now', '-30 days') """, (von, nach)).fetchone() conn.close() if not preise_heute: - log("Keine plausiblen Economy-Preise heute — KI-Auswertung übersprungen", "WARN") + log("Keine Preise für heute — KI-Auswertung übersprungen", "WARN") return - qualitaet_hinweis = ( - f"DATENQUALITÄT HEUTE: {qualitaet['eco'] or 0} Economy, " - f"{qualitaet['light'] or 0} Economy Light gescannt. " - f"Nur plausible Roundtrip-Preise mit Gepäck werden ausgewertet.\n" - ) - preise_heute_str = "\n".join([ - f" {p['scanner']}: {p['preis']} EUR — {p['airline'] or 'k.A.'} " - f"({p['kabine_erkannt'] or '?'})" + f" {p['scanner']} ({p['node']}): {p['preis']} EUR — {p['airline'] or 'k.A.'}" for p in preise_heute ]) verlauf_str = "\n".join([ f" {p['tag']}: min {p['min_preis']:.0f} EUR, avg {p['avg_preis']:.0f} EUR" for p in preisverlauf - ]) or " (noch keine Verlaufsdaten)" + ]) prompt_template = get_prompt() - if not prompt_template: - log("Kein KI-Auswertungs-Prompt in DB — übersprungen", "WARN") - return - - prompt = qualitaet_hinweis + "\n" + prompt_template.format( + prompt = prompt_template.format( preise_heute=preise_heute_str, preisverlauf=verlauf_str, - avg=f"{stats['avg']:.0f}" if stats and stats['avg'] else "?", - min=f"{stats['min']:.0f}" if stats and stats['min'] else "?", - max=f"{stats['max']:.0f}" if stats and stats['max'] else "?" + avg=f"{stats['avg']:.0f}" if stats['avg'] else "?", + min=f"{stats['min']:.0f}" if stats['min'] else "?", + max=f"{stats['max']:.0f}" if stats['max'] else "?" ) try: @@ -259,19 +329,28 @@ def auswerten(von="FRA", nach="KTI"): max_tokens=500 ) analyse = response.choices[0].message.content + log(f"KI-Antwort erhalten: {analyse[:100]}...") guenstigster = preise_heute[0] - if "JETZT BUCHEN" in analyse: empfehlung = "JETZT BUCHEN" - elif "WARTEN" in analyse: empfehlung = "WARTEN" - else: empfehlung = "NEUTRAL" + empfehlung = "" + if "JETZT BUCHEN" in analyse: + empfehlung = "JETZT BUCHEN" + elif "WARTEN" in analyse: + empfehlung = "WARTEN" + else: + empfehlung = "NEUTRAL" conn = get_conn() conn.execute(""" INSERT INTO analyses (von, nach, guenstigster_preis, guenstigster_anbieter, ki_empfehlung, ki_analyse) VALUES (?, ?, ?, ?, ?, ?) - """, (von, nach, guenstigster["preis"], - f"{guenstigster['scanner']}", empfehlung, analyse)) + """, ( + von, nach, + guenstigster["preis"], + f"{guenstigster['scanner']} ({guenstigster['node']})", + empfehlung, analyse + )) conn.commit() conn.close() log("KI-Auswertung gespeichert") diff --git a/hub/src/scheduler.py b/hub/src/scheduler.py index 995600d..b73f16e 100644 --- a/hub/src/scheduler.py +++ b/hub/src/scheduler.py @@ -1,12 +1,15 @@ import os +import re import time import random import threading import requests import schedule from datetime import datetime, timedelta -from db import init_db, get_conn, log -from ki import auswerten, plausibilitaetspruefung +from db import (init_db, get_conn, log, source_health_update, + source_health_ist_pausiert, source_health_reset_daily, + source_health_get_all, scan_result_save) +from ki import auswerten, plausibilitaetspruefung, analyse_screenshot from openai import OpenAI # ── OpenRouter Vision Client ────────────────────────────────────────────────── @@ -34,15 +37,67 @@ def telegram_send(msg: str): # ── Zero-Result-Tracking (in-memory, pro Job-ID) ───────────────────────────── _null_ergebnis_zaehler: dict[str, int] = {} # key = "node:job_id" -ALERT_NACH_N_NULLLAEUFEN = 3 +ALERT_NACH_N_NULLLAEUFEN = 999 # Keine Einzel-Alerts mehr — nur Tagesbilanz um 20:00 + +# ── HARTE REGELN — diese entscheidet NICHT die KI ──────────────────────────── +MAX_SCANS_PRO_TAG_PRO_QUELLE = 3 # Nie mehr als 3 Scans/Tag/Scanner+Node +MIN_STUNDEN_ZWISCHEN_SCANS = 4 # Min 4h zwischen Scans derselben Quelle +MAX_RETRIES_PRO_FENSTER = 1 # Max 1 Cookie-Retry pro Scan-Fenster +PREIS_HARD_MIN = 500 # Unter 500€ nie speichern +PREIS_HARD_MAX = 2000 # Über 2000€ nie speichern +ABWEICHUNG_FLAG_PROZENT = 30 # >30% vom 7-Tage-Schnitt → Flag, nicht melden + +# Gepäckzuschlag: Economy Light → geschätzter Economy-Preis +# CX Langstrecke Roundtrip: ~60-80€ pro Leg, 3 Legs bei Multi-City +GEPAECK_ZUSCHLAG = { + "multicity": 200, # FRA→HKG + HKG→KTI + KTI→FRA = 3 Legs + "roundtrip": 140, # FRA→KTI + KTI→FRA = 2 Legs +} # Scanner die aus Asien (Cambodia) nicht funktionieren - Geo-Block NODE_SCANNER_SKIP = { "flugscanner-asia": {"momondo", "traveloka"}, } + +def _scan_erlaubt(node_name: str, scanner: str) -> bool: + """Prüft ob ein Scan für diese Node+Scanner Kombination heute noch erlaubt ist.""" + conn = get_conn() + + # Wie oft wurde heute schon gescannt? + heute_count = conn.execute(""" + SELECT COUNT(*) as n FROM scan_results + WHERE node=? AND scanner=? AND date(created_at)=date('now') + """, (node_name, scanner)).fetchone()["n"] + + if heute_count >= MAX_SCANS_PRO_TAG_PRO_QUELLE: + conn.close() + log(f"⛔ {node_name}/{scanner}: {heute_count}/{MAX_SCANS_PRO_TAG_PRO_QUELLE} Scans heute — Limit erreicht") + return False + + # Wann war der letzte Scan? + letzter = conn.execute(""" + SELECT created_at FROM scan_results + WHERE node=? AND scanner=? + ORDER BY created_at DESC LIMIT 1 + """, (node_name, scanner)).fetchone() + + conn.close() + + if letzter and letzter["created_at"]: + try: + letzter_ts = datetime.fromisoformat(letzter["created_at"]) + diff_h = (datetime.now() - letzter_ts).total_seconds() / 3600 + if diff_h < MIN_STUNDEN_ZWISCHEN_SCANS: + log(f"⏰ {node_name}/{scanner}: Letzter Scan vor {diff_h:.1f}h — min {MIN_STUNDEN_ZWISCHEN_SCANS}h nötig") + return False + except (ValueError, TypeError): + pass + + return True + # ── Vision Prompt (angepasst für Economy) ──────────────────────────────────── -VISION_PROMPT = """Du siehst einen Screenshot einer Flugsuche-Website (Kayak, Momondo etc.). +VISION_PROMPT = """Du siehst einen Screenshot einer Flugsuche-Website (Kayak, Momondo, Cathay Pacific etc.). AUFGABE: Bestimme welche Kabinenklasse in den SUCHERGEBNISSEN gezeigt wird. @@ -56,14 +111,57 @@ IGNORIERE: ❌ Empfehlungsboxen oben auf der Seite ❌ Texte die nicht zu konkreten Flugergebnissen gehören -KLASSIFIZIERUNG: -- "Economy Light" → "Economy Light", "Basic", "Light", "Nur Handgepäck", "Hand baggage" -- "Economy" → "Economy" ohne "Premium" davor -- "Premium Economy" → "Premium Economy" oder "W Class" bei Flugergebnissen -- "Business" → "Business" bei Flugergebnissen -- "Unbekannt" → Ladescreen, Captcha, Cookie-Banner, keine Ergebnisse +KLASSIFIZIERUNG — REIHENFOLGE WICHTIG: +1. "Economy Light" → WENN "Light", "Basic", "eco light", "Hand baggage only" oder "Nur Handgepäck" bei den Flügen sichtbar: IMMER "Economy Light" +2. "Economy" → NUR wenn Standard-Economy MIT Gepäck sichtbar (ohne "Light"/"Basic") +3. "Premium Economy" → "Premium Economy" oder "W Class" +4. "Business" → "Business" +5. "Unbekannt" → Ladescreen, Captcha, Cookie-Banner, keine Ergebnisse -Antworte NUR mit dem einen passenden Begriff. Keine Erklärung.""" +REGEL: Kayak/Momondo zeigen oft "Economy Light" als erste Option — das ist NICHT Economy! Antworte NUR mit dem einen Begriff.""" + +VISION_PREIS_PROMPT = """Du siehst einen Screenshot von Kayak Flugsuchergebnissen (Multi-City FRA→HKG→KTI). + +AUFGABE: Was ist der GÜNSTIGSTE Preis in EUR für Economy MIT Freigepäck (1 Koffer)? +— NICHT Economy Light / Basic / Nur Handgepäck +— Sondern Essential, Flex oder "Economy" mit Gepäck inklusive + +Antworte NUR mit der Zahl (z.B. 1030) oder "keiner" wenn du keinen solchen Preis siehst.""" + + +def vision_preis_economy_mit_gepaeck(screenshot_b64: str) -> float | None: + """Vision liefert den Preis für Economy MIT Gepäck (nicht Light). Verhindert Light-Preis-Fehler.""" + if not screenshot_b64: + return None + try: + response = _vision_client.chat.completions.create( + model="openai/gpt-4o-mini", + max_tokens=20, + messages=[{ + "role": "user", + "content": [ + {"type": "text", "text": VISION_PREIS_PROMPT}, + {"type": "image_url", "image_url": { + "url": f"data:image/jpeg;base64,{screenshot_b64}" + }} + ] + }] + ) + txt = response.choices[0].message.content.strip().lower() + if "keiner" in txt or "none" in txt or "nicht" in txt: + return None + m = re.search(r'\d{3,5}', txt) + if m: + v = float(m.group(0)) + # Unter 850€ = fast immer Economy Light, nicht Economy+Gepäck + if 850 <= v <= 1500: + return v + if 600 <= v < 850: + log(f"Vision-Preis {v:.0f}€ zu niedrig für Economy+Gepäck — vermutlich Light, verworfen") + return None + except Exception as e: + log(f"Vision-Preis-Extraktion fehlgeschlagen: {e}", "WARN") + return None def klassifiziere_screenshot(screenshot_b64: str) -> str: @@ -103,7 +201,7 @@ def cleanup_alte_screenshots(tage=30): conn = get_conn() cur = conn.execute(""" DELETE FROM screenshots - WHERE created_at < datetime('now', ?) + WHERE scraped_at < datetime('now', ?) """, (f"-{tage} days",)) deleted = cur.rowcount conn.commit() @@ -190,38 +288,92 @@ def dispatch_job(node, job, tage_override=None): f"{airline_label}{via_label}" f"{' +'+str(tage_override)+'T' if tage_override else ''}") - # ── Zero-Result-Alert ───────────────────────────────────────── - if len(results) == 0: - zkey = f"{node['name']}:{job_id}" - _null_ergebnis_zaehler[zkey] = _null_ergebnis_zaehler.get(zkey, 0) + 1 - zaehler = _null_ergebnis_zaehler[zkey] - log(f"⚠ {job['scanner']} liefert 0 Preise ({zaehler}/{ALERT_NACH_N_NULLLAEUFEN})", "WARN") - if zaehler >= ALERT_NACH_N_NULLLAEUFEN: - telegram_send( - f"⚠️ Flugscanner-Alert\n" - f"Scanner {job['scanner']} (Job #{job_id}) liefert " - f"seit {zaehler} Läufen 0 Preise.\n" - f"Möglicherweise Anti-Bot-Erkennung oder Seite verändert." - ) - else: - zkey = f"{node['name']}:{job_id}" - _null_ergebnis_zaehler[zkey] = 0 # Reset bei Erfolg - # ───────────────────────────────────────────────────────────── + screenshot_id = speichere_screenshot(screenshot_b64, node["name"], job) - screenshot_id = speichere_screenshot(screenshot_b64, node["name"], job) + # ── KI-AUGEN: Screenshot analysieren ──────────────────────── + ki_ergebnis = analyse_screenshot( + screenshot_b64, node["name"], job["scanner"], screenshot_id + ) + ki_status = ki_ergebnis["status"] + ki_aktion = ki_ergebnis["aktion"] - # ── Vision-Wahrheitsfilter ──────────────────────────────────── + # ── SOFORT-REAKTION basierend auf KI-Analyse ──────────────── + if ki_aktion == "PAUSE_NODE": + log(f"🛑 {node['name']}/{job['scanner']}: CAPTCHA erkannt — Node pausiert 24h", "WARN") + return "PAUSE_NODE" + + if ki_aktion == "RETRY_COOKIES": + log(f"🍪 {node['name']}/{job['scanner']}: Cookie-Banner erkannt — Retry markiert") + return "RETRY_COOKIES" + + if ki_aktion == "SKIP": + if results: + log(f"⚠ KI-Augen sagt {ki_status}, aber Scraper hat {len(results)} Preise — werden trotzdem verarbeitet", "WARN") + else: + zkey = f"{node['name']}:{job_id}" + _null_ergebnis_zaehler[zkey] = _null_ergebnis_zaehler.get(zkey, 0) + 1 + zaehler = _null_ergebnis_zaehler[zkey] + log(f"⏭ {node['name']}/{job['scanner']}: {ki_status} — Skip ({zaehler}/{ALERT_NACH_N_NULLLAEUFEN})", "WARN") + if zaehler >= ALERT_NACH_N_NULLLAEUFEN: + telegram_send( + f"⚠️ Quelle unzuverlässig\n" + f"{node['name']}/{job['scanner']}: {ki_status}\n" + f"Seit {zaehler} Läufen keine Ergebnisse.\n" + f"KI sagt: {ki_ergebnis.get('details', '?')}" + ) + return False + + # ── Verarbeitung (PRICES_FOUND oder Scraper hat Preise) ─── + zkey = f"{node['name']}:{job_id}" + _null_ergebnis_zaehler[zkey] = 0 + + # Vision: Kabine + Economy-Preis (kayak_multicity spezial) kabine_erkannt = klassifiziere_screenshot(screenshot_b64) - log(f"{node['name']}/{job['scanner']}: Vision → {kabine_erkannt}") - # Für Economy-Suche: Business/First/PE sind Fehlklassifizierungen + if job.get("scanner") == "kayak_multicity" and results and screenshot_b64: + vp = vision_preis_economy_mit_gepaeck(screenshot_b64) + if vp is not None: + first = results[0] + results = [{ + "preis": vp, "waehrung": "EUR", "airline": first.get("airline", "CX"), + "abflug": first.get("abflug", ""), "ankunft": first.get("ankunft", ""), + "booking_url": first.get("booking_url", ""), + "scanner": "kayak_multicity", + }] + kabine_erkannt = "Economy" + log(f"{node['name']}/kayak_multicity: Vision-Preis → {vp:.0f}€ (Economy+Gepäck)") + else: + kabine_erkannt = "Unbekannt" + results = [] + log(f"{node['name']}/kayak_multicity: Vision kein Economy+Gepäck-Preis") + else: + log(f"{node['name']}/{job['scanner']}: Vision → {kabine_erkannt}") + FALSCHE_KABINEN = ("Premium Economy", "Business", "First") if kabine_erkannt in FALSCHE_KABINEN: log(f"⚠ Vision zeigt {kabine_erkannt} statt Economy — Preise markiert", "WARN") - # ───────────────────────────────────────────────────────────── - pruefe_preis_alert(results, job) - pruefe_preisanstieg(results, job) - speichere_preise(results, node["name"], job, screenshot_id, kabine_erkannt) + # KI-Preis-Fallback: Scraper liefert 0, aber KI sieht Preise + ki_preise = ki_ergebnis.get("preise", []) + if not results and ki_preise: + from datetime import datetime as dt + abflug_default = (dt.now() + timedelta(days=job.get("tage", 30))).strftime("%Y-%m-%d") + aufenthalt = job.get("aufenthalt_tage", 60) + rueck_default = (dt.now() + timedelta(days=job.get("tage", 30) + aufenthalt)).strftime("%Y-%m-%d") + results = [{ + "preis": float(p), "waehrung": "EUR", + "airline": job.get("airline_filter", ""), + "abflug": abflug_default, "ankunft": rueck_default, + "booking_url": "", "scanner": job["scanner"], + } for p in ki_preise if isinstance(p, (int, float)) and PREIS_HARD_MIN <= p <= PREIS_HARD_MAX] + if results: + log(f"👁 KI-Fallback: {len(results)} Preise vom Screenshot übernommen (Scraper lieferte 0)") + + try: + pruefe_preis_alert(results, job) + pruefe_preisanstieg(results, job) + speichere_preise(results, node["name"], job, screenshot_id, kabine_erkannt) + except Exception as e: + log(f"Speicher-Fehler {node['name']}/{job['scanner']}: {e}", "ERROR") return True else: log(f"{node['name']}: Fehler {r.status_code} bei {job['scanner']}", "ERROR") @@ -250,50 +402,103 @@ def speichere_screenshot(screenshot_b64, node_name, job): return None -ALERT_SCHWELLE_EUR = 900 # Telegram-Alert wenn CX via HKG unter diesen Preis fällt +ALERT_SCHWELLE_EUR = 900 # Telegram-Alert wenn CX unter diesen Preis fällt def pruefe_preis_alert(results, job): - """Sendet Telegram-Alert wenn kayak_multicity unter Schwelle fällt.""" - if job.get("scanner") != "kayak_multicity": + """Sendet Telegram-Alert wenn CX (via HKG oder direkt) unter Schwelle fällt.""" + if job.get("scanner") not in ("kayak_multicity", "cathay_pacific"): return + label = "CX direkt" if job.get("scanner") == "cathay_pacific" else "CX via HKG" for r in results: if r.get("preis", 9999) < ALERT_SCHWELLE_EUR: preis = r["preis"] abflug = r.get("abflug", "?") url = r.get("booking_url", "") telegram_send( - f"✈️ CX via HKG unter {ALERT_SCHWELLE_EUR}€!\n\n" + f"✈️ {label} unter {ALERT_SCHWELLE_EUR}€!\n\n" f"💰 Preis: {preis:.0f} EUR Roundtrip\n" f"📅 Abflug: {abflug}\n" - f"🔗 Jetzt buchen" + f"🔗 Preis prüfen\n\n" + f"⚠️ Sofort auf Buchungsseite prüfen — Preise ändern sich schnell, " + f"Aggregatoren zeigen teils Economy Light zuerst." ) - log(f"💰 PREIS-ALERT: {preis:.0f}EUR via HKG — Telegram gesendet") + log(f"💰 PREIS-ALERT: {preis:.0f}EUR {label} — Telegram gesendet") break # Nur einmal pro Job-Lauf def speichere_preise(results, node_name, job, screenshot_id=None, kabine_erkannt=None): - # Economy-Suche: PE/Business/First sind Fehlkabinen → disqualifizieren - FALSCHE_KABINEN = ("Premium Economy", "Business", "First") - ist_disqualifiziert = kabine_erkannt in FALSCHE_KABINEN + WEGWERFEN = ("Premium Economy", "Business", "First") + job_will_cx = (job.get("airline_filter") or "").upper() == "CX" or job.get("scanner") == "cathay_pacific" + trip_type = job.get("trip_type", "roundtrip") + def ist_roundtrip(r): + ab, an = r.get("abflug", ""), r.get("ankunft", "") + if not ab or not an: + return False + try: + d_ab = datetime.strptime(ab, "%Y-%m-%d") + d_an = datetime.strptime(an, "%Y-%m-%d") + tage = (d_an - d_ab).days + return 50 <= tage <= 95 + except Exception: + return ab < an + + def ist_cx(r): + if not job_will_cx: + return True + airline = (r.get("airline") or "").upper() + return airline in ("", "CX", "HKG") + + gefiltert = 0 + gespeichert = 0 + korrigiert = 0 conn = get_conn() - for r in results: - plausibel_init = None + try: + for r in results: + preis = r.get("preis", 0) + if preis < PREIS_HARD_MIN or preis > PREIS_HARD_MAX: + gefiltert += 1 + continue + if not ist_roundtrip(r): + gefiltert += 1 + continue + if not ist_cx(r): + gefiltert += 1 + continue + + # Premium Economy / Business / First → komplett anderes Produkt, weg + if kabine_erkannt in WEGWERFEN: + gefiltert += 1 + continue + + # Preiskorrektur: Economy Light/Unbekannt → Gepäckzuschlag draufrechnen + preis_korrigiert = None + korrektur_grund = "" + plausibel_init = None plausi_grund_init = "" - if ist_disqualifiziert: - plausibel_init = 0 - plausi_grund_init = ( - f"[Vision-Filter] Screenshot zeigt {kabine_erkannt} — kein Economy" - ) + + if kabine_erkannt == "Economy": + preis_korrigiert = preis + korrektur_grund = "Economy direkt — kein Zuschlag" + elif kabine_erkannt in ("Economy Light", "Unbekannt", None): + zuschlag = GEPAECK_ZUSCHLAG.get(trip_type, 140) + preis_korrigiert = preis + zuschlag + korrektur_grund = f"{kabine_erkannt or 'Unbekannt'} + {zuschlag}€ Gepäck ({trip_type})" + korrigiert += 1 + # Korrigierter Preis auch plausibel prüfen + if preis_korrigiert < PREIS_HARD_MIN or preis_korrigiert > PREIS_HARD_MAX: + gefiltert += 1 + continue conn.execute(""" INSERT INTO prices (job_id, scanner, node, preis, waehrung, airline, abflug, ankunft, - von, nach, booking_url, screenshot_id, kabine_erkannt, plausibel, plausi_grund) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + von, nach, booking_url, screenshot_id, kabine_erkannt, + plausibel, plausi_grund, preis_korrigiert, korrektur_grund) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( job["id"], r.get("scanner", job["scanner"]), node_name, - r["preis"], r.get("waehrung", "EUR"), r.get("airline", ""), + preis, r.get("waehrung", "EUR"), r.get("airline", ""), r.get("abflug", ""), r.get("ankunft", ""), job["von"], job["nach"], r.get("booking_url", ""), @@ -301,9 +506,23 @@ def speichere_preise(results, node_name, job, screenshot_id=None, kabine_erkannt kabine_erkannt, plausibel_init, plausi_grund_init, + preis_korrigiert, + korrektur_grund, )) - conn.commit() - conn.close() + gespeichert += 1 + + conn.commit() + except Exception as e: + log(f"speichere_preise Fehler: {e}", "ERROR") + finally: + conn.close() + # Logging NACH conn.close() — verhindert DB-Deadlock + if gefiltert > 0: + log(f"[Filter] {gefiltert} Preise aussortiert (Roundtrip/CX/Hard-Limits)") + if korrigiert > 0: + log(f"[Korrektur] {korrigiert} mit Gepäckzuschlag ({kabine_erkannt} → +Gepäck)") + if gespeichert > 0: + log(f"[Speicher] {gespeichert} Preise gespeichert") def scraping_lauf(label="Standard", flex_tage_liste=None): @@ -326,7 +545,7 @@ def scraping_lauf(label="Standard", flex_tage_liste=None): return tage_varianten = flex_tage_liste or [None] - online = fehler = 0 + online = fehler = uebersprungen = 0 for node in nodes: if node_ping(node): @@ -336,10 +555,32 @@ def scraping_lauf(label="Standard", flex_tage_liste=None): skip_set = NODE_SCANNER_SKIP.get(node["name"], set()) if job["scanner"] in skip_set: continue + + # Health-Check: pausierte Quellen überspringen + if source_health_ist_pausiert(node["name"], job["scanner"]): + log(f"⏸ {node['name']}/{job['scanner']}: pausiert — übersprungen") + uebersprungen += 1 + continue + + # Harte Regel: Scan-Limit pro Tag + Mindestabstand + if not _scan_erlaubt(node["name"], job["scanner"]): + uebersprungen += 1 + continue + for tage_var in tage_varianten: try: - ok = dispatch_job(node, job, tage_override=tage_var) - if not ok: + result = dispatch_job(node, job, tage_override=tage_var) + + # Reaktion auf KI-Ergebnis + if result == "PAUSE_NODE": + log(f"⏸ Node {node['name']} pausiert für {job['scanner']}") + break # Nächster Job auf diesem Node + elif result == "RETRY_COOKIES": + log(f"🍪 Retry nach Cookie-Banner: {node['name']}/{job['scanner']}") + retry_result = dispatch_job(node, job, tage_override=tage_var) + if retry_result not in (True, "PAUSE_NODE"): + fehler += 1 + elif not result: fehler += 1 except Exception as e: log(f"Job-Fehler {node['name']}/{job['scanner']}: {e}", "ERROR") @@ -350,7 +591,7 @@ def scraping_lauf(label="Standard", flex_tage_liste=None): dauer = round((datetime.now() - start).total_seconds()) log(f"Scraping [{label}] fertig — {online}/{len(nodes)} Nodes | " - f"{fehler} Fehler | {dauer}s") + f"{fehler} Fehler | {uebersprungen} übersprungen | {dauer}s") try: plausibilitaetspruefung() @@ -378,13 +619,10 @@ def standard_lauf(): def flex_lauf(): - wochentag = datetime.now().weekday() - if wochentag not in (1, 2): - log("Flex-Lauf: heute kein Di/Mi — übersprungen") - return - basis = 30 - flex_varianten = list(range(basis - 3, basis + 4)) - log(f"=== Flex-Lauf Di/Mi ±3 Tage: {flex_varianten} ===") + # Abflug 80-100 Tage mit 10 Tagen Spielraum + basis = 90 + flex_varianten = [80, 85, 90, 95, 100] + log(f"=== Flex-Lauf 80-100d: {flex_varianten} ===") threading.Thread( target=scraping_lauf, kwargs={"label": "Flex-Di/Mi", "flex_tage_liste": flex_varianten}, @@ -409,25 +647,34 @@ def cleanup_lauf(): # ── Telegram Bot Befehle ────────────────────────────────────────────────────── def _cx_preise_jetzt() -> dict: - """Holt aktuellen CX-Multicity-Preis und Vergleichswerte aus DB.""" + """Holt aktuellen CX-Preis (via HKG + direkt) und Vergleichswerte aus DB.""" conn = get_conn() cx = conn.execute(""" SELECT MIN(preis) as min_p, MAX(scraped_at) as zuletzt - FROM prices WHERE scanner='kayak_multicity' - AND (kabine_erkannt != 'Premium Economy' OR kabine_erkannt IS NULL) + FROM prices WHERE scanner IN ('kayak_multicity', 'cathay_pacific') + AND kabine_erkannt = 'Economy' + AND (plausibel = 1 OR plausibel IS NULL) + AND scraped_at >= datetime('now','-3 hours') + """).fetchone() + cx_direkt = conn.execute(""" + SELECT MIN(preis) as min_p + FROM prices WHERE scanner='cathay_pacific' + AND kabine_erkannt = 'Economy' + AND (plausibel = 1 OR plausibel IS NULL) AND scraped_at >= datetime('now','-3 hours') """).fetchone() direkt = conn.execute(""" SELECT MIN(preis) as min_p FROM prices WHERE scanner='kayak' - AND (kabine_erkannt != 'Premium Economy' OR kabine_erkannt IS NULL) + AND kabine_erkannt = 'Economy' + AND (plausibel = 1 OR plausibel IS NULL) AND scraped_at >= datetime('now','-3 hours') """).fetchone() gestern_cx = conn.execute(""" SELECT MIN(preis) as min_p - FROM prices WHERE scanner='kayak_multicity' + FROM prices WHERE scanner IN ('kayak_multicity', 'cathay_pacific') AND date(scraped_at) = date('now','-1 day') - AND (kabine_erkannt != 'Premium Economy' OR kabine_erkannt IS NULL) + AND kabine_erkannt = 'Economy' """).fetchone() ki = conn.execute(""" SELECT ki_empfehlung, ki_analyse FROM analyses @@ -436,6 +683,7 @@ def _cx_preise_jetzt() -> dict: conn.close() return { "cx_min": cx["min_p"] if cx else None, + "cx_direkt": cx_direkt["min_p"] if cx_direkt else None, "cx_zuletzt": cx["zuletzt"][:16] if cx and cx["zuletzt"] else "?", "direkt_min": direkt["min_p"] if direkt else None, "gestern_cx": gestern_cx["min_p"] if gestern_cx else None, @@ -444,15 +692,16 @@ def _cx_preise_jetzt() -> dict: } -def _top3_heute() -> list: - """Top 3 günstigste Multicity-Treffer heute.""" +def _top5_heute() -> list: + """Top 5 günstigste Economy-Treffer (kayak_multicity + cathay_pacific).""" conn = get_conn() rows = conn.execute(""" - SELECT preis, abflug, ankunft, booking_url - FROM prices WHERE scanner='kayak_multicity' - AND (kabine_erkannt != 'Premium Economy' OR kabine_erkannt IS NULL) + SELECT preis, abflug, ankunft, booking_url, scanner, node, scraped_at + FROM prices WHERE scanner IN ('kayak_multicity', 'cathay_pacific') + AND kabine_erkannt = 'Economy' + AND (plausibel = 1 OR plausibel IS NULL) AND scraped_at >= datetime('now','-3 hours') - ORDER BY preis ASC LIMIT 3 + ORDER BY preis ASC LIMIT 5 """).fetchall() conn.close() return [dict(r) for r in rows] @@ -465,32 +714,40 @@ def handle_bot_command(text: str, chat_id: str): if cmd == "/preis": cx = d["cx_min"] + cx_direkt = d["cx_direkt"] direkt = d["direkt_min"] gestern = d["gestern_cx"] trend = "" if cx and gestern: diff = cx - gestern trend = f"↗️ +{diff:.0f}€ vs. gestern" if diff > 0 else f"↘️ {diff:.0f}€ vs. gestern" - aufpreis = f"+{cx-direkt:.0f}€ vs. Direktflug" if cx and direkt else "" + cx_zeile = f"💰 {cx:.0f} EUR Roundtrip {trend}\n" + if cx_direkt is not None: + cx_zeile += f"🔵 CX direkt: {cx_direkt:.0f} EUR\n" + if direkt is not None: + cx_zeile += f"📊 Kayak (Aggregator): {direkt:.0f} EUR\n" msg = ( - f"✈️ CX via HKG — aktueller Preis\n\n" - f"💰 {cx:.0f} EUR Roundtrip {trend}\n" - f"🔵 Direktflug: {direkt:.0f} EUR ({aufpreis})\n" - f"🕐 Letzter Scan: {d['cx_zuletzt']}\n\n" + f"✈️ CX Economy (via HKG + direkt)\n\n" + f"{cx_zeile}" + f"🕐 Gültig bei Scan: {d['cx_zuletzt']}\n\n" + f"⚠️ Preise auf Buchungsseiten können abweichen.\n" f"KI: {d['ki_empf']}" - ) if cx else "⏳ Noch keine Daten im aktuellen Scan-Fenster." + ) if cx else "⏳ Keine Economy-Daten (nur Economy, kein Light) im Fenster." elif cmd == "/best": - top3 = _top3_heute() - if not top3: - msg = "⏳ Noch keine Treffer im aktuellen Scan-Fenster." + top5 = _top5_heute() + if not top5: + msg = "⏳ Keine Economy-Treffer im Fenster (nur Economy mit Gepäck)." else: - zeilen = "\n".join([ - f"{i+1}. {r['preis']:.0f}€ — Abflug {r['abflug']} " - f"buchen" - for i, r in enumerate(top3) - ]) - msg = f"🏆 Top 3 CX via HKG heute\n\n{zeilen}" + zeilen = [] + for i, r in enumerate(top5): + scan_zeit = r.get('scraped_at', '')[:16] if r.get('scraped_at') else '?' + zeilen.append( + f"{i+1}. {r['preis']:.0f}€ {r['abflug']}→{r['ankunft']} " + f"({r.get('scanner','?')}/{r.get('node','?')}) — buchen" + ) + msg = f"🏆 Top 5 CX Economy\n\n" + "\n".join(zeilen) + msg += "\n\n⚠️ Preise zum Scan-Zeitpunkt — Buchungsseiten können abweichen." elif cmd == "/status": conn = get_conn() @@ -514,8 +771,8 @@ def handle_bot_command(text: str, chat_id: str): else: msg = ( "✈️ CX HKG Alert Bot\n\n" - "/preis — Aktueller CX-Preis + Trend\n" - "/best — Top 3 günstigste Treffer heute\n" + "/preis — CX Economy-Preis + Trend\n" + "/best — Top 5 günstigste (nur Economy)\n" "/status — Nodes, Scans, Guthaben" ) @@ -575,7 +832,7 @@ def telegram_polling(): def morgenbericht(): """Täglich 07:00: Tagesüberblick per Telegram.""" d = _cx_preise_jetzt() - top3 = _top3_heute() + top5 = _top5_heute() cx = d["cx_min"] gestern = d["gestern_cx"] @@ -590,14 +847,14 @@ def morgenbericht(): empf_farbe = {"JETZT BUCHEN": "🟢", "WARTEN": "🔴", "NEUTRAL": "🟡"}.get(d["ki_empf"], "⚪") top_str = "" - if top3: + if top5: top_str = "\n🏆 Beste Angebote:\n" + "\n".join([ f" {i+1}. {r['preis']:.0f}€ — {r['abflug']} buchen" - for i, r in enumerate(top3) + for i, r in enumerate(top5) ]) msg = ( - f"☀️ Guten Morgen — CX via HKG\n\n" + f"☀️ Guten Morgen — CX Economy\n\n" f"💰 Heute ab {cx:.0f} EUR {trend_str}\n" f"{empf_farbe} KI-Empfehlung: {d['ki_empf']}\n" f"{top_str}" @@ -611,9 +868,9 @@ def morgenbericht(): _letzter_cx_preis: float = 0.0 def pruefe_preisanstieg(results, job): - """Alert wenn CX via HKG um mehr als 50€ gestiegen ist.""" + """Alert wenn CX (via HKG oder direkt) um mehr als 50€ gestiegen ist.""" global _letzter_cx_preis - if job.get("scanner") != "kayak_multicity" or not results: + if job.get("scanner") not in ("kayak_multicity", "cathay_pacific") or not results: return aktuell = min(r["preis"] for r in results) if _letzter_cx_preis > 0 and aktuell > _letzter_cx_preis + 50: @@ -629,12 +886,91 @@ def pruefe_preisanstieg(results, job): _letzter_cx_preis = aktuell +def tagesbilanz(): + """Täglicher Report um 20:00: Was war heute los?""" + conn = get_conn() + + # Erfolgreiche Scans heute + erfolge = conn.execute(""" + SELECT COUNT(*) as n FROM scan_results + WHERE ki_status='PRICES_FOUND' AND date(created_at)=date('now') + """).fetchone()["n"] + + # Fehlgeschlagene Scans heute + fehler_rows = conn.execute(""" + SELECT ki_status, COUNT(*) as n FROM scan_results + WHERE ki_status != 'PRICES_FOUND' AND date(created_at)=date('now') + GROUP BY ki_status + """).fetchall() + + total_scans = erfolge + sum(r["n"] for r in fehler_rows) + + # Bester Preis heute (korrigiert) + best = conn.execute(""" + SELECT MIN(preis_korrigiert) as min_p, scanner, node FROM prices + WHERE date(scraped_at)=date('now') + AND preis_korrigiert IS NOT NULL + AND (plausibel=1 OR plausibel IS NULL) + """).fetchone() + + # Gestern zum Vergleich + gestern_best = conn.execute(""" + SELECT MIN(preis_korrigiert) as min_p FROM prices + WHERE date(scraped_at)=date('now', '-1 day') + AND preis_korrigiert IS NOT NULL + AND (plausibel=1 OR plausibel IS NULL) + """).fetchone() + + # Node-Gesundheit + health = source_health_get_all() + + conn.close() + + # Telegram-Nachricht bauen + fehler_str = "" + if fehler_rows: + fehler_str = "\n".join([f" ❌ {r['ki_status']}: {r['n']}x" for r in fehler_rows]) + else: + fehler_str = " ✅ Keine Probleme" + + preis_str = "Keine Preise heute" + if best and best["min_p"]: + preis_str = f"{best['min_p']:.0f}€ via {best['scanner']} ({best['node']})" + if gestern_best and gestern_best["min_p"]: + diff = best["min_p"] - gestern_best["min_p"] + preis_str += f" ({diff:+.0f}€ vs. gestern)" + + node_str = "" + for h in health: + status_icon = {"healthy": "✅", "unhealthy": "⚠️", "paused": "⏸"}.get(h["status"], "❓") + node_str += f" {status_icon} {h['node']}/{h['scanner']}: {h['erfolge_heute']}/{h['erfolge_heute']+h['fehler_heute']}\n" + + msg = ( + f"📊 Flugscanner Tagesbilanz\n\n" + f"✅ Erfolgreiche Scans: {erfolge}/{total_scans}\n" + f"{fehler_str}\n\n" + f"🏆 Bester Preis: {preis_str}\n\n" + f"Node-Status:\n{node_str}" + ) + telegram_send(msg) + log("Tagesbilanz gesendet") + + +def tagesreset(): + """Täglicher Reset der Health-Zähler um Mitternacht.""" + source_health_reset_daily() + log("Source-Health Tageszähler zurückgesetzt") + + def run(): init_db() log("Scheduler gestartet") - # Zufälliges Intervall 25-45 Minuten — Anti-Detection - schedule.every(25).to(45).minutes.do(standard_lauf) + # 3 feste Scan-Fenster pro Tag (statt alle 20 Min) + schedule.every().day.at("06:30").do(standard_lauf) + schedule.every().day.at("12:30").do(standard_lauf) + schedule.every().day.at("18:30").do(standard_lauf) + log("Scan-Fenster: 06:30, 12:30, 18:30") # Di + Mi 23:30: Flex-Lauf ±3 Tage schedule.every().day.at("23:30").do(flex_lauf) @@ -650,8 +986,14 @@ def run(): schedule.every().day.at("07:00").do(morgenbericht) log("Morgenbericht: täglich 07:00 Uhr") + # Täglich 20:00: Tagesbilanz + schedule.every().day.at("20:00").do(tagesbilanz) + log("Tagesbilanz: täglich 20:00 Uhr") + + # Mitternacht: Health-Zähler Reset + schedule.every().day.at("00:05").do(tagesreset) + log(f"Nächster Lauf: {str(schedule.jobs[0].next_run)[:16]}") - log(f"Scan-Intervall: zufällig 25-45 Minuten (Anti-Bot)") # Telegram Bot Polling in eigenem Thread threading.Thread(target=telegram_polling, daemon=True).start() diff --git a/node/src/worker.py b/node/src/worker.py index 61b69f5..fc81dc0 100644 --- a/node/src/worker.py +++ b/node/src/worker.py @@ -3,7 +3,9 @@ from datetime import datetime, timedelta import re # ── Qualitätsschwellen ──────────────────────────────────────────────────────── -# PE Roundtrip FRA→KTI mit Gepäck: realistisch ab ~800€ +# CX Economy Roundtrip FRA→KTI: 600–1400€ | PE: 700–12000€ +MIN_PREIS_ECONOMY_ROUNDTRIP = 600 +MAX_PREIS_ECONOMY_ROUNDTRIP = 1400 MIN_PREIS_PE_ROUNDTRIP = 700 MAX_PREIS_PE_ROUNDTRIP = 12000 @@ -14,14 +16,23 @@ def _scrape_disabled(*args, **kwargs): return [], "" -def _validate_results(results, scanner_name, kabine="premium_economy"): +def _validate_results(results, scanner_name, kabine="economy"): """Qualitätskontrolle: filtert unplausible Preise raus.""" - if kabine == "premium_economy": + if kabine == "economy": + before = len(results) + results = [r for r in results + if MIN_PREIS_ECONOMY_ROUNDTRIP <= r["preis"] <= MAX_PREIS_ECONOMY_ROUNDTRIP] + dropped = before - len(results) + if dropped: + print(f"[QC/{scanner_name}] {dropped} Preise außerhalb " + f"{MIN_PREIS_ECONOMY_ROUNDTRIP}-{MAX_PREIS_ECONOMY_ROUNDTRIP}€ entfernt") + elif kabine == "premium_economy": before = len(results) results = [r for r in results if MIN_PREIS_PE_ROUNDTRIP <= r["preis"] <= MAX_PREIS_PE_ROUNDTRIP] dropped = before - len(results) if dropped: - print(f"[QC/{scanner_name}] {dropped} Preise außerhalb {MIN_PREIS_PE_ROUNDTRIP}-{MAX_PREIS_PE_ROUNDTRIP}€ entfernt (vermutlich Economy oder Fehler)") + print(f"[QC/{scanner_name}] {dropped} Preise außerhalb " + f"{MIN_PREIS_PE_ROUNDTRIP}-{MAX_PREIS_PE_ROUNDTRIP}€ entfernt") return results @@ -41,6 +52,32 @@ def _check_cabin_on_page(body, title, kabine="premium_economy"): return True +def _filter_roundtrip_only(results): + """Entfernt One-Way/unpassende Daten: nur Roundtrip mit 50–95 Tagen Aufenthalt.""" + # Aufenthalt 2–3 Monate: 50–95 Tage zwischen Hin- und Rückflug + MIN_AUFENTHALT = 50 + MAX_AUFENTHALT = 95 + filtered = [] + for r in results: + ab, an = r.get("abflug", ""), r.get("ankunft", "") + if not ab or not an: + continue + if an <= ab: + continue + try: + d_ab = datetime.strptime(ab, "%Y-%m-%d") + d_an = datetime.strptime(an, "%Y-%m-%d") + tage = (d_an - d_ab).days + if MIN_AUFENTHALT <= tage <= MAX_AUFENTHALT: + filtered.append(r) + except (ValueError, TypeError): + pass + dropped = len(results) - len(filtered) + if dropped: + print(f"[QC] {dropped} Daten aussortiert (Aufenthalt außerhalb {MIN_AUFENTHALT}-{MAX_AUFENTHALT} Tage)") + return filtered + + def scrape(scanner, von, nach, tage=30, aufenthalt_tage=60, trip_type="roundtrip", kabine="premium_economy", gepaeck="1koffer+handgepaeck", airline_filter="", @@ -53,7 +90,7 @@ def scrape(scanner, von, nach, tage=30, aufenthalt_tage=60, screenshot_b64 = JPEG Full-Page Screenshot als base64-String (leer wenn Fehler) """ dispatcher = { - "google_flights": scrape_google_flights, + "google_flights": _scrape_disabled, "kayak": scrape_kayak, "kayak_multicity": scrape_kayak_multicity, "momondo": scrape_momondo, @@ -66,10 +103,35 @@ def scrape(scanner, von, nach, tage=30, aufenthalt_tage=60, if not fn: raise ValueError(f"Unbekannter Scanner: {scanner}") if scanner == "kayak_multicity": - return fn(von, nach, tage, aufenthalt_tage, kabine, gepaeck, - airline_filter, via, stopover_min_h, stopover_max_h) - return fn(von, nach, tage, aufenthalt_tage, trip_type, kabine, gepaeck, - airline_filter, layover_min, layover_max, max_flugzeit_h, max_stops) + results, screenshot_b64 = fn(von, nach, tage, aufenthalt_tage, kabine, gepaeck, + airline_filter, via, stopover_min_h, stopover_max_h) + else: + results, screenshot_b64 = fn(von, nach, tage, aufenthalt_tage, trip_type, kabine, gepaeck, + airline_filter, layover_min, layover_max, max_flugzeit_h, max_stops) + results = _filter_roundtrip_only(results) + return results, screenshot_b64 + + +def _dismiss_cookie_banner(sb): + """Cookie-/Consent-Banner wegklicken — für saubere Screenshots.""" + # Kayak/Momondo: "Alle akzeptieren" Button (häufigstes Format) + for sel in [ + '//button[contains(., "Alle akzeptieren")]', + '//button[contains(., "Accept all")]', + '.kayak-consent-button', '#cookie-accept', '[data-testid="cookie-banner"]', + '#onetrust-accept-btn-handler', 'button[class*="accept"]', + 'button[title*="akzeptieren"]', '.evidon-banner-acceptbutton', + '.RxNS-button-content', 'button[id*="accept"]', + 'button[aria-label*="Accept"]', '[aria-label*="Akzeptieren"]', + ]: + try: + sb.click(sel, timeout=2) + print(f"[Cookie] Geklickt: {sel[:50]}") + sb.sleep(3) + return True + except Exception: + pass + return False def _take_screenshot(sb): @@ -96,14 +158,23 @@ def _take_screenshot(sb): def _booking_url_google(von, nach, abflug, rueck, kc): # Hash-Fragment wird von headless Chrome ignoriert → tfs-Parameter nutzen if rueck: - return (f"https://www.google.com/travel/flights?hl=en&curr=EUR" + return (f"https://www.google.com/travel/flights?hl=de&curr=EUR" f"#flt={von}.{nach}.{abflug}*{nach}.{von}.{rueck};c:EUR;e:1;sd:1;t:r;sc:{kc}") - return (f"https://www.google.com/travel/flights?hl=en&curr=EUR" + return (f"https://www.google.com/travel/flights?hl=de&curr=EUR" f"#flt={von}.{nach}.{abflug};c:EUR;e:1;sd:1;t:f;sc:{kc}") -def _kayak_filters(bags, layover_min, layover_max, max_flugzeit_h, max_stops, airline): - """Gemeinsame Filter-Logik für alle Kayak-URL-Funktionen.""" +def _booking_url_kayak(von, nach, abflug, rueck, kc, bags=1, + layover_min=120, layover_max=300, airline="", + max_flugzeit_h=22, max_stops=2): + """ + Kayak fs-Filter: + bfc=1 → min. 1 Freigepäck inklusive + ctr=120,300 → Umstiegszeit 2–5 Stunden (Minuten) + duration=-1320 → Max. Gesamtflugzeit (Minuten, hier 22h) + s=2 → Max. 2 Stopps + airlines=XX → Airline-Code (CZ, CX, SQ, TG …) + """ filters = [] if bags: filters.append(f"bfc%3D{bags}") @@ -115,47 +186,13 @@ def _kayak_filters(bags, layover_min, layover_max, max_flugzeit_h, max_stops, ai filters.append(f"s%3D{max_stops}") if airline: filters.append(f"airlines%3D{airline}") - return ("&fs=" + "%3B".join(filters)) if filters else "" - - -def _scrape_url_kayak(von, nach, abflug, rueck, kc, bags=1, - layover_min=120, layover_max=300, airline="", - max_flugzeit_h=22, max_stops=2): - """Interne Scraping-URL (kayak.de — bekannte HTML-Struktur).""" - fs = _kayak_filters(bags, layover_min, layover_max, max_flugzeit_h, max_stops, airline) + fs = ("&fs=" + "%3B".join(filters)) if filters else "" base = f"https://www.kayak.de/flights/{von}-{nach}/{abflug}" if rueck: return f"{base}/{rueck}?sort=price_a&cabin={kc}¤cy=EUR{fs}" return f"{base}?sort=price_a&cabin={kc}¤cy=EUR{fs}" -def _booking_url_kayak(von, nach, abflug, rueck, kc, bags=1, - layover_min=120, layover_max=300, airline="", - max_flugzeit_h=22, max_stops=2): - """User-facing Booking-URL (kayak.com international, kein DE-Aufschlag).""" - fs = _kayak_filters(bags, layover_min, layover_max, max_flugzeit_h, max_stops, airline) - base = f"https://www.kayak.com/flights/{von}-{nach}/{abflug}" - if rueck: - return f"{base}/{rueck}?sort=price_a&cabin={kc}¤cy=EUR{fs}" - return f"{base}?sort=price_a&cabin={kc}¤cy=EUR{fs}" - - -def _consent_kayak(sb): - """Kayak/Momondo GDPR-Consent wegklicken.""" - for sel in ['#didomi-notice-agree-button', 'button[class*="accept"]', - 'button[class*="agree"]', '[data-testid*="accept"]', - 'button[id*="accept"]', '.RxNS-button-content', - 'button[aria-label*="akzeptieren"]', 'button[aria-label*="Alle"]']: - try: - sb.find_element(sel, timeout=2).click() - print(f"[CONSENT] Kayak Consent geklickt: {sel}") - sb.sleep(3) - return True - except Exception: - pass - return False - - def _booking_url_momondo(von, nach, abflug, rueck, kc, bags=1, layover_min=120, layover_max=300, airline="", max_flugzeit_h=22, max_stops=2): @@ -172,36 +209,20 @@ def _booking_url_momondo(von, nach, abflug, rueck, kc, bags=1, if airline: filters.append(f"airlines%3D{airline}") fs = ("&fs=" + "%3B".join(filters)) if filters else "" - base = f"https://www.momondo.com/flight-search/{von}-{nach}/{abflug}" - if rueck: - return f"{base}/{rueck}?sort=price_a&cabin={kc}¤cy=EUR{fs}" - return f"{base}?sort=price_a&cabin={kc}¤cy=EUR{fs}" - - - -def _scrape_url_momondo(von, nach, abflug, rueck, kc, bags=1, - layover_min=120, layover_max=300, airline="", - max_flugzeit_h=22, max_stops=2): - filters = [] - if bags: filters.append(f"bfc%3D{bags}") - if layover_min and layover_max: filters.append(f"ctr%3D{layover_min}%2C{layover_max}") - if max_flugzeit_h: filters.append(f"duration%3D-{max_flugzeit_h * 60}") - if max_stops is not None and max_stops < 10: filters.append(f"s%3D{max_stops}") - if airline: filters.append(f"airlines%3D{airline}") - fs = ("&fs=" + "%3B".join(filters)) if filters else "" base = f"https://www.momondo.de/flight-search/{von}-{nach}/{abflug}" if rueck: return f"{base}/{rueck}?sort=price_a&cabin={kc}¤cy=EUR{fs}" return f"{base}?sort=price_a&cabin={kc}¤cy=EUR{fs}" -def _booking_url_trip(von, nach, abflug_fmt, rueck_fmt, kc, von_name, nach_name): + +def _booking_url_trip(von, nach, abflug_fmt, rueck_fmt, kc, von_name, nach_name, airline=""): + params = f"DDate1={abflug_fmt}&class={kc}&curr=EUR" if rueck_fmt: - return (f"https://www.trip.com/flights/{von_name}-to-{nach_name}/" - f"tickets-{von.lower()}-{nach.lower()}/" - f"?DDate1={abflug_fmt}&DDate2={rueck_fmt}&class={kc}&curr=EUR") + params += f"&DDate2={rueck_fmt}" + if airline: + params += f"&airline={airline}" return (f"https://www.trip.com/flights/{von_name}-to-{nach_name}/" - f"tickets-{von.lower()}-{nach.lower()}/" - f"?DDate1={abflug_fmt}&class={kc}&curr=EUR") + f"tickets-{von.lower()}-{nach.lower()}/?{params}") # ── Kabinen-Codes ────────────────────────────────────────────────────────────── @@ -230,10 +251,8 @@ def _parse_preis(text): def _preise_aus_body(body, scanner, abflug): results = [] seen = set() - # Normalisierung: thin/non-breaking spaces → reguläre Leerzeichen - body_norm = body.replace('\xa0', ' ').replace('\u202f', ' ').replace('\u00a0', ' ') - for m in re.finditer(r'(\d{1,2}[.,]\d{3}|\d[\d\s\.]{1,5})\s?€|€\s?(\d[\d\s\.]{1,5})', body_norm): - raw = (m.group(1) or m.group(2)).strip().replace(' ', '').replace('.', '').replace(',', '') + for m in re.finditer(r'(\d[\d\s\.]{1,5})\s?€|€\s?(\d[\d\s\.]{1,5})', body): + raw = (m.group(1) or m.group(2)).replace(' ', '').replace('.', '') try: v = float(raw) if 300 < v < 12000 and v not in seen: @@ -250,13 +269,10 @@ def _preise_aus_body(body, scanner, abflug): def _consent_google(sb): """Google Consent-Seite (DSGVO) behandeln.""" - title = sb.get_title() - url = sb.get_current_url() - if "consent" in url or "Bevor Sie" in title or "Before you" in title: + if "consent" in sb.get_current_url() or "Bevor Sie" in sb.get_title(): print("[CONSENT] Google Consent erkannt") for sel in ['form[action*="save"] button', 'button[jsname="tHlp8d"]', - '.lssxud button', 'button[aria-label*="kzeptieren"]', - 'button[aria-label*="Accept all"]', 'button[aria-label*="Accept"]']: + '.lssxud button', 'button[aria-label*="kzeptieren"]']: try: sb.click(sel, timeout=3) sb.sleep(4) @@ -323,66 +339,52 @@ def scrape_google_flights(von, nach, tage=30, aufenthalt_tage=60, print(f"[GF] Suche: {von_name}→{nach_name} {abflug_de}") with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb: - # Hash-Fragment URL (wird nach Consent-Redirect verloren — daher 2-Schritt) + # ── Strategie 1: Direkte URL mit Datums-Parametern ───────────────── + # Google Flights verarbeitet den Hash-Fragment erst nach JS-Ausführung direct_url = ( - f"https://www.google.com/travel/flights?hl=en&curr=EUR" + f"https://www.google.com/travel/flights?hl=de&curr=EUR" f"#flt={von}.{nach}.{abflug}*{nach}.{von}.{rueck}" - f";c:EUR;e:1;sd:1;t:r;sc:e" + f";c:EUR;e:1;sd:1;t:r;sc:w" ) if rueck else ( - f"https://www.google.com/travel/flights?hl=en&curr=EUR" - f"#flt={von}.{nach}.{abflug};c:EUR;e:1;sd:1;t:f;sc:e" + f"https://www.google.com/travel/flights?hl=de&curr=EUR" + f"#flt={von}.{nach}.{abflug};c:EUR;e:1;sd:1;t:f;sc:w" ) - - # ── Schritt 1: Consent zuerst auf der Basis-URL akzeptieren ───────── - # Consent-Redirect von consent.google.com strippt den #-Fragment. - # Lösung: Consent einmal auf Basisseite akzeptieren, dann Hash-URL öffnen. - sb.open("https://www.google.com/travel/flights?hl=en&curr=EUR") - sb.sleep(6) - consented = _consent_google(sb) - if consented: - print("[GF] Consent akzeptiert — öffne jetzt Hash-URL") - sb.sleep(3) - - # ── Schritt 2: Jetzt Hash-URL mit Suchparametern öffnen ───────────── sb.open(direct_url) - sb.sleep(12) + sb.sleep(8) + _consent_google(sb) + sb.sleep(3) title_direct = sb.get_title() - url_now = sb.get_current_url() - print(f"[GF] Titel: {title_direct[:60]}") - print(f"[GF] URL: {url_now[:80]}") + print(f"[GF] URL-Ansatz: {title_direct[:60]}") - # Wenn Hash-Deeplink Ergebnisse liefert + # Wenn direkte URL Ergebnisse liefert (Titel enthält Städtenamen) url_erfolgreich = any(kw in title_direct for kw in - [von, nach, "FRA", "KTI", "Frankfurt", "Phnom", "Flights to", "Flüge"]) + [von, nach, "FRA", "KTI", "Frankfurt", "Phnom", "Flüge"]) if not url_erfolgreich: - # ── Fallback: Formular manuell befüllen ───────────────────────── - print("[GF] Hash-URL kein Ergebnis — wechsle zu Formular-Ansatz") - sb.open("https://www.google.com/travel/flights?hl=en&curr=EUR") - sb.sleep(4) + # ── Strategie 2: Startseite + Formular befüllen ───────────────── + print("[GF] Direktlink kein Ergebnis — wechsle zu Formular-Ansatz") + sb.open("https://www.google.com/travel/flights?hl=de&curr=EUR") + sb.sleep(5) + _consent_google(sb) + sb.sleep(2) - # ── 1. Kabine auf Economy setzen (Standard — meist schon vorausgewählt) ── - # Economy = data-value="1" in Google Flights Dropdown - # Nur klicken falls aktuell etwas anderes ausgewählt ist + # ── 1. Kabine auf "Premium Economy" setzen ────────────────────────── try: + # VfPpkd-Buttons: [0]=Hin+Rück [1]=Economy(Klasse) btns = sb.find_elements('button[class*="VfPpkd"]') if len(btns) >= 2: - cabin_btn = btns[1] - cabin_text = cabin_btn.text.lower() - if "economy" not in cabin_text or "premium" in cabin_text: - cabin_btn.click() - sb.sleep(1) - for opt_sel in ['[data-value="1"]', - 'li[class*="economy"]:first-child', - '[role="option"]:nth-child(2)']: - try: - sb.find_element(opt_sel, timeout=2).click() - sb.sleep(0.5) - print(f"[GF] Economy gesetzt via {opt_sel}") - break - except Exception: - pass - else: - print("[GF] Economy bereits ausgewählt") + btns[1].click() + sb.sleep(1) + # Option "Premium Economy" im Dropdown auswählen + for opt_sel in ['[data-value="2"]', + 'li[class*="premium"]', + '[role="option"]:nth-child(3)']: + try: + sb.find_element(opt_sel, timeout=2).click() + sb.sleep(0.5) + print(f"[GF] Kabine gesetzt via {opt_sel}") + break + except Exception: + pass except Exception as e: print(f"[GF] Kabine: {e}") @@ -565,29 +567,25 @@ def scrape_kayak(von, nach, tage=30, aufenthalt_tage=60, rueck = (datetime.now() + timedelta(days=tage + aufenthalt_tage)).strftime("%Y-%m-%d") if trip_type == "roundtrip" else "" kc = KABINE_KAYAK.get(kabine, "w") bags = 1 if "koffer" in gepaeck else 0 - scrape_url = _scrape_url_kayak(von, nach, abflug, rueck, kc, bags, - layover_min, layover_max, airline_filter, - max_flugzeit_h, max_stops) booking_url = _booking_url_kayak(von, nach, abflug, rueck, kc, bags, layover_min, layover_max, airline_filter, max_flugzeit_h, max_stops) airline_label = f" [{airline_filter}]" if airline_filter else "" - print(f"[KY{airline_label}] Scrape: {scrape_url[:80]}") + print(f"[KY{airline_label}] URL: {booking_url}") results = [] with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb: - sb.open(scrape_url) - sb.sleep(8) - _consent_kayak(sb) + sb.open(booking_url) sb.sleep(15) + _dismiss_cookie_banner(sb) + sb.sleep(4) title = sb.get_title() body = sb.get_text("body") print(f"[KY] Title: {title[:80]}") - for sel in ['div[class*="hYzH-price"]', 'div[class*="e2GB-price-text"]', - '.price-text', '.f8F1-price-text', 'div[class*="price"] span', + for sel in ['.price-text', '.f8F1-price-text', 'div[class*="price"] span', 'span[class*="price"]', '.Iqt3', 'div.nrc6-price', '.price']: try: elems = sb.find_elements(sel, timeout=2) @@ -617,8 +615,10 @@ def scrape_kayak(von, nach, tage=30, aufenthalt_tage=60, if not pe_confirmed: print(f"[KY{airline_label}] WARNUNG: Premium Economy nicht auf Seite bestätigt!") - results = _validate_results(results, f"kayak{airline_label}", "premium_economy") + results = _validate_results(results, f"kayak{airline_label}", kabine) print(f"[KY{airline_label}] Ergebnis: {[r['preis'] for r in results[:5]]}") + _dismiss_cookie_banner(sb) + sb.sleep(3) screenshot_b64 = _take_screenshot(sb) return results[:10], screenshot_b64 @@ -639,7 +639,8 @@ def scrape_trip(von, nach, tage=30, aufenthalt_tage=60, von_name = stadtname.get(von, von.lower()) nach_name = stadtname.get(nach, nach.lower()) - booking_url = _booking_url_trip(von, nach, abflug_fmt, rueck_fmt, kc, von_name, nach_name) + booking_url = _booking_url_trip(von, nach, abflug_fmt, rueck_fmt, kc, von_name, nach_name, + airline_filter) print(f"[TR] URL: {booking_url}") results = [] @@ -687,8 +688,10 @@ def scrape_trip(von, nach, tage=30, aufenthalt_tage=60, if not pe_confirmed: print("[TR] WARNUNG: Premium Economy nicht auf Seite bestätigt!") - results = _validate_results(results, "trip", "premium_economy") + results = _validate_results(results, "trip", kabine) print(f"[TR] Ergebnis: {[r['preis'] for r in results[:5]]}") + _dismiss_cookie_banner(sb) + sb.sleep(2) screenshot_b64 = _take_screenshot(sb) return results[:10], screenshot_b64 @@ -696,15 +699,8 @@ def scrape_trip(von, nach, tage=30, aufenthalt_tage=60, def _booking_url_kayak_multicity(von, nach, via, abflug, via_datum, rueck, kc, bags=1, airline=""): """ Kayak Multi-City URL: FRA→HKG/DATE1 → HKG→KTI/DATE2 → KTI→FRA/DATE3 - Bei CX: direkt auf cathaypacific.com verlinken (günstiger, keine Aufschläge). + Kabinen-Code: w=Premium Economy """ - if airline.upper() == "CX": - # Google Flights Multi-City mit CX-Filter — präziser Deeplink, kein Aufschlag - return ( - f"https://www.google.com/travel/flights?hl=en&curr=EUR" - f"#flt={von}.{via}.{abflug}*{via}.{nach}.{via_datum}*{nach}.{von}.{rueck}" - f";c:EUR;e:1;sd:1;t:m;a:CX" - ) filters = [] if bags: filters.append(f"bfc%3D{bags}") @@ -712,25 +708,13 @@ def _booking_url_kayak_multicity(von, nach, via, abflug, via_datum, rueck, kc, b filters.append(f"airlines%3D{airline}") fs = ("&fs=" + "%3B".join(filters)) if filters else "" # Kayak Multi-City Format: /flights/FRA-HKG/DATE/HKG-KTI/DATE/KTI-FRA/DATE - return (f"https://www.kayak.com/flights" - f"/{von}-{via}/{abflug}" - f"/{via}-{nach}/{via_datum}" - f"/{nach}-{von}/{rueck}" - f"?sort=price_a&cabin={kc}¤cy=EUR{fs}") - - - -def _scrape_url_kayak_multicity(von, nach, via, abflug, via_datum, rueck, kc, bags=1, airline=""): - filters = [] - if bags: filters.append(f"bfc%3D{bags}") - if airline: filters.append(f"airlines%3D{airline}") - fs = ("&fs=" + "%3B".join(filters)) if filters else "" return (f"https://www.kayak.de/flights" f"/{von}-{via}/{abflug}" f"/{via}-{nach}/{via_datum}" f"/{nach}-{von}/{rueck}" f"?sort=price_a&cabin={kc}¤cy=EUR{fs}") + def scrape_kayak_multicity(von, nach, tage=30, aufenthalt_tage=60, kabine="premium_economy", gepaeck="1koffer+handgepaeck", @@ -747,28 +731,25 @@ def scrape_kayak_multicity(von, nach, tage=30, aufenthalt_tage=60, bags = 1 if "koffer" in gepaeck else 0 airline_label = f" [{airline_filter}]" if airline_filter else "" - scrape_url = _scrape_url_kayak_multicity(von, nach, via, abflug, via_datum, rueck, - kc, bags, airline_filter) booking_url = _booking_url_kayak_multicity(von, nach, via, abflug, via_datum, rueck, - kc, bags, airline_filter) + kc, bags, airline_filter) print(f"[MC{airline_label}] Multi-City via {via}: {abflug} → +1T → {rueck}") - print(f"[MC{airline_label}] Scrape: {scrape_url[:80]}") + print(f"[MC{airline_label}] URL: {booking_url}") results = [] with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb: - sb.open(scrape_url) - sb.sleep(8) - _consent_kayak(sb) + sb.open(booking_url) sb.sleep(15) + _dismiss_cookie_banner(sb) + sb.sleep(4) title = sb.get_title() body = sb.get_text("body") print(f"[MC] Title: {title[:80]}") - for sel in ['div[class*="hYzH-price"]', 'div[class*="e2GB-price-text"]', - '.price-text', '.f8F1-price-text', 'div[class*="price"] span', + for sel in ['.price-text', '.f8F1-price-text', 'div[class*="price"] span', 'span[class*="price"]', '.Iqt3', 'div.nrc6-price', '.price']: try: elems = sb.find_elements(sel, timeout=2) @@ -798,8 +779,10 @@ def scrape_kayak_multicity(von, nach, tage=30, aufenthalt_tage=60, r["airline"] = airline_filter or via results.append(r) - results = _validate_results(results, f"multicity{airline_label}", "premium_economy") + results = _validate_results(results, f"multicity{airline_label}", kabine) print(f"[MC{airline_label}] Ergebnis: {[r['preis'] for r in results[:5]]}") + _dismiss_cookie_banner(sb) + sb.sleep(3) screenshot_b64 = _take_screenshot(sb) return results[:10], screenshot_b64 @@ -815,32 +798,38 @@ def scrape_momondo(von, nach, tage=30, aufenthalt_tage=60, if trip_type == "roundtrip" else "" kc = KABINE_KAYAK.get(kabine, "w") bags = 1 if "koffer" in gepaeck else 0 - scrape_url = _scrape_url_momondo(von, nach, abflug, rueck, kc, bags, - layover_min, layover_max, airline_filter, - max_flugzeit_h, max_stops) booking_url = _booking_url_momondo(von, nach, abflug, rueck, kc, bags, layover_min, layover_max, airline_filter, max_flugzeit_h, max_stops) airline_label = f" [{airline_filter}]" if airline_filter else "" - print(f"[MO{airline_label}] Scrape: {scrape_url[:80]}") + print(f"[MO{airline_label}] URL: {booking_url}") results = [] screenshot_b64 = "" with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb: - sb.open(scrape_url) + sb.open(booking_url) sb.sleep(8) - _consent_kayak(sb) - # Nach Consent: Ergebnisse laden lassen + # Momondo Cookie-Consent wegklicken + for sel in ['button[class*="accept"]', '.RxNS-button-content', + '#onetrust-accept-btn-handler', 'button[title*="akzeptieren"]', + 'button[title*="Alle akzeptieren"]', '.evidon-banner-acceptbutton']: + try: + sb.find_element(sel, timeout=2).click() + print(f"[MO] Consent geklickt: {sel}") + sb.sleep(3) + break + except Exception: + pass + + # Nach Consent: Seite muss neu laden / Ergebnisse warten sb.sleep(12) title = sb.get_title() body = sb.get_text("body") print(f"[MO] Title: {title[:80]} | Body: {len(body)} chars") - for sel in ['div[class*="hYzH-price"]', 'div[class*="e2GB-price-text"]', - 'div[class*="ixMA-price"]', - '.price-text', '.f8F1-price-text', 'div[class*="price"] span', + for sel in ['.price-text', '.f8F1-price-text', 'div[class*="price"] span', 'span[class*="price"]', '.Iqt3', 'div.nrc6-price', '.price', '[class*="resultPrice"]', '.lowest-price']: try: @@ -870,8 +859,10 @@ def scrape_momondo(von, nach, tage=30, aufenthalt_tage=60, if not pe_confirmed: print(f"[MO{airline_label}] WARNUNG: Premium Economy nicht auf Seite bestätigt!") - results = _validate_results(results, f"momondo{airline_label}", "premium_economy") + results = _validate_results(results, f"momondo{airline_label}", kabine) print(f"[MO{airline_label}] Ergebnis: {[r['preis'] for r in results[:5]]}") + _dismiss_cookie_banner(sb) + sb.sleep(2) screenshot_b64 = _take_screenshot(sb) return results[:10], screenshot_b64