Der Scraper hat bisher Preise aus der Airline-Filter-Sidebar (z. B. Air China 714 EUR) als Flugergebnisse gespeichert. Fix: Header-Preis als Anker holen und Preise unter 80 % des Ankerwerts als Sidebar-Artefakte verwerfen.
1146 lines
49 KiB
Python
1146 lines
49 KiB
Python
from seleniumbase import SB
|
||
from datetime import datetime, timedelta
|
||
import re
|
||
|
||
# ── Quality thresholds ────────────────────────────────────────────────────────
# Plausible round-trip price bands in EUR; _validate_results() drops anything
# outside the band that belongs to the requested cabin.
# CX Economy round trip FRA→KTI: 600–1400 € | Premium Economy: 700–12000 €
MIN_PREIS_ECONOMY_ROUNDTRIP = 600
MAX_PREIS_ECONOMY_ROUNDTRIP = 1400
MIN_PREIS_PE_ROUNDTRIP = 700
MAX_PREIS_PE_ROUNDTRIP = 12000
|
||
|
||
|
||
def _scrape_disabled(*args, **kwargs):
|
||
"""Deaktivierter Scanner — gibt leere Ergebnisse zurück."""
|
||
print("[SKIP] Scanner deaktiviert")
|
||
return [], ""
|
||
|
||
|
||
def _validate_results(results, scanner_name, kabine="economy"):
    """Quality control: drop results whose price is implausible for the cabin.

    Args:
        results: list of price dicts; each must have a ``"preis"`` entry.
        scanner_name: label used in the log line.
        kabine: ``"economy"`` or ``"premium_economy"`` selects the matching
            MIN/MAX price band; any other value disables the check.

    Returns:
        A new list with only in-band results, or ``results`` unchanged when
        the cabin has no configured band.
    """
    if kabine == "economy":
        lower, upper = MIN_PREIS_ECONOMY_ROUNDTRIP, MAX_PREIS_ECONOMY_ROUNDTRIP
    elif kabine == "premium_economy":
        lower, upper = MIN_PREIS_PE_ROUNDTRIP, MAX_PREIS_PE_ROUNDTRIP
    else:
        # No band configured for this cabin: pass everything through.
        return results

    plausible = []
    for entry in results:
        if lower <= entry["preis"] <= upper:
            plausible.append(entry)

    n_dropped = len(results) - len(plausible)
    if n_dropped:
        print(f"[QC/{scanner_name}] {n_dropped} Preise außerhalb {lower}-{upper}€ entfernt")
    return plausible
|
||
|
||
|
||
def _check_cabin_on_page(body, title, kabine="premium_economy"):
|
||
"""Prüft ob die Seite die gewünschte Kabinenklasse bestätigt."""
|
||
text = (title + " " + body[:3000]).lower()
|
||
if kabine == "premium_economy":
|
||
pe_keywords = ["premium economy", "premium eco", "premiumeconomy",
|
||
"premium_economy", "kabine: premium", "cabin: premium",
|
||
"prem eco", "w class"]
|
||
eco_only = ["economy" in text and "premium" not in text]
|
||
if any(kw in text for kw in pe_keywords):
|
||
return True
|
||
if eco_only[0]:
|
||
print("[QC] WARNUNG: Seite zeigt 'Economy' OHNE 'Premium' — möglicherweise falsche Kabine!")
|
||
return False
|
||
return True
|
||
|
||
|
||
def _filter_roundtrip_only(results):
|
||
"""Entfernt One-Way/unpassende Daten: nur Roundtrip mit 50–95 Tagen Aufenthalt."""
|
||
# Aufenthalt 2–3 Monate: 50–95 Tage zwischen Hin- und Rückflug
|
||
MIN_AUFENTHALT = 50
|
||
MAX_AUFENTHALT = 95
|
||
filtered = []
|
||
for r in results:
|
||
ab, an = r.get("abflug", ""), r.get("ankunft", "")
|
||
if not ab or not an:
|
||
continue
|
||
if an <= ab:
|
||
continue
|
||
try:
|
||
d_ab = datetime.strptime(ab, "%Y-%m-%d")
|
||
d_an = datetime.strptime(an, "%Y-%m-%d")
|
||
tage = (d_an - d_ab).days
|
||
if MIN_AUFENTHALT <= tage <= MAX_AUFENTHALT:
|
||
filtered.append(r)
|
||
except (ValueError, TypeError):
|
||
pass
|
||
dropped = len(results) - len(filtered)
|
||
if dropped:
|
||
print(f"[QC] {dropped} Daten aussortiert (Aufenthalt außerhalb {MIN_AUFENTHALT}-{MAX_AUFENTHALT} Tage)")
|
||
return filtered
|
||
|
||
|
||
def scrape(scanner, von, nach, tage=30, aufenthalt_tage=60,
           trip_type="roundtrip", kabine="premium_economy",
           gepaeck="1koffer+handgepaeck", airline_filter="",
           layover_min=120, layover_max=300,
           max_flugzeit_h=22, max_stops=2,
           via="", stopover_min_h=20, stopover_max_h=30):
    """Run one scanner and return ``(results, screenshot_b64)``.

    ``results`` is a list of price dicts, ``screenshot_b64`` the base64
    screenshot string produced by the scanner (empty on failure). The result
    list is always passed through ``_filter_roundtrip_only`` — for every
    scanner and every ``trip_type`` — so entries without a plausible return
    date are removed.

    Args:
        scanner: key into the dispatcher table below.
        via, stopover_min_h, stopover_max_h: only used by
            ``"kayak_multicity"``. An empty ``via`` is not forwarded, so the
            multi-city scraper's own default stop-over airport applies.
        (all other arguments are forwarded unchanged to the scanner)

    Raises:
        ValueError: if ``scanner`` is not a known key.
    """
    dispatcher = {
        "google_flights": _scrape_disabled,
        "kayak": scrape_kayak,
        "kayak_multicity": scrape_kayak_multicity,
        "momondo": scrape_momondo,
        "wego": _scrape_disabled,
        "traveloka": scrape_traveloka,
        "skyscanner": _scrape_disabled,
        "trip": scrape_trip,
    }
    fn = dispatcher.get(scanner)
    if fn is None:
        raise ValueError(f"Unbekannter Scanner: {scanner}")

    if scanner == "kayak_multicity":
        multicity_kwargs = {
            "stopover_min_h": stopover_min_h,
            "stopover_max_h": stopover_max_h,
        }
        # Forwarding the empty default would override the scraper's own
        # default and yield a URL with an empty stop-over airport.
        if via:
            multicity_kwargs["via"] = via
        results, screenshot_b64 = fn(von, nach, tage, aufenthalt_tage, kabine,
                                     gepaeck, airline_filter, **multicity_kwargs)
    else:
        results, screenshot_b64 = fn(von, nach, tage, aufenthalt_tage, trip_type,
                                     kabine, gepaeck, airline_filter,
                                     layover_min, layover_max,
                                     max_flugzeit_h, max_stops)

    return _filter_roundtrip_only(results), screenshot_b64
|
||
|
||
|
||
def _dismiss_cookie_banner(sb):
|
||
"""Cookie-/Consent-Banner wegklicken — für saubere Screenshots."""
|
||
# Kayak/Momondo: "Alle akzeptieren" Button (häufigstes Format)
|
||
for sel in [
|
||
'//button[contains(., "Alle akzeptieren")]',
|
||
'//button[contains(., "Accept all")]',
|
||
'.kayak-consent-button', '#cookie-accept', '[data-testid="cookie-banner"]',
|
||
'#onetrust-accept-btn-handler', 'button[class*="accept"]',
|
||
'button[title*="akzeptieren"]', '.evidon-banner-acceptbutton',
|
||
'.RxNS-button-content', 'button[id*="accept"]',
|
||
'button[aria-label*="Accept"]', '[aria-label*="Akzeptieren"]',
|
||
]:
|
||
try:
|
||
sb.click(sel, timeout=2)
|
||
print(f"[Cookie] Geklickt: {sel[:50]}")
|
||
sb.sleep(3)
|
||
return True
|
||
except Exception:
|
||
pass
|
||
return False
|
||
|
||
|
||
|
||
def _dismiss_comparison_popup(sb):
|
||
"""Vergleichs-Popups (Opodo, Skyscanner etc.) wegklicken bevor Screenshot gemacht wird."""
|
||
# Erst Escape versuchen (funktioniert bei den meisten Modals)
|
||
try:
|
||
sb.driver.execute_script("document.dispatchEvent(new KeyboardEvent('keydown', {key: 'Escape', keyCode: 27, bubbles: true}));")
|
||
sb.sleep(0.5)
|
||
except Exception:
|
||
pass
|
||
|
||
# Dann gezielt Close-Buttons suchen
|
||
for sel in [
|
||
'button[aria-label*="lose"]',
|
||
'button[aria-label*="chließen"]',
|
||
'button[aria-label*="Schließen"]',
|
||
'[class*="modal"] button[class*="close"]',
|
||
'[class*="dialog"] button[class*="close"]',
|
||
'[class*="overlay"] button[class*="close"]',
|
||
'[class*="popup"] button[class*="close"]',
|
||
'button[class*="dismiss"]',
|
||
'[data-testid*="close"]',
|
||
'//button[contains(@aria-label, "lose")]',
|
||
'//button[contains(., "Schließen")]',
|
||
'//button[contains(., "Nein")]',
|
||
'//button[contains(., "Nicht jetzt")]',
|
||
'//button[contains(., "Vielleicht später")]',
|
||
]:
|
||
try:
|
||
sb.click(sel, timeout=1)
|
||
print(f"[Popup] Geschlossen: {sel[:60]}")
|
||
sb.sleep(0.8)
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
# JavaScript-Fallback: alle sichtbaren Modals/Overlays entfernen
|
||
try:
|
||
removed = sb.driver.execute_script("""
|
||
var removed = 0;
|
||
var selectors = ['[class*="modal"]', '[class*="overlay"]', '[class*="dialog"]',
|
||
'[class*="popup"]', '[role="dialog"]'];
|
||
selectors.forEach(function(sel) {
|
||
document.querySelectorAll(sel).forEach(function(el) {
|
||
var style = window.getComputedStyle(el);
|
||
if (style.display !== 'none' && style.visibility !== 'hidden'
|
||
&& el.offsetHeight > 100) {
|
||
el.remove();
|
||
removed++;
|
||
}
|
||
});
|
||
});
|
||
return removed;
|
||
""")
|
||
if removed:
|
||
print(f"[Popup] JS: {removed} Elemente entfernt")
|
||
sb.sleep(0.5)
|
||
except Exception:
|
||
pass
|
||
|
||
return False
|
||
|
||
|
||
def _take_screenshot(sb):
|
||
"""Full-Page Screenshot via CDP (JPEG 55%, max 3000px). Gibt base64-String zurück."""
|
||
try:
|
||
result = sb.driver.execute_cdp_cmd("Page.captureScreenshot", {
|
||
"format": "jpeg",
|
||
"quality": 55,
|
||
"captureBeyondViewport": True,
|
||
"clip": {"x": 0, "y": 0, "width": 1280, "height": 3000, "scale": 0.75},
|
||
})
|
||
data = result.get("data", "")
|
||
if data:
|
||
print(f"[Screenshot] OK — {len(data)//1024} KB base64")
|
||
return data
|
||
except Exception as e:
|
||
print(f"[Screenshot] CDP-Fehler: {e}")
|
||
try:
|
||
return sb.driver.get_screenshot_as_base64()
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
def _booking_url_google(von, nach, abflug, rueck, kc):
|
||
# Hash-Fragment wird von headless Chrome ignoriert → tfs-Parameter nutzen
|
||
if rueck:
|
||
return (f"https://www.google.com/travel/flights?hl=de&curr=EUR"
|
||
f"#flt={von}.{nach}.{abflug}*{nach}.{von}.{rueck};c:EUR;e:1;sd:1;t:r;sc:{kc}")
|
||
return (f"https://www.google.com/travel/flights?hl=de&curr=EUR"
|
||
f"#flt={von}.{nach}.{abflug};c:EUR;e:1;sd:1;t:f;sc:{kc}")
|
||
|
||
|
||
def _booking_url_kayak(von, nach, abflug, rueck, kc, bags=1,
|
||
layover_min=120, layover_max=300, airline="",
|
||
max_flugzeit_h=22, max_stops=2):
|
||
"""
|
||
Kayak fs-Filter:
|
||
bfc=1 → min. 1 Freigepäck inklusive
|
||
ctr=120,300 → Umstiegszeit 2–5 Stunden (Minuten)
|
||
duration=-1320 → Max. Gesamtflugzeit (Minuten, hier 22h)
|
||
s=2 → Max. 2 Stopps
|
||
airlines=XX → Airline-Code (CZ, CX, SQ, TG …)
|
||
"""
|
||
filters = []
|
||
if bags:
|
||
filters.append(f"bfc%3D{bags}")
|
||
if layover_min and layover_max:
|
||
filters.append(f"ctr%3D{layover_min}%2C{layover_max}")
|
||
if max_flugzeit_h:
|
||
filters.append(f"duration%3D-{max_flugzeit_h * 60}")
|
||
if max_stops is not None and max_stops < 10:
|
||
filters.append(f"s%3D{max_stops}")
|
||
if airline:
|
||
filters.append(f"airlines%3D{airline}")
|
||
fs = ("&fs=" + "%3B".join(filters)) if filters else ""
|
||
base = f"https://www.kayak.de/flights/{von}-{nach}/{abflug}"
|
||
if rueck:
|
||
return f"{base}/{rueck}?sort=price_a&cabin={kc}¤cy=EUR{fs}"
|
||
return f"{base}?sort=price_a&cabin={kc}¤cy=EUR{fs}"
|
||
|
||
|
||
def _booking_url_momondo(von, nach, abflug, rueck, kc, bags=1,
|
||
layover_min=120, layover_max=300, airline="",
|
||
max_flugzeit_h=22, max_stops=2):
|
||
"""Momondo URL — gleiche Struktur wie Kayak (Booking Holdings), andere Domain."""
|
||
filters = []
|
||
if bags:
|
||
filters.append(f"bfc%3D{bags}")
|
||
if layover_min and layover_max:
|
||
filters.append(f"ctr%3D{layover_min}%2C{layover_max}")
|
||
if max_flugzeit_h:
|
||
filters.append(f"duration%3D-{max_flugzeit_h * 60}")
|
||
if max_stops is not None and max_stops < 10:
|
||
filters.append(f"s%3D{max_stops}")
|
||
if airline:
|
||
filters.append(f"airlines%3D{airline}")
|
||
fs = ("&fs=" + "%3B".join(filters)) if filters else ""
|
||
base = f"https://www.momondo.de/flight-search/{von}-{nach}/{abflug}"
|
||
if rueck:
|
||
return f"{base}/{rueck}?sort=price_a&cabin={kc}¤cy=EUR{fs}"
|
||
return f"{base}?sort=price_a&cabin={kc}¤cy=EUR{fs}"
|
||
|
||
|
||
def _booking_url_trip(von, nach, abflug_fmt, rueck_fmt, kc, von_name, nach_name, airline=""):
|
||
params = f"DDate1={abflug_fmt}&class={kc}&curr=EUR"
|
||
if rueck_fmt:
|
||
params += f"&DDate2={rueck_fmt}"
|
||
if airline:
|
||
params += f"&airline={airline}"
|
||
return (f"https://www.trip.com/flights/{von_name}-to-{nach_name}/"
|
||
f"tickets-{von.lower()}-{nach.lower()}/?{params}")
|
||
|
||
|
||
# ── Cabin codes ────────────────────────────────────────────────────────────────
# Map the internal cabin name to the code passed to each site's URL builder.
KABINE_GOOGLE = {"economy": "e", "premium_economy": "w", "business": "b", "first": "f"}
KABINE_KAYAK = {"economy": "e", "premium_economy": "w", "business": "b", "first": "f"}
KABINE_TRIP = {"economy": "Y", "premium_economy": "W", "business": "C", "first": "F"}
|
||
|
||
|
||
def _parse_preis(text):
|
||
if not text:
|
||
return None
|
||
text = text.replace('\xa0', ' ').replace('\u202f', ' ')
|
||
for p in [r'(\d{1,2}[.,]\d{3})\s?€', r'(\d{3,5})\s?€',
|
||
r'€\s?(\d{3,5})', r'EUR\s?(\d{3,5})', r'(\d{3,5})\s?EUR']:
|
||
m = re.search(p, text)
|
||
if m:
|
||
try:
|
||
v = float(m.group(1).replace('.', '').replace(',', ''))
|
||
if 200 < v < 15000:
|
||
return round(v, 2)
|
||
except ValueError:
|
||
pass
|
||
return None
|
||
|
||
|
||
def _preise_aus_body(body, scanner, abflug):
|
||
results = []
|
||
seen = set()
|
||
for m in re.finditer(r'(\d[\d\s\.]{1,5})\s?€|€\s?(\d[\d\s\.]{1,5})', body):
|
||
raw = (m.group(1) or m.group(2)).replace(' ', '').replace('.', '')
|
||
try:
|
||
v = float(raw)
|
||
if 300 < v < 12000 and v not in seen:
|
||
seen.add(v)
|
||
results.append({
|
||
"scanner": scanner, "preis": v, "waehrung": "EUR",
|
||
"airline": "", "abflug": abflug, "ankunft": ""
|
||
})
|
||
except ValueError:
|
||
pass
|
||
results.sort(key=lambda x: x["preis"])
|
||
return results[:10]
|
||
|
||
|
||
|
||
def _kayak_header_preis(sb) -> float | None:
|
||
"""Liest den 'Günstigste Option' Preis aus dem KAYAK-Summary-Header.
|
||
Dieser Wert ist der zuverlässigste Anker — kommt direkt aus den Suchergebnissen."""
|
||
try:
|
||
# JavaScript: suche die summary-bar Elemente
|
||
price = sb.driver.execute_script("""
|
||
// KAYAK zeigt "Günstigste Option" + Preis in einem summary-container
|
||
var containers = document.querySelectorAll('[class*="rec-col"], [class*="recommended"], [class*="summary"], [class*="option-header"]');
|
||
for (var c of containers) {
|
||
var txt = c.innerText || '';
|
||
var m = txt.match(/(\d[\d.]{1,6})\s?€|€\s?(\d[\d.]{1,6})/);
|
||
if (m) {
|
||
var raw = (m[1] || m[2]).replace('.','').replace(',','.');
|
||
var v = parseFloat(raw);
|
||
if (v > 300 && v < 5000) return v;
|
||
}
|
||
}
|
||
// Fallback: suche im Seitentitel / h1
|
||
var h = document.querySelector('h1, [class*="title"]');
|
||
if (h) {
|
||
var m2 = (h.innerText||'').match(/(\d[\d.]{2,6})\s?€/);
|
||
if (m2) return parseFloat(m2[1].replace('.',''));
|
||
}
|
||
return null;
|
||
""")
|
||
if price:
|
||
print(f"[KY] Header-Preis: {price} EUR")
|
||
return float(price)
|
||
except Exception as e:
|
||
print(f"[KY] Header-Preis Fehler: {e}")
|
||
return None
|
||
|
||
|
||
def _filter_sidebar_preise(results: list, anker: float | None, scanner: str) -> list:
|
||
"""Filtert Sidebar-Preise (Airline-Filter, Preisslider) heraus.
|
||
Behalte nur Preise die >= 80% des Anker-Preises sind (Sidebar-Preise sind viel günstiger)."""
|
||
if not anker or not results:
|
||
return results
|
||
min_valid = anker * 0.80
|
||
filtered = [r for r in results if r["preis"] >= min_valid]
|
||
removed = len(results) - len(filtered)
|
||
if removed:
|
||
print(f"[{scanner}] {removed} Sidebar-Preise entfernt (unter {min_valid:.0f} EUR)")
|
||
return filtered if filtered else results # Fallback: alle behalten wenn alle rausgefiltert
|
||
|
||
|
||
|
||
def _consent_google(sb):
|
||
"""Google Consent-Seite (DSGVO) behandeln."""
|
||
if "consent" in sb.get_current_url() or "Bevor Sie" in sb.get_title():
|
||
print("[CONSENT] Google Consent erkannt")
|
||
for sel in ['form[action*="save"] button', 'button[jsname="tHlp8d"]',
|
||
'.lssxud button', 'button[aria-label*="kzeptieren"]']:
|
||
try:
|
||
sb.click(sel, timeout=3)
|
||
sb.sleep(4)
|
||
print(f"[CONSENT] Geklickt: {sel}")
|
||
return True
|
||
except Exception:
|
||
pass
|
||
return False
|
||
|
||
|
||
def _gf_fill_field(sb, selectors, text, field_name):
    """Fill a Google Flights text field and pick the first suggestion.

    Uses keyboard navigation (ArrowDown + Return) instead of a DOM click on
    the suggestion, because the autocomplete dropdown otherwise stays open.

    Args:
        sb: SeleniumBase instance.
        selectors: candidate selectors for the input, tried in order.
        text: text to type.
        field_name: label used in log output.

    Returns:
        ``True`` as soon as one selector could be filled, ``False`` if every
        selector failed.
    """
    from selenium.webdriver.common.keys import Keys

    # Keys that confirm the first suggestion, each with the pause after it.
    confirm_sequence = ((Keys.ARROW_DOWN, 0.5), (Keys.RETURN, 1))

    for selector in selectors:
        try:
            field = sb.find_element(selector, timeout=3)
            # Clear via JS (more robust than .clear() on React inputs).
            sb.execute_script("arguments[0].value = '';", field)
            field.click()
            sb.sleep(0.3)
            field.send_keys(text)
            sb.sleep(2)
            for key, pause in confirm_sequence:
                field.send_keys(key)
                sb.sleep(pause)
            # Escape in case the dropdown is still open; failure is harmless.
            try:
                field.send_keys(Keys.ESCAPE)
            except Exception:
                pass
            print(f"[GF] {field_name} gesetzt: {text}")
            return True
        except Exception:
            continue

    print(f"[GF] {field_name} fehlgeschlagen — kein Feld gefunden")
    return False
|
||
|
||
|
||
def scrape_google_flights(von, nach, tage=30, aufenthalt_tage=60,
                          trip_type="roundtrip", kabine="premium_economy",
                          gepaeck="1koffer+handgepaeck", airline_filter="",
                          layover_min=120, layover_max=300,
                          max_flugzeit_h=22, max_stops=2):
    """Scrape Google Flights prices for one route.

    Strategy 1 opens the deep link built by ``_booking_url_google`` (which
    carries the requested cabin code). If the resulting page title shows no
    sign of a flight search, strategy 2 opens the start page and fills the
    search form (cabin, origin, destination, search button).

    Prices are then read from aria-labels, from known price elements and, as
    a last resort, from the body text; only prices above 400 are kept and
    duplicates are removed.

    ``gepaeck``, ``airline_filter``, ``layover_*``, ``max_flugzeit_h`` and
    ``max_stops`` are accepted for signature parity with the other scrapers
    but are not used here.

    Returns:
        ``(results[:10], screenshot_b64)``.
    """
    # Derive all dates from a single timestamp so they cannot disagree.
    start = datetime.now() + timedelta(days=tage)
    abflug = start.strftime("%Y-%m-%d")
    abflug_de = start.strftime("%d.%m.%Y")
    rueck = ((start + timedelta(days=aufenthalt_tage)).strftime("%Y-%m-%d")
             if trip_type == "roundtrip" else "")
    kc = KABINE_GOOGLE.get(kabine, "w")
    booking_url = _booking_url_google(von, nach, abflug, rueck, kc)

    stadtname = {"FRA": "Frankfurt", "HAN": "Hanoi", "KTI": "Phnom Penh",
                 "PNH": "Phnom Penh", "BKK": "Bangkok", "SGN": "Ho Chi Minh City"}
    von_name = stadtname.get(von, von)
    nach_name = stadtname.get(nach, nach)
    results = []
    screenshot_b64 = ""
    # Defined up front so the status log below works for both strategies.
    gesucht = False

    def _hit(preis):
        """Build one result dict for a found price."""
        return {"scanner": "google_flights", "preis": preis,
                "waehrung": "EUR", "airline": "",
                "abflug": abflug, "ankunft": rueck,
                "booking_url": booking_url}

    print(f"[GF] Suche: {von_name}→{nach_name} {abflug_de}")

    with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb:
        # ── Strategy 1: deep link with date parameters ──────────────────────
        # Google Flights evaluates the hash fragment only after JS has run.
        sb.open(booking_url)
        sb.sleep(8)
        _consent_google(sb)
        sb.sleep(3)
        title_direct = sb.get_title()
        print(f"[GF] URL-Ansatz: {title_direct[:60]}")

        # The deep link counts as successful when the title mentions the route.
        url_erfolgreich = any(kw in title_direct for kw in
                              [von, nach, "FRA", "KTI", "Frankfurt", "Phnom", "Flüge"])
        if not url_erfolgreich:
            # ── Strategy 2: start page + fill in the form ───────────────────
            # NOTE(review): steps 1–4 are treated as part of this fallback;
            # confirm this matches the intended flow.
            from selenium.webdriver.common.keys import Keys

            print("[GF] Direktlink kein Ergebnis — wechsle zu Formular-Ansatz")
            sb.open("https://www.google.com/travel/flights?hl=de&curr=EUR")
            sb.sleep(5)
            _consent_google(sb)
            sb.sleep(2)

            # ── 1. Set the cabin to "Premium Economy" ───────────────────────
            try:
                # VfPpkd buttons: [0]=round trip selector, [1]=cabin class
                btns = sb.find_elements('button[class*="VfPpkd"]')
                if len(btns) >= 2:
                    btns[1].click()
                    sb.sleep(1)
                    # Pick the "Premium Economy" option in the dropdown.
                    for opt_sel in ['[data-value="2"]',
                                    'li[class*="premium"]',
                                    '[role="option"]:nth-child(3)']:
                        try:
                            sb.find_element(opt_sel, timeout=2).click()
                            sb.sleep(0.5)
                            print(f"[GF] Kabine gesetzt via {opt_sel}")
                            break
                        except Exception:
                            pass
            except Exception as e:
                print(f"[GF] Kabine: {e}")

            # ── 2. Fill the origin field ────────────────────────────────────
            _gf_fill_field(sb, [
                'input[aria-label*="Von"]',
                'input[aria-label*="Abflugort"]',
                'input[placeholder*="Von"]',
                'input[aria-label*="Where from"]',
            ], von_name, "Von")
            sb.sleep(1.5)  # wait until the origin selection has settled

            # ── 3. Fill the destination field ───────────────────────────────
            nach_gesetzt = False

            # Attempt 1: explicit aria-label / role selectors
            for nach_sel in [
                'input[role="combobox"]',  # Google uses combobox for autocomplete
                'input[aria-label*="Wohin"]',
                'input[aria-label*="Zielort"]',
                'input[aria-label*="Ziel"]',
                'input[placeholder*="Wohin"]',
                'input[aria-label*="Where to"]',
                'input[aria-label*="Destination"]',
            ]:
                try:
                    # With several matches take the second (1st = origin).
                    elems = sb.find_elements(nach_sel)
                    field = elems[1] if len(elems) >= 2 else (elems[0] if elems else None)
                    if field and field != sb.driver.switch_to.active_element:
                        sb.execute_script("arguments[0].value = '';", field)
                        field.click()
                        sb.sleep(0.3)
                        field.send_keys(nach_name)
                        sb.sleep(2)
                        field.send_keys(Keys.ARROW_DOWN)
                        sb.sleep(0.5)
                        field.send_keys(Keys.RETURN)
                        sb.sleep(1)
                        print(f"[GF] Nach via {nach_sel}: {nach_name}")
                        nach_gesetzt = True
                        break
                except Exception:
                    continue

            # Attempt 2: JS — locate the destination input and fill it
            if not nach_gesetzt:
                try:
                    nach_field = sb.execute_script("""
                        var inputs = document.querySelectorAll('input[role="combobox"], input[aria-label]');
                        for (var i = 0; i < inputs.length; i++) {
                            var lbl = inputs[i].getAttribute('aria-label') || '';
                            if (lbl.match(/Wohin|Ziel|Destination|Where to/i)) return inputs[i];
                        }
                        // Fallback: zweites sichtbares Input
                        var all = Array.from(document.querySelectorAll('input')).filter(
                            e => e.offsetWidth > 0 && e.offsetHeight > 0);
                        return all[1] || null;
                        """)
                    if nach_field:
                        sb.execute_script("arguments[0].value = '';", nach_field)
                        nach_field.click()
                        sb.sleep(0.3)
                        nach_field.send_keys(nach_name)
                        sb.sleep(2)
                        nach_field.send_keys(Keys.ARROW_DOWN)
                        sb.sleep(0.5)
                        nach_field.send_keys(Keys.RETURN)
                        sb.sleep(1)
                        print(f"[GF] Nach via JS-Input: {nach_name}")
                        nach_gesetzt = True
                except Exception as e:
                    print(f"[GF] Nach JS-Fehler: {e}")

            # ── 4. Click the search button ──────────────────────────────────
            # Variant A: known selectors
            for sel in ['button[aria-label*="Suchen"]', 'button[aria-label*="Search"]',
                        'button[jsname="vLv7Lb"]', 'button[type="submit"]',
                        'button[class*="search"]']:
                try:
                    sb.find_element(sel, timeout=2).click()
                    print(f"[GF] Suche via Selector: {sel}")
                    gesucht = True
                    break
                except Exception:
                    continue

            # Variant B: JS — find a button labelled "Suchen" / "Search"
            if not gesucht:
                try:
                    clicked = sb.execute_script("""
                        var btns = document.querySelectorAll('button');
                        for (var b of btns) {
                            var t = (b.textContent || b.innerText || '').trim();
                            if (t === 'Suchen' || t === 'Search') { b.click(); return true; }
                        }
                        return false;
                        """)
                    if clicked:
                        print("[GF] Suche via JS-Text-Klick")
                        gesucht = True
                except Exception:
                    pass

            # Variant C: Enter on <body> (triggers the form submit)
            if not gesucht:
                try:
                    sb.driver.find_element("css selector", "body").send_keys(Keys.RETURN)
                    print("[GF] Suche via Enter-Taste")
                    gesucht = True
                except Exception:
                    pass

        sb.sleep(14)
        title = sb.get_title()
        body = sb.get_text("body")
        print(f"[GF] Title: {title[:80]} | Body: {len(body)} chars | Suche-OK: {gesucht}")

        # ── 5. Extract prices ───────────────────────────────────────────────
        # a) elements whose aria-label mentions a price
        try:
            for elem in sb.find_elements('[aria-label*="€"], [aria-label*="EUR"]')[:30]:
                lbl = elem.get_attribute("aria-label") or elem.text
                p = _parse_preis(lbl)
                if p and p > 400:
                    results.append(_hit(p))
        except Exception:
            pass

        # b) visible price texts in the result list
        if not results:
            for sel in ['.YMlIz', '.FpEdX', '[class*="price"]', 'span[class*="preis"]']:
                try:
                    for elem in sb.find_elements(sel)[:20]:
                        p = _parse_preis(elem.text)
                        if p and p > 400:
                            results.append(_hit(p))
                    if results:
                        break
                except Exception:
                    pass

        # c) body regex fallback
        if not results:
            for r in _preise_aus_body(body, "google_flights", abflug):
                if r["preis"] > 400:
                    r["ankunft"] = rueck
                    r["booking_url"] = booking_url
                    results.append(r)

        # Keep the first occurrence of each distinct price above 400.
        seen = set()
        dedup = []
        for r in results:
            if r["preis"] > 400 and r["preis"] not in seen:
                seen.add(r["preis"])
                dedup.append(r)
        results = dedup

        print(f"[GF] Ergebnis: {[r['preis'] for r in results[:5]]}")
        _dismiss_comparison_popup(sb)
        screenshot_b64 = _take_screenshot(sb)

    return results[:10], screenshot_b64
|
||
|
||
|
||
def scrape_kayak(von, nach, tage=30, aufenthalt_tage=60,
                 trip_type="roundtrip", kabine="premium_economy",
                 gepaeck="1koffer+handgepaeck", airline_filter="",
                 layover_min=120, layover_max=300,
                 max_flugzeit_h=22, max_stops=2):
    """Scrape kayak.de prices for one route.

    Opens the filtered search URL, reads prices from the first price
    selector that yields any (body regex as fallback), then applies three
    checks: cabin confirmation on the page (warning only), removal of
    sidebar prices relative to the header anchor price, and the plausible
    price band for ``kabine``.

    Args:
        tage: days from now until departure.
        aufenthalt_tage: stay length; only used for ``trip_type="roundtrip"``.
        gepaeck: a value containing ``"koffer"`` enables the 1-bag filter.
        (remaining arguments are forwarded to the URL builder)

    Returns:
        ``(results[:10], screenshot_b64)``.
    """
    # Derive both dates from a single timestamp so they cannot disagree.
    start = datetime.now() + timedelta(days=tage)
    abflug = start.strftime("%Y-%m-%d")
    rueck = ((start + timedelta(days=aufenthalt_tage)).strftime("%Y-%m-%d")
             if trip_type == "roundtrip" else "")
    kc = KABINE_KAYAK.get(kabine, "w")
    bags = 1 if "koffer" in gepaeck else 0
    booking_url = _booking_url_kayak(von, nach, abflug, rueck, kc, bags,
                                     layover_min, layover_max, airline_filter,
                                     max_flugzeit_h, max_stops)
    airline_label = f" [{airline_filter}]" if airline_filter else ""
    print(f"[KY{airline_label}] URL: {booking_url}")

    results = []

    with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb:
        sb.open(booking_url)
        sb.sleep(15)
        _dismiss_cookie_banner(sb)
        sb.sleep(4)

        title = sb.get_title()
        body = sb.get_text("body")
        print(f"[KY] Title: {title[:80]}")

        # Use the first selector that produces at least one parsable price.
        for sel in ['.price-text', '.f8F1-price-text', 'div[class*="price"] span',
                    'span[class*="price"]', '.Iqt3', 'div.nrc6-price', '.price']:
            try:
                elems = sb.find_elements(sel, timeout=2)
                if elems:
                    for e in elems[:15]:
                        p = _parse_preis(e.text)
                        if p:
                            results.append({"scanner": "kayak", "preis": p,
                                            "waehrung": "EUR",
                                            "airline": airline_filter or "",
                                            "abflug": abflug, "ankunft": rueck,
                                            "booking_url": booking_url})
                    if results:
                        break
            except Exception:
                pass

        if not results:
            for r in _preise_aus_body(body, "kayak", abflug):
                r["ankunft"] = rueck
                r["booking_url"] = booking_url
                r["airline"] = airline_filter or ""
                results.append(r)

        # Cabin verification against the cabin that was actually requested;
        # the helper only performs a real check for premium economy.
        pe_confirmed = _check_cabin_on_page(body, title, kabine)
        if not pe_confirmed:
            print(f"[KY{airline_label}] WARNUNG: Premium Economy nicht auf Seite bestätigt!")

        # Filter out sidebar prices, using the header price as the anchor.
        anker = _kayak_header_preis(sb)
        results = _filter_sidebar_preise(results, anker, f"kayak{airline_label}")
        results = _validate_results(results, f"kayak{airline_label}", kabine)
        print(f"[KY{airline_label}] Ergebnis: {[r['preis'] for r in results[:5]]}")

        _dismiss_cookie_banner(sb)
        sb.sleep(3)
        _dismiss_comparison_popup(sb)
        screenshot_b64 = _take_screenshot(sb)

    return results[:10], screenshot_b64
|
||
|
||
|
||
def scrape_trip(von, nach, tage=30, aufenthalt_tage=60,
                trip_type="roundtrip", kabine="premium_economy",
                gepaeck="1koffer+handgepaeck", airline_filter="",
                layover_min=120, layover_max=300,
                max_flugzeit_h=22, max_stops=2):
    """Scrape trip.com prices for one route.

    Opens the search URL, accepts a cookie banner if one is found, reads
    prices from the first price selector that yields any (body regex as
    fallback; the body text is captured before the banner is dismissed),
    warns if the page does not confirm the requested cabin and applies the
    plausible price band for ``kabine``.

    ``gepaeck``, ``layover_*``, ``max_flugzeit_h`` and ``max_stops`` are
    accepted for signature parity with the other scrapers but are not used.

    Returns:
        ``(results[:10], screenshot_b64)``.
    """
    # One timestamp for all four date strings, so the compact dates in the
    # URL and the ISO dates in the results always describe the same day.
    start = datetime.now() + timedelta(days=tage)
    back = (start + timedelta(days=aufenthalt_tage)) if trip_type == "roundtrip" else None
    abflug_fmt = start.strftime("%Y%m%d")
    abflug_iso = start.strftime("%Y-%m-%d")
    rueck_fmt = back.strftime("%Y%m%d") if back is not None else ""
    rueck_iso = back.strftime("%Y-%m-%d") if back is not None else ""
    kc = KABINE_TRIP.get(kabine, "W")

    stadtname = {"FRA": "frankfurt", "HAN": "hanoi", "KTI": "phnom-penh",
                 "PNH": "phnom-penh", "BKK": "bangkok", "SGN": "ho-chi-minh-city"}
    von_name = stadtname.get(von, von.lower())
    nach_name = stadtname.get(nach, nach.lower())

    booking_url = _booking_url_trip(von, nach, abflug_fmt, rueck_fmt, kc,
                                    von_name, nach_name, airline_filter)
    print(f"[TR] URL: {booking_url}")
    results = []

    with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb:
        sb.open(booking_url)
        sb.sleep(12)

        title = sb.get_title()
        body = sb.get_text("body")
        print(f"[TR] Title: {title[:80]}")

        # Accept the cookie banner with the first selector that works.
        for sel in ['button[id*="accept"]', 'button[class*="accept"]',
                    'button[aria-label*="Accept"]', '#onetrust-accept-btn-handler']:
            try:
                sb.click(sel, timeout=2)
                sb.sleep(2)
                break
            except Exception:
                pass

        # Use the first selector that produces at least one parsable price.
        for sel in ['.price-box .price', '.flight-price', 'span[class*="price"]',
                    'div[class*="price-num"]', 'em[class*="price"]', '.c-price']:
            try:
                elems = sb.find_elements(sel, timeout=2)
                if elems:
                    for e in elems[:10]:
                        p = _parse_preis(e.text)
                        if p:
                            results.append({"scanner": "trip", "preis": p,
                                            "waehrung": "EUR", "airline": "",
                                            "abflug": abflug_iso, "ankunft": rueck_iso,
                                            "booking_url": booking_url})
                    if results:
                        break
            except Exception:
                pass

        if not results:
            for r in _preise_aus_body(body, "trip", abflug_iso):
                r["ankunft"] = rueck_iso
                r["booking_url"] = booking_url
                results.append(r)

        # Cabin verification against the cabin that was actually requested;
        # the helper only performs a real check for premium economy.
        pe_confirmed = _check_cabin_on_page(body, title, kabine)
        if not pe_confirmed:
            print("[TR] WARNUNG: Premium Economy nicht auf Seite bestätigt!")

        results = _validate_results(results, "trip", kabine)
        print(f"[TR] Ergebnis: {[r['preis'] for r in results[:5]]}")

        _dismiss_cookie_banner(sb)
        sb.sleep(2)
        _dismiss_comparison_popup(sb)
        screenshot_b64 = _take_screenshot(sb)

    return results[:10], screenshot_b64
|
||
|
||
|
||
def _booking_url_kayak_multicity(von, nach, via, abflug, via_datum, rueck, kc, bags=1, airline=""):
|
||
"""
|
||
Kayak Multi-City URL: FRA→HKG/DATE1 → HKG→KTI/DATE2 → KTI→FRA/DATE3
|
||
Kabinen-Code: w=Premium Economy
|
||
"""
|
||
filters = []
|
||
if bags:
|
||
filters.append(f"bfc%3D{bags}")
|
||
if airline:
|
||
filters.append(f"airlines%3D{airline}")
|
||
fs = ("&fs=" + "%3B".join(filters)) if filters else ""
|
||
# Kayak Multi-City Format: /flights/FRA-HKG/DATE/HKG-KTI/DATE/KTI-FRA/DATE
|
||
return (f"https://www.kayak.de/flights"
|
||
f"/{von}-{via}/{abflug}"
|
||
f"/{via}-{nach}/{via_datum}"
|
||
f"/{nach}-{von}/{rueck}"
|
||
f"?sort=price_a&cabin={kc}¤cy=EUR{fs}")
|
||
|
||
|
||
def scrape_kayak_multicity(von, nach, tage=30, aufenthalt_tage=60,
                           kabine="premium_economy",
                           gepaeck="1koffer+handgepaeck",
                           airline_filter="",
                           via="HKG", stopover_min_h=20, stopover_max_h=30):
    """Scrape Kayak for a multi-city itinerary von → via → nach → von.

    The first leg departs ``tage`` days from now, the second leg one day
    later, and the return leg ``aufenthalt_tage`` days after the second leg.
    Results are restricted to ``airline_filter`` when it is given; otherwise
    all airlines are searched.

    Args:
        von: Origin airport code.
        nach: Final destination airport code.
        tage: Days from now until departure.
        aufenthalt_tage: Days between the second leg and the return leg.
        kabine: Cabin key, mapped through ``KABINE_KAYAK`` (fallback ``"w"``).
        gepaeck: Baggage description; one checked bag is requested when it
            contains ``"koffer"``.
        airline_filter: Optional airline code passed to the URL builder.
        via: Stopover airport code.
        stopover_min_h: Accepted for interface compatibility; not used here.
        stopover_max_h: Accepted for interface compatibility; not used here.

    Returns:
        Tuple ``(results, screenshot_b64)`` with at most 10 result dicts and
        the value returned by ``_take_screenshot``.
    """
    # Sample the clock once so all three leg dates derive from the same
    # instant; separate now() calls could disagree across midnight.
    heute = datetime.now()
    abflug = (heute + timedelta(days=tage)).strftime("%Y-%m-%d")
    via_datum = (heute + timedelta(days=tage + 1)).strftime("%Y-%m-%d")
    rueck = (heute + timedelta(days=tage + 1 + aufenthalt_tage)).strftime("%Y-%m-%d")
    kc = KABINE_KAYAK.get(kabine, "w")
    bags = 1 if "koffer" in gepaeck else 0
    airline_label = f" [{airline_filter}]" if airline_filter else ""

    booking_url = _booking_url_kayak_multicity(von, nach, via, abflug, via_datum, rueck,
                                               kc, bags, airline_filter)

    print(f"[MC{airline_label}] Multi-City via {via}: {abflug} → +1T → {rueck}")
    print(f"[MC{airline_label}] URL: {booking_url}")

    results = []
    screenshot_b64 = ""

    with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb:
        sb.open(booking_url)
        sb.sleep(15)
        _dismiss_cookie_banner(sb)
        sb.sleep(4)

        title = sb.get_title()
        body = sb.get_text("body")
        print(f"[MC] Title: {title[:80]}")

        # Probe candidate price selectors in order; the first selector that
        # yields at least one usable price wins.
        for sel in ['.price-text', '.f8F1-price-text', 'div[class*="price"] span',
                    'span[class*="price"]', '.Iqt3', 'div.nrc6-price', '.price']:
            try:
                elems = sb.find_elements(sel, timeout=2)
                if elems:
                    for e in elems[:15]:
                        p = _parse_preis(e.text)
                        # Prices of 600 or less are discarded up front.
                        if p and p > 600:
                            results.append({
                                "scanner": "kayak_multicity",
                                "preis": p,
                                "waehrung": "EUR",
                                # Falls back to the stopover code when no
                                # airline filter was given.
                                "airline": airline_filter or via,
                                "abflug": abflug,
                                "ankunft": rueck,
                                "booking_url": booking_url,
                            })
                    if results:
                        break
            except Exception:
                # Best effort: a selector that fails is skipped and the next
                # one is tried.
                continue

        # Fallback: extract prices from the page text when no selector hit.
        if not results:
            for r in _preise_aus_body(body, "kayak_multicity", abflug):
                if r["preis"] > 600:
                    r["ankunft"] = rueck
                    r["booking_url"] = booking_url
                    r["airline"] = airline_filter or via
                    results.append(r)

        results = _validate_results(results, f"multicity{airline_label}", kabine)
        print(f"[MC{airline_label}] Ergebnis: {[r['preis'] for r in results[:5]]}")
        # Clear overlays again before taking the screenshot.
        _dismiss_cookie_banner(sb)
        sb.sleep(3)
        _dismiss_comparison_popup(sb)
        screenshot_b64 = _take_screenshot(sb)
    return results[:10], screenshot_b64
|
||
|
||
|
||
def scrape_momondo(von, nach, tage=30, aufenthalt_tage=60,
                   trip_type="roundtrip", kabine="premium_economy",
                   gepaeck="1koffer+handgepaeck", airline_filter="",
                   layover_min=120, layover_max=300,
                   max_flugzeit_h=22, max_stops=2):
    """Scrape Momondo (same company as Kayak, but often different prices).

    Args:
        von: Origin airport code.
        nach: Destination airport code.
        tage: Days from now until departure.
        aufenthalt_tage: Days between departure and return.
        trip_type: ``"roundtrip"`` adds a return date; anything else searches
            one-way (empty return date).
        kabine: Cabin key, mapped through ``KABINE_KAYAK`` (fallback ``"w"``).
        gepaeck: Baggage description; one checked bag is requested when it
            contains ``"koffer"``.
        airline_filter: Optional airline code passed to the URL builder.
        layover_min: Passed through to ``_booking_url_momondo``.
        layover_max: Passed through to ``_booking_url_momondo``.
        max_flugzeit_h: Passed through to ``_booking_url_momondo``.
        max_stops: Passed through to ``_booking_url_momondo``.

    Returns:
        Tuple ``(results, screenshot_b64)`` with at most 10 result dicts and
        the value returned by ``_take_screenshot``.
    """
    # Sample the clock once so departure and return are derived from the
    # same instant.
    heute = datetime.now()
    abflug = (heute + timedelta(days=tage)).strftime("%Y-%m-%d")
    if trip_type == "roundtrip":
        rueck = (heute + timedelta(days=tage + aufenthalt_tage)).strftime("%Y-%m-%d")
    else:
        rueck = ""
    kc = KABINE_KAYAK.get(kabine, "w")
    bags = 1 if "koffer" in gepaeck else 0
    booking_url = _booking_url_momondo(von, nach, abflug, rueck, kc, bags,
                                       layover_min, layover_max, airline_filter,
                                       max_flugzeit_h, max_stops)
    airline_label = f" [{airline_filter}]" if airline_filter else ""
    print(f"[MO{airline_label}] URL: {booking_url}")

    results = []
    screenshot_b64 = ""

    with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb:
        sb.open(booking_url)
        sb.sleep(8)

        # Click away the Momondo cookie consent; the first selector that
        # can be clicked wins.
        for sel in ['button[class*="accept"]', '.RxNS-button-content',
                    '#onetrust-accept-btn-handler', 'button[title*="akzeptieren"]',
                    'button[title*="Alle akzeptieren"]', '.evidon-banner-acceptbutton']:
            try:
                sb.find_element(sel, timeout=2).click()
                print(f"[MO] Consent geklickt: {sel}")
                sb.sleep(3)
                break
            except Exception:
                # Best effort: selector absent or not clickable, try the next.
                continue

        # After consent the page has to reload / results need time to appear.
        sb.sleep(12)
        title = sb.get_title()
        body = sb.get_text("body")
        print(f"[MO] Title: {title[:80]} | Body: {len(body)} chars")

        # Probe candidate price selectors in order; stop at the first one
        # that yields at least one parseable price.
        for sel in ['.price-text', '.f8F1-price-text', 'div[class*="price"] span',
                    'span[class*="price"]', '.Iqt3', 'div.nrc6-price', '.price',
                    '[class*="resultPrice"]', '.lowest-price']:
            try:
                elems = sb.find_elements(sel)
                if elems:
                    for e in elems[:15]:
                        p = _parse_preis(e.text)
                        if p:
                            results.append({"scanner": "momondo", "preis": p,
                                            "waehrung": "EUR",
                                            "airline": airline_filter or "",
                                            "abflug": abflug, "ankunft": rueck,
                                            "booking_url": booking_url})
                    if results:
                        break
            except Exception:
                # Best effort: a failing selector is skipped.
                continue

        # Fallback: extract prices from the page text when no selector hit.
        if not results:
            for r in _preise_aus_body(body, "momondo", abflug):
                r["ankunft"] = rueck
                r["booking_url"] = booking_url
                r["airline"] = airline_filter or ""
                results.append(r)

        # Only check for (and warn about) Premium Economy when that cabin
        # was actually requested; for other cabins the warning is misleading.
        if kabine == "premium_economy" and not _check_cabin_on_page(body, title, kabine):
            print(f"[MO{airline_label}] WARNUNG: Premium Economy nicht auf Seite bestätigt!")

        # Filter out sidebar prices, using the header price as the anchor
        # (per the original note, Momondo has the same layout as Kayak).
        anker_mo = _kayak_header_preis(sb)
        results = _filter_sidebar_preise(results, anker_mo, f"momondo{airline_label}")
        results = _validate_results(results, f"momondo{airline_label}", kabine)
        print(f"[MO{airline_label}] Ergebnis: {[r['preis'] for r in results[:5]]}")
        # Clear overlays again before taking the screenshot.
        _dismiss_cookie_banner(sb)
        sb.sleep(2)
        _dismiss_comparison_popup(sb)
        screenshot_b64 = _take_screenshot(sb)
    return results[:10], screenshot_b64
|
||
|
||
|
||
def scrape_wego(von, nach, tage=30, aufenthalt_tage=60,
                trip_type="roundtrip", kabine="premium_economy",
                gepaeck="1koffer+handgepaeck", airline_filter="",
                layover_min=120, layover_max=300,
                max_flugzeit_h=22, max_stops=2):
    """Scrape Wego (described in the original as an Asian flight search
    engine popular in Southeast Asia).

    Args:
        von: Origin airport code (lower-cased into the URL path).
        nach: Destination airport code (lower-cased into the URL path).
        tage: Days from now until departure.
        aufenthalt_tage: Days between departure and return.
        trip_type: ``"roundtrip"`` adds a return date; anything else searches
            one-way.
        kabine: Cabin key, mapped to Wego's ``cabin_class`` value (fallback
            ``"premiumEconomy"``).
        gepaeck, airline_filter, layover_min, layover_max, max_flugzeit_h,
        max_stops: Accepted for signature parity with the other scrapers;
            not used by this one.

    Returns:
        Tuple ``(results, screenshot_b64)`` with at most 10 result dicts and
        the value returned by ``_take_screenshot``.
    """
    # Sample the clock once so both dates derive from the same instant.
    heute = datetime.now()
    abflug = (heute + timedelta(days=tage)).strftime("%Y-%m-%d")
    if trip_type == "roundtrip":
        rueck = (heute + timedelta(days=tage + aufenthalt_tage)).strftime("%Y-%m-%d")
    else:
        rueck = ""

    KABINE_WEGO = {"economy": "economy", "premium_economy": "premiumEconomy",
                   "business": "business", "first": "first"}
    kc = KABINE_WEGO.get(kabine, "premiumEconomy")

    # One-way searches carry only the departure date in the path.
    datum_pfad = f"/{abflug}/{rueck}" if rueck else f"/{abflug}"
    # "&currency_code" had been mangled into "¤cy_code" by HTML-entity
    # decoding of "&curren", which dropped the currency setting.
    booking_url = (f"https://www.wego.com/flights/{von.lower()}/{nach.lower()}"
                   f"{datum_pfad}"
                   f"?cabin_class={kc}&adults_count=1&sort=price&currency_code=EUR")

    print(f"[WG] URL: {booking_url}")
    results = []
    screenshot_b64 = ""

    with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb:
        sb.open(booking_url)
        sb.sleep(18)

        title = sb.get_title()
        body = sb.get_text("body")
        print(f"[WG] Title: {title[:80]} | Body: {len(body)} chars")

        # Probe candidate price selectors in order; stop at the first one
        # that yields at least one parseable price.
        for sel in ['[class*="price"]', '[data-testid*="price"]',
                    '.flight-price', 'span[class*="Price"]',
                    '.fare-price', '[class*="FarePrice"]']:
            try:
                elems = sb.find_elements(sel)
                if elems:
                    for e in elems[:15]:
                        p = _parse_preis(e.text)
                        if p:
                            results.append({"scanner": "wego", "preis": p,
                                            "waehrung": "EUR", "airline": "",
                                            "abflug": abflug, "ankunft": rueck,
                                            "booking_url": booking_url})
                    if results:
                        break
            except Exception:
                # Best effort: a failing selector is skipped.
                continue

        # Fallback: extract prices from the page text when no selector hit.
        if not results:
            for r in _preise_aus_body(body, "wego", abflug):
                r["ankunft"] = rueck
                r["booking_url"] = booking_url
                results.append(r)

        print(f"[WG] Ergebnis: {[r['preis'] for r in results[:5]]}")
        _dismiss_comparison_popup(sb)
        screenshot_b64 = _take_screenshot(sb)
    return results[:10], screenshot_b64
|
||
|
||
|
||
def _parse_preis_usd(text):
|
||
"""Parst USD-Preise aus Text wie 'USD 1,388.60' und wandelt grob in EUR um."""
|
||
if not text:
|
||
return None
|
||
# USD-Format: 1,388.60 (Komma als Tausender, Punkt als Dezimal)
|
||
for p in [r'USD\s?([\d,]+\.?\d*)', r'\$\s?([\d,]+\.?\d*)']:
|
||
m = re.search(p, text)
|
||
if m:
|
||
try:
|
||
v = float(m.group(1).replace(',', ''))
|
||
eur = round(v * 0.92, 2) # grobe USD→EUR Umrechnung
|
||
if 200 < eur < 15000:
|
||
return eur
|
||
except ValueError:
|
||
pass
|
||
return None
|
||
|
||
|
||
def scrape_traveloka(von, nach, tage=30, aufenthalt_tage=60,
                     trip_type="roundtrip", kabine="premium_economy",
                     gepaeck="1koffer+handgepaeck", airline_filter="",
                     layover_min=120, layover_max=300,
                     max_flugzeit_h=22, max_stops=2):
    """Scrape Traveloka (described in the original as Southeast Asia's
    largest travel platform).

    Prices are read as USD from the page text and converted to EUR with a
    rough fixed factor of 0.92.

    Args:
        von: Origin airport code.
        nach: Destination airport code.
        tage: Days from now until departure.
        aufenthalt_tage: Days between departure and return.
        trip_type: ``"roundtrip"`` adds a return date; anything else searches
            one-way.
        kabine: Cabin key, mapped to Traveloka's ``sc`` value (fallback
            ``"PREMIUM_ECONOMY"``) and used for result validation.
        gepaeck, airline_filter, layover_min, layover_max, max_flugzeit_h,
        max_stops: Accepted for signature parity with the other scrapers;
            not used by this one.

    Returns:
        Tuple ``(results, screenshot_b64)`` with at most 10 result dicts
        sorted by ascending price and the value returned by
        ``_take_screenshot``.
    """
    # Sample the clock once so the URL dates and the ISO dates stored in the
    # results always describe the same days.
    heute = datetime.now()
    hin = heute + timedelta(days=tage)
    zurueck = heute + timedelta(days=tage + aufenthalt_tage)
    ist_roundtrip = trip_type == "roundtrip"

    # The URL uses DD-MM-YYYY, the result records use ISO dates.
    abflug = hin.strftime("%d-%m-%Y")
    rueck = zurueck.strftime("%d-%m-%Y") if ist_roundtrip else ""
    abflug_iso = hin.strftime("%Y-%m-%d")
    rueck_iso = zurueck.strftime("%Y-%m-%d") if ist_roundtrip else ""

    KABINE_TV = {"economy": "ECONOMY", "premium_economy": "PREMIUM_ECONOMY",
                 "business": "BUSINESS", "first": "FIRST_CLASS"}
    kc = KABINE_TV.get(kabine, "PREMIUM_ECONOMY")

    # Round trips join both dates with a dot; one-way has the departure only.
    dt = f"{abflug}.{rueck}" if rueck else abflug
    booking_url = (f"https://www.traveloka.com/en-en/flight/fullsearch?"
                   f"ap={von}.{nach}&dt={dt}"
                   f"&ps=1.0.0&sc={kc}")

    print(f"[TV] URL: {booking_url}")
    results = []
    screenshot_b64 = ""

    with SB(uc=True, headless=True, chromium_arg="--no-sandbox --disable-dev-shm-usage") as sb:
        sb.open(booking_url)
        sb.sleep(18)

        title = sb.get_title()
        body = sb.get_text("body")
        print(f"[TV] Title: {title[:80]} | Body: {len(body)} chars")

        # Extract prices from the body text (USD → EUR); each distinct
        # rounded EUR amount is recorded once.
        seen = set()
        for m in re.finditer(r'USD\s?([\d,]+\.?\d*)', body):
            try:
                usd = float(m.group(1).replace(',', ''))
            except ValueError:
                # The capture can consist of commas only.
                continue
            eur = round(usd * 0.92)
            if 400 < eur < 12000 and eur not in seen:
                seen.add(eur)
                results.append({"scanner": "traveloka", "preis": eur,
                                "waehrung": "EUR", "airline": "",
                                "abflug": abflug_iso, "ankunft": rueck_iso,
                                "booking_url": booking_url})

        results.sort(key=lambda x: x["preis"])
        # Validate against the cabin that was actually searched; previously
        # the premium-economy range was applied to every cabin.
        results = _validate_results(results, "traveloka", kabine)
        print(f"[TV] Ergebnis: {[r['preis'] for r in results[:5]]}")
        _dismiss_comparison_popup(sb)
        screenshot_b64 = _take_screenshot(sb)
    return results[:10], screenshot_b64
|
||
|
||
|
||
def scrape_skyscanner(von, nach, tage=30, aufenthalt_tage=60,
                      trip_type="roundtrip", kabine="premium_economy",
                      gepaeck="1koffer+handgepaeck", airline_filter="",
                      layover_min=120, layover_max=300,
                      max_flugzeit_h=22, max_stops=2):
    """Stub for Skyscanner: skipped because of its strong bot protection.

    All arguments are accepted for signature parity with the other scrapers
    and are ignored. A skip notice is printed.

    Returns:
        Tuple ``([], "")``: no results and an empty screenshot string.
    """
    hinweis = "[SS] Skyscanner übersprungen (Bot-Detection)"
    print(hinweis)
    keine_treffer, kein_screenshot = [], ""
    return keine_treffer, kein_screenshot
|