flugpreisscanner/hub/src/scheduler.py
Cursor a03a58d01e feat: Screenshot-Spalte im Dashboard — Full-Page CDP Screenshots
- worker.py: _take_screenshot() via Chrome CDP (JPEG 55%, max 3000px)
- worker.py: alle Scraper geben (results, screenshot_b64) Tuple zurück
- agent.py: screenshot_b64 in API-Response enthalten
- scheduler.py: speichere_screenshot() Funktion + Verknüpfung mit prices
- db.py: screenshots-Tabelle + screenshot_id FK in prices
- web.py: /api/screenshot/<id> Endpoint (base64→JPEG Response)
- web.py: 📷 Button in Preistabelle → Lightbox mit Full-Page Screenshot

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-25 20:51:10 +07:00

242 lines
7.7 KiB
Python

import os
import time
import threading
import requests
import schedule
from datetime import datetime, timedelta
from db import init_db, get_conn, log
from ki import auswerten
# Verhindert dass zwei Läufe gleichzeitig laufen
_scan_lock = threading.Lock()
_lauf_aktiv = False
def get_nodes():
conn = get_conn()
nodes = conn.execute(
"SELECT * FROM nodes WHERE status != 'disabled'"
).fetchall()
conn.close()
return [dict(n) for n in nodes]
def get_aktive_jobs(flex=False):
"""
flex=False → normale Jobs (datum_flex IS NULL or 0)
flex=True → alle Jobs, tage wird durch Aufrufer variiert
"""
conn = get_conn()
jobs = conn.execute("SELECT * FROM jobs WHERE aktiv = 1").fetchall()
conn.close()
return [dict(j) for j in jobs]
def node_ping(node):
try:
r = requests.get(f"http://{node['tailscale_ip']}:5010/status", timeout=5)
return r.status_code == 200
except Exception:
return False
def update_node_status(name, status):
conn = get_conn()
conn.execute(
"UPDATE nodes SET status=?, last_seen=datetime('now') WHERE name=?",
(status, name)
)
conn.commit()
conn.close()
def dispatch_job(node, job, tage_override=None):
payload = {
"scanner": job["scanner"],
"von": job["von"],
"nach": job["nach"],
"tage": tage_override if tage_override is not None else job["tage"],
"aufenthalt_tage": job.get("aufenthalt_tage", 60),
"trip_type": job.get("trip_type", "roundtrip"),
"kabine": job.get("kabine", "premium_economy"),
"gepaeck": job.get("gepaeck", "1koffer+handgepaeck"),
"airline_filter": job.get("airline_filter", ""),
"layover_min": job.get("layover_min", 120),
"layover_max": job.get("layover_max", 300),
"max_flugzeit_h": job.get("max_flugzeit_h", 22),
"max_stops": job.get("max_stops", 2),
"via": job.get("via", ""),
"stopover_min_h": job.get("stopover_min_h", 20),
"stopover_max_h": job.get("stopover_max_h", 30),
}
try:
r = requests.post(
f"http://{node['tailscale_ip']}:5010/job",
json=payload,
timeout=300
)
if r.status_code == 200:
data = r.json()
results = data.get("results", [])
screenshot_b64 = data.get("screenshot_b64", "")
via_label = f" via {job.get('via','')}" if job.get('via') else ""
log(f"{node['name']}: {len(results)} Preise ← {job['scanner']}"
f"{' ['+job.get('airline_filter','')+']' if job.get('airline_filter') else ''}"
f"{via_label}"
f"{' +'+str(tage_override)+'T' if tage_override else ''}")
screenshot_id = speichere_screenshot(screenshot_b64, node["name"], job)
speichere_preise(results, node["name"], job, screenshot_id)
return True
else:
log(f"{node['name']}: Fehler {r.status_code} bei {job['scanner']}", "ERROR")
return False
except Exception as e:
log(f"{node['name']}: Exception {e}", "ERROR")
update_node_status(node["name"], "offline")
return False
def speichere_screenshot(screenshot_b64, node_name, job):
"""Speichert Screenshot in DB, gibt screenshot_id zurück (oder None)."""
if not screenshot_b64:
return None
try:
conn = get_conn()
cur = conn.execute("""
INSERT INTO screenshots (job_id, node, scanner, screenshot_b64)
VALUES (?, ?, ?, ?)
""", (job["id"], node_name, job["scanner"], screenshot_b64))
screenshot_id = cur.lastrowid
conn.commit()
conn.close()
return screenshot_id
except Exception as e:
log(f"Screenshot-Speicher-Fehler: {e}", "WARN")
return None
def speichere_preise(results, node_name, job, screenshot_id=None):
conn = get_conn()
for r in results:
conn.execute("""
INSERT INTO prices
(job_id, scanner, node, preis, waehrung, airline, abflug, ankunft, von, nach, booking_url, screenshot_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
job["id"], r.get("scanner", job["scanner"]), node_name,
r["preis"], r.get("waehrung", "EUR"), r.get("airline", ""),
r.get("abflug", ""), r.get("ankunft", ""),
job["von"], job["nach"],
r.get("booking_url", ""),
screenshot_id,
))
conn.commit()
conn.close()
def scraping_lauf(label="Standard", flex_tage_liste=None):
"""
Führt alle aktiven Jobs auf allen Nodes aus.
Übersprungen wenn ein anderer Lauf noch aktiv ist (Lock-Schutz).
"""
global _lauf_aktiv
if not _scan_lock.acquire(blocking=False):
log(f"Lauf [{label}] übersprungen — vorheriger Lauf noch aktiv", "WARN")
return
_lauf_aktiv = True
start = datetime.now()
try:
log(f"=== Scraping-Lauf [{label}] gestartet ===")
nodes = get_nodes()
jobs = get_aktive_jobs()
if not nodes:
log("Keine aktiven Nodes konfiguriert", "WARN")
return
tage_varianten = flex_tage_liste or [None]
online = 0
fehler = 0
preise_gesamt = 0
for node in nodes:
if node_ping(node):
update_node_status(node["name"], "online")
online += 1
for job in jobs:
for tage_var in tage_varianten:
try:
ok = dispatch_job(node, job, tage_override=tage_var)
if not ok:
fehler += 1
except Exception as e:
log(f"Job-Fehler {node['name']}/{job['scanner']}: {e}", "ERROR")
fehler += 1
else:
log(f"Node {node['name']} nicht erreichbar", "WARN")
update_node_status(node["name"], "offline")
dauer = round((datetime.now() - start).total_seconds())
log(f"Scraping [{label}] fertig — {online}/{len(nodes)} Nodes | "
f"{fehler} Fehler | {dauer}s Laufzeit")
try:
auswerten()
except Exception as e:
log(f"KI-Fehler: {e}", "ERROR")
log(f"=== Lauf [{label}] beendet ===")
finally:
_lauf_aktiv = False
_scan_lock.release()
def standard_lauf():
"""30-Minuten-Takt — in eigenem Thread damit der Scheduler nicht blockiert."""
threading.Thread(
target=scraping_lauf,
kwargs={"label": datetime.now().strftime("%H:%M")},
daemon=True
).start()
def flex_lauf():
"""Di/Mi 23:30 — ±3 Tage Datumsfenster."""
wochentag = datetime.now().weekday()
if wochentag not in (1, 2):
log("Flex-Lauf: heute kein Di/Mi — übersprungen")
return
basis = 30
flex_varianten = list(range(basis - 3, basis + 4))
log(f"=== Flex-Lauf Di/Mi ±3 Tage: {flex_varianten} ===")
threading.Thread(
target=scraping_lauf,
kwargs={"label": "Flex-Di/Mi", "flex_tage_liste": flex_varianten},
daemon=True
).start()
def run():
init_db()
log("Scheduler gestartet — alle 30 Minuten + Flex Di/Mi 23:30")
# Alle 30 Minuten Standard-Scan
schedule.every(30).minutes.do(standard_lauf)
# Di + Mi um 23:30: erweiterter Flex-Lauf mit ±3 Tage Datumsfenster
schedule.every().day.at("23:30").do(flex_lauf)
log(f"Nächster Lauf: {str(schedule.jobs[0].next_run)[:16]}")
while True:
schedule.run_pending()
time.sleep(30)
if __name__ == "__main__":
run()