flugpreisscanner/hub/src/db.py
Orbitalo a90a73b7cb fix: KI-Augen blockiert keine Scraper-Ergebnisse mehr + Economy komplett
- scheduler.py: Scraper-Preise haben Vorrang vor KI-Screenshot-Analyse
- worker.py: Cabin-Check dynamisch (economy statt hardcoded PE)
- cathay_pacific Job deaktiviert (nicht auf Nodes implementiert)
- Doppelte momondo/trip Jobs bereinigt
- Economy QC-Filter 600-1400 EUR, Roundtrip 50-95 Tage
- Gepaeckzuschlag Economy Light +140 EUR Roundtrip
- Vision-AI Kabinen+Preis-Klassifizierung
- KI-Plausi in Batches, Telegram Bot, Source-Health
- Scan-Limits 3/Tag/Quelle, Geo-Skip Asia
- .gitignore hinzugefuegt
2026-03-01 03:25:54 +00:00

368 lines
14 KiB
Python

import sqlite3
import os
from datetime import datetime, timedelta
DB_PATH = os.environ.get("DB_PATH", "/data/flugscanner.db")
def get_conn():
conn = sqlite3.connect(DB_PATH, timeout=30)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000")
return conn
def init_db():
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = get_conn()
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
tailscale_ip TEXT NOT NULL,
location TEXT,
last_seen TEXT,
status TEXT DEFAULT 'unknown'
)
""")
c.execute("""
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
scanner TEXT NOT NULL,
von TEXT NOT NULL,
nach TEXT NOT NULL,
tage INTEGER DEFAULT 30,
aufenthalt_tage INTEGER DEFAULT 60,
trip_type TEXT DEFAULT 'roundtrip',
kabine TEXT DEFAULT 'premium_economy',
gepaeck TEXT DEFAULT '1koffer+handgepaeck',
airline_filter TEXT DEFAULT '',
layover_min INTEGER DEFAULT 120,
layover_max INTEGER DEFAULT 300,
max_flugzeit_h INTEGER DEFAULT 22,
max_stops INTEGER DEFAULT 2,
via TEXT DEFAULT '',
stopover_min_h INTEGER DEFAULT 0,
stopover_max_h INTEGER DEFAULT 0,
intervall TEXT DEFAULT 'daily',
aktiv INTEGER DEFAULT 1,
created_at TEXT DEFAULT (datetime('now'))
)
""")
for col_sql in [
"ALTER TABLE jobs ADD COLUMN gepaeck TEXT DEFAULT '1koffer+handgepaeck'",
"ALTER TABLE jobs ADD COLUMN aufenthalt_tage INTEGER DEFAULT 60",
"ALTER TABLE jobs ADD COLUMN trip_type TEXT DEFAULT 'roundtrip'",
"ALTER TABLE jobs ADD COLUMN kabine TEXT DEFAULT 'premium_economy'",
"ALTER TABLE jobs ADD COLUMN airline_filter TEXT DEFAULT ''",
"ALTER TABLE jobs ADD COLUMN layover_min INTEGER DEFAULT 120",
"ALTER TABLE jobs ADD COLUMN layover_max INTEGER DEFAULT 300",
"ALTER TABLE jobs ADD COLUMN max_flugzeit_h INTEGER DEFAULT 22",
"ALTER TABLE jobs ADD COLUMN max_stops INTEGER DEFAULT 2",
"ALTER TABLE jobs ADD COLUMN via TEXT DEFAULT ''",
"ALTER TABLE jobs ADD COLUMN stopover_min_h INTEGER DEFAULT 0",
"ALTER TABLE jobs ADD COLUMN stopover_max_h INTEGER DEFAULT 0",
]:
try:
c.execute(col_sql)
except Exception:
pass
c.execute("""
CREATE TABLE IF NOT EXISTS prices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER,
scanner TEXT NOT NULL,
node TEXT NOT NULL,
preis REAL NOT NULL,
waehrung TEXT DEFAULT 'EUR',
airline TEXT,
abflug TEXT,
ankunft TEXT,
von TEXT,
nach TEXT,
booking_url TEXT,
screenshot_id INTEGER,
scraped_at TEXT DEFAULT (datetime('now'))
)
""")
for col_sql in [
"ALTER TABLE prices ADD COLUMN booking_url TEXT",
"ALTER TABLE prices ADD COLUMN screenshot_id INTEGER",
"ALTER TABLE prices ADD COLUMN plausibel INTEGER",
"ALTER TABLE prices ADD COLUMN plausi_grund TEXT DEFAULT ''",
"ALTER TABLE prices ADD COLUMN preis_korrigiert REAL",
"ALTER TABLE prices ADD COLUMN korrektur_grund TEXT DEFAULT ''",
]:
try:
c.execute(col_sql)
except Exception:
pass
c.execute("""
CREATE TABLE IF NOT EXISTS screenshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER,
node TEXT,
scanner TEXT,
screenshot_b64 TEXT,
scraped_at TEXT DEFAULT (datetime('now'))
)
""")
c.execute("""
CREATE TABLE IF NOT EXISTS analyses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
von TEXT,
nach TEXT,
guenstigster_preis REAL,
guenstigster_anbieter TEXT,
ki_empfehlung TEXT,
ki_analyse TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
""")
c.execute("""
CREATE TABLE IF NOT EXISTS prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
inhalt TEXT NOT NULL,
updated_at TEXT DEFAULT (datetime('now'))
)
""")
c.execute("""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
level TEXT DEFAULT 'INFO',
message TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
""")
# Quell-Gesundheit pro Node+Scanner — Gedächtnis des Systems
c.execute("""
CREATE TABLE IF NOT EXISTS source_health (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node TEXT NOT NULL,
scanner TEXT NOT NULL,
status TEXT DEFAULT 'unknown',
erfolge_heute INTEGER DEFAULT 0,
fehler_heute INTEGER DEFAULT 0,
letzter_erfolg TEXT,
letzter_fehler TEXT,
fehler_typ TEXT DEFAULT '',
pausiert_bis TEXT,
updated_at TEXT DEFAULT (datetime('now')),
UNIQUE(node, scanner)
)
""")
# Scan-Ergebnisse: was die KI auf jedem Screenshot gesehen hat
c.execute("""
CREATE TABLE IF NOT EXISTS scan_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node TEXT NOT NULL,
scanner TEXT NOT NULL,
screenshot_id INTEGER,
ki_status TEXT NOT NULL,
ki_details TEXT DEFAULT '',
preise_gefunden INTEGER DEFAULT 0,
aktion TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now'))
)
""")
# Standard-Prompt
PROMPT_TEXT = """Du bist ein Flugpreis-Analyst. Analysiere Preisdaten fuer folgenden Flug:
STRECKE: ROUNDTRIP Frankfurt (FRA) to Phnom Penh Techo Airport (KTI)
KABINE: Premium Economy | GEPAECK: 1 Aufgabekoffer + Handgepaeck | AUFENTHALT: ~2 Monate
BEVORZUGTE AIRLINES:
- China Southern (CZ): Hub Guangzhou (CAN)
- Cathay Pacific (CX): Hub Hongkong (HKG)
- Singapore Airlines (SQ): Hub Singapur (SIN)
- Thai Airways (TG): Hub Bangkok (BKK)
- Vietnam Airlines (VN): Hub Hanoi (HAN) - Durchgepaeck FRA-KTI
UMSTIEG-REGEL: Umstiegszeit an asiatischen Hubs MUSS 2-5 Stunden sein (120-300 Minuten).
Zu kurz (<2h) = Gepaeck-Risiko. Zu lang (>5h) = unzumutbare Wartezeit.
WICHTIG: Preise unter 1000 EUR sind fuer Roundtrip PE + Koffer + 2 Monate hoechstwahrscheinlich unplausibel.
Aktuelle Preise (Anbieter | Node | Airline | Preis):
{preise_heute}
Verlauf 30 Tage:
{preisverlauf}
Statistik: Durchschnitt {avg} EUR | Min {min} EUR | Max {max} EUR
Antworte auf Deutsch:
EMPFEHLUNG: [JETZT BUCHEN / WARTEN / NEUTRAL]
BEGRUENDUNG: [1-2 Saetze]
BESTER_PREIS: [Anbieter + Airline + Preis + Node]
BESTE_AIRLINE: [welche der 4 Airlines gerade am guenstigsten]
TREND: [STEIGEND / FALLEND / STABIL]
GEO_UNTERSCHIED: [DE-Scanner vs. KH-Scanner Preisdifferenz]
PLAUSI_CHECK: [Preise unter 1000 EUR einzeln einordnen - was stimmt da nicht]
HKG_STOPOVER: [Vergleich: Direktverbindung vs. FRA-HKG-KTI Multi-City — lohnt sich der HKG-Tag? Preis + ca. 150 EUR Hotel HKG einrechnen]"""
c.execute("INSERT OR IGNORE INTO prompts (name, inhalt) VALUES (?, ?)",
("ki_auswertung", PROMPT_TEXT))
# Standard-Nodes
c.execute("""
INSERT OR IGNORE INTO nodes (name, tailscale_ip, location) VALUES
('flugscanner-asia', '100.112.190.22', 'Kambodscha'),
('flugscanner-mu', '100.75.182.15', 'Muldenstein DE')
""")
# Standard-Jobs nur einfügen wenn noch gar keine Jobs vorhanden
# Verhindert Duplikate bei Container-Neustarts
job_count = c.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
if job_count == 0:
c.execute("""
INSERT INTO jobs
(scanner, von, nach, tage, aufenthalt_tage, trip_type, kabine, gepaeck,
airline_filter, layover_min, layover_max, max_flugzeit_h, max_stops,
via, stopover_min_h, stopover_max_h, intervall)
VALUES
('kayak', 'FRA','KTI',30,60,'roundtrip','premium_economy','1koffer+handgepaeck','', 120,300,22,2,'', 0, 0,'daily'),
('trip', 'FRA','KTI',30,60,'roundtrip','premium_economy','1koffer+handgepaeck','', 120,300,22,2,'', 0, 0,'daily'),
('kayak', 'FRA','KTI',30,60,'roundtrip','premium_economy','1koffer+handgepaeck','CZ',120,300,22,2,'', 0, 0,'daily'),
('kayak', 'FRA','KTI',30,60,'roundtrip','premium_economy','1koffer+handgepaeck','CX',120,300,22,2,'', 0, 0,'daily'),
('kayak', 'FRA','KTI',30,60,'roundtrip','premium_economy','1koffer+handgepaeck','SQ',120,300,22,2,'', 0, 0,'daily'),
('kayak', 'FRA','KTI',30,60,'roundtrip','premium_economy','1koffer+handgepaeck','TG',120,300,22,2,'', 0, 0,'daily'),
('kayak_multicity','FRA','KTI',30,60,'multicity', 'premium_economy','1koffer+handgepaeck','CX',120,300,22,2,'HKG',20,30,'daily'),
('kayak_multicity','FRA','KTI',30,60,'multicity', 'premium_economy','1koffer+handgepaeck','', 120,300,22,2,'HKG',20,30,'daily')
""")
# Cathay Pacific direkt — immer hinzufügen wenn noch nicht vorhanden
c.execute("""
INSERT INTO jobs
(scanner, von, nach, tage, aufenthalt_tage, trip_type, kabine, gepaeck,
airline_filter, layover_min, layover_max, max_flugzeit_h, max_stops,
via, stopover_min_h, stopover_max_h, intervall)
SELECT 'cathay_pacific','FRA','KTI',90,75,'roundtrip','economy','1koffer+handgepaeck','CX',120,300,22,2,'',0,0,'daily'
WHERE NOT EXISTS (SELECT 1 FROM jobs WHERE scanner='cathay_pacific')
""")
conn.commit()
conn.close()
def log(message, level="INFO"):
conn = get_conn()
conn.execute(
"INSERT INTO logs (level, message) VALUES (?, ?)",
(level, message)
)
conn.commit()
conn.close()
print(f"[{level}] {message}")
def source_health_update(node: str, scanner: str, erfolg: bool, fehler_typ: str = ""):
"""Aktualisiert die Gesundheit einer Quelle nach einem Scan-Versuch."""
conn = get_conn()
now = datetime.now().isoformat()
existing = conn.execute(
"SELECT id FROM source_health WHERE node=? AND scanner=?",
(node, scanner)
).fetchone()
if existing:
if erfolg:
conn.execute("""
UPDATE source_health
SET status='healthy', erfolge_heute=erfolge_heute+1,
letzter_erfolg=?, fehler_typ='', updated_at=?
WHERE node=? AND scanner=?
""", (now, now, node, scanner))
else:
conn.execute("""
UPDATE source_health
SET status='unhealthy', fehler_heute=fehler_heute+1,
letzter_fehler=?, fehler_typ=?, updated_at=?
WHERE node=? AND scanner=?
""", (now, fehler_typ, now, node, scanner))
else:
status = 'healthy' if erfolg else 'unhealthy'
conn.execute("""
INSERT INTO source_health (node, scanner, status, erfolge_heute, fehler_heute,
letzter_erfolg, letzter_fehler, fehler_typ, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (node, scanner, status,
1 if erfolg else 0, 0 if erfolg else 1,
now if erfolg else None, now if not erfolg else None,
fehler_typ, now))
conn.commit()
conn.close()
def source_health_pause(node: str, scanner: str, stunden: int = 24):
"""Pausiert eine Quelle für N Stunden (z.B. nach CAPTCHA)."""
conn = get_conn()
bis = (datetime.now() + timedelta(hours=stunden)).isoformat()
conn.execute("""
UPDATE source_health SET pausiert_bis=?, status='paused', updated_at=datetime('now')
WHERE node=? AND scanner=?
""", (bis, node, scanner))
conn.commit()
conn.close()
def source_health_ist_pausiert(node: str, scanner: str) -> bool:
"""Prüft ob eine Quelle gerade pausiert ist."""
conn = get_conn()
row = conn.execute(
"SELECT pausiert_bis FROM source_health WHERE node=? AND scanner=?",
(node, scanner)
).fetchone()
conn.close()
if not row or not row["pausiert_bis"]:
return False
return datetime.fromisoformat(row["pausiert_bis"]) > datetime.now()
def source_health_reset_daily():
"""Täglicher Reset der Tageszähler (erfolge_heute, fehler_heute)."""
conn = get_conn()
conn.execute("UPDATE source_health SET erfolge_heute=0, fehler_heute=0")
conn.commit()
conn.close()
def source_health_get_all() -> list:
"""Gibt alle Source-Health-Einträge zurück, sortiert nach Erfolgen."""
conn = get_conn()
rows = conn.execute("""
SELECT node, scanner, status, erfolge_heute, fehler_heute,
letzter_erfolg, letzter_fehler, fehler_typ, pausiert_bis
FROM source_health
ORDER BY erfolge_heute DESC, fehler_heute ASC
""").fetchall()
conn.close()
return [dict(r) for r in rows]
def scan_result_save(node: str, scanner: str, screenshot_id: int,
ki_status: str, ki_details: str, preise: int, aktion: str):
"""Speichert was die KI auf einem Screenshot gesehen hat."""
conn = get_conn()
conn.execute("""
INSERT INTO scan_results (node, scanner, screenshot_id, ki_status,
ki_details, preise_gefunden, aktion)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (node, scanner, screenshot_id, ki_status, ki_details, preise, aktion))
conn.commit()
conn.close()