feat: 4x täglich (06/11/18/23h) + Flex-Lauf Di/Mi 23:30 mit ±3 Tage Datumsfenster

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Cursor 2026-02-25 16:06:55 +07:00
parent f85c049aca
commit 4ba004cc76
2 changed files with 107 additions and 46 deletions

View file

@ -1,9 +1,9 @@
import os
import time
import json
import threading
import requests
import schedule
from datetime import datetime
from datetime import datetime, timedelta
from db import init_db, get_conn, log
from ki import auswerten
@ -17,21 +17,20 @@ def get_nodes():
return [dict(n) for n in nodes]
def get_aktive_jobs():
def get_aktive_jobs(flex=False):
"""
flex=False normale Jobs (datum_flex IS NULL or 0)
flex=True alle Jobs, tage wird durch Aufrufer variiert
"""
conn = get_conn()
jobs = conn.execute(
"SELECT * FROM jobs WHERE aktiv = 1"
).fetchall()
jobs = conn.execute("SELECT * FROM jobs WHERE aktiv = 1").fetchall()
conn.close()
return [dict(j) for j in jobs]
def node_ping(node):
try:
r = requests.get(
f"http://{node['tailscale_ip']}:5010/status",
timeout=5
)
r = requests.get(f"http://{node['tailscale_ip']}:5010/status", timeout=5)
return r.status_code == 200
except Exception:
return False
@ -47,23 +46,22 @@ def update_node_status(name, status):
conn.close()
def dispatch_job(node, job):
def dispatch_job(node, job, tage_override=None):
payload = {
"scanner": job["scanner"],
"von": job["von"],
"nach": job["nach"],
"tage": job["tage"],
"scanner": job["scanner"],
"von": job["von"],
"nach": job["nach"],
"tage": tage_override if tage_override is not None else job["tage"],
"aufenthalt_tage": job.get("aufenthalt_tage", 60),
"trip_type": job.get("trip_type", "roundtrip"),
"kabine": job.get("kabine", "premium_economy"),
"gepaeck": job.get("gepaeck", "1koffer+handgepaeck"),
"trip_type": job.get("trip_type", "roundtrip"),
"kabine": job.get("kabine", "premium_economy"),
"gepaeck": job.get("gepaeck", "1koffer+handgepaeck"),
"airline_filter": job.get("airline_filter", ""),
"layover_min": job.get("layover_min", 120),
"layover_max": job.get("layover_max", 300),
"max_flugzeit_h": job.get("max_flugzeit_h", 22),
"max_stops": job.get("max_stops", 2),
}
log(f"Job an {node['name']} ({node['tailscale_ip']}): {payload}")
try:
r = requests.post(
f"http://{node['tailscale_ip']}:5010/job",
@ -72,11 +70,13 @@ def dispatch_job(node, job):
)
if r.status_code == 200:
results = r.json().get("results", [])
log(f"{node['name']}: {len(results)} Preise erhalten")
log(f"{node['name']}: {len(results)} Preise ← {job['scanner']}"
f"{' ['+job.get('airline_filter','')+']' if job.get('airline_filter') else ''}"
f"{' +'+str(tage_override)+'T' if tage_override else ''}")
speichere_preise(results, node["name"], job)
return True
else:
log(f"{node['name']}: Fehler {r.status_code}", "ERROR")
log(f"{node['name']}: Fehler {r.status_code} bei {job['scanner']}", "ERROR")
return False
except Exception as e:
log(f"{node['name']}: Exception {e}", "ERROR")
@ -102,37 +102,75 @@ def speichere_preise(results, node_name, job):
conn.close()
def taeglich_scrapen():
log("=== Täglicher Scraping-Lauf gestartet ===")
def scraping_lauf(label="Standard", flex_tage_liste=None):
"""
Führt alle aktiven Jobs auf allen Nodes aus.
flex_tage_liste: Liste von 'tage'-Werten für Datums-Flexibilität (z.B. [27,28,30,32,33])
"""
log(f"=== Scraping-Lauf [{label}] gestartet ===")
nodes = get_nodes()
jobs = get_aktive_jobs()
jobs = get_aktive_jobs()
if not nodes:
log("Keine aktiven Nodes konfiguriert", "WARN")
return
tage_varianten = flex_tage_liste or [None] # None = Job-Default verwenden
online = 0
for node in nodes:
if node_ping(node):
update_node_status(node["name"], "online")
online += 1
for job in jobs:
dispatch_job(node, job)
for tage_var in tage_varianten:
dispatch_job(node, job, tage_override=tage_var)
else:
log(f"Node {node['name']} nicht erreichbar", "WARN")
update_node_status(node["name"], "offline")
log("Scraping abgeschlossen — KI-Auswertung läuft")
log(f"Scraping [{label}] abgeschlossen — {online}/{len(nodes)} Nodes online")
auswerten()
log("=== Lauf beendet ===")
log(f"=== Lauf [{label}] beendet ===")
def standard_lauf():
"""Normaler 4×-täglich Lauf mit Standard-Datum."""
scraping_lauf(label=datetime.now().strftime("%H:%M"))
def flex_lauf():
"""
Di/Mi Nacht: ±3 Tage Datums-Flexibilität.
Sucht Abflug in 27-33 Tagen statt nur 30 Tagen.
"""
wochentag = datetime.now().weekday() # 0=Mo, 1=Di, 2=Mi
if wochentag not in (1, 2):
log("Flex-Lauf: heute kein Di/Mi — übersprungen")
return
# 7 Varianten: 30±3 Tage
basis = 30
flex_varianten = list(range(basis - 3, basis + 4)) # [27,28,29,30,31,32,33]
log(f"=== Flex-Lauf Di/Mi ±3 Tage: {flex_varianten} ===")
scraping_lauf(label="Flex-Di/Mi", flex_tage_liste=flex_varianten)
def run():
init_db()
log("Scheduler gestartet")
log("Scheduler gestartet — 4× täglich + Flex Di/Mi")
# Täglich um 06:00
schedule.every().day.at("06:00").do(taeglich_scrapen)
# 4× täglich Standard-Scan
schedule.every().day.at("06:00").do(standard_lauf)
schedule.every().day.at("11:00").do(standard_lauf)
schedule.every().day.at("18:00").do(standard_lauf)
schedule.every().day.at("23:00").do(standard_lauf)
log("Nächster Lauf: täglich 06:00 Uhr")
# Di + Mi um 23:30: erweiterter Flex-Lauf mit ±3 Tage Datumsfenster
schedule.every().day.at("23:30").do(flex_lauf)
naechste = [str(j.next_run)[:16] for j in schedule.jobs[:3]]
log(f"Nächste Läufe: {', '.join(naechste)} ...")
while True:
schedule.run_pending()

View file

@ -1,11 +1,10 @@
import os
import threading
from datetime import datetime
from flask import Flask, jsonify, request, render_template_string
from flask_cors import CORS
from db import init_db, get_conn, log
from scheduler import taeglich_scrapen
import schedule
import time
from scheduler import scraping_lauf
app = Flask(__name__)
CORS(app)
@ -78,9 +77,12 @@ BASE_HTML = """<!DOCTYPE html>
</html>"""
OVERVIEW_HTML = BASE_HTML.replace("{% block content %}{% endblock %}", """
<div style="background:#451a03;border:1px solid#92400e;border-radius:8px;padding:0.6rem 1rem;margin-bottom:1.2rem;font-size:0.85rem;color:#fcd34d">
<strong>FRA KTI</strong> &nbsp;·&nbsp; Roundtrip &nbsp;·&nbsp; Premium Economy &nbsp;·&nbsp; 1 Aufgabekoffer + Handgepäck &nbsp;·&nbsp; ~2 Monate Aufenthalt
&nbsp;·&nbsp; <span style="color:#f87171;font-weight:600"> Preise unter 1.000 bitte manuell prüfen</span>
<div style="background:#0c1a3a;border:1px solid#1e40af;border-radius:8px;padding:0.6rem 1rem;margin-bottom:0.6rem;font-size:0.85rem;color:#93c5fd;display:flex;justify-content:space-between;align-items:center;flex-wrap:wrap;gap:0.5rem">
<span> <strong>FRA KTI</strong> &nbsp;·&nbsp; Roundtrip &nbsp;·&nbsp; Premium Economy &nbsp;·&nbsp; 1 Koffer + Handgepäck &nbsp;·&nbsp; ~2 Monate &nbsp;·&nbsp; max 22h / 2 Stopps / Umstieg 2-5h</span>
<span id="schedule-info" style="font-size:0.78rem;color:#60a5fa">Lade Zeitplan...</span>
</div>
<div style="background:#451a03;border:1px solid#92400e;border-radius:8px;padding:0.5rem 1rem;margin-bottom:1.2rem;font-size:0.82rem;color:#fcd34d">
<strong>Preise unter 1.000 </strong> bitte im Dashboard manuell prüfen wahrscheinlich Economy, kein Koffer oder falsches Segment
</div>
<div class="grid-3" style="margin-bottom:1.5rem">
@ -167,14 +169,20 @@ function preisZelle(preis, booking_url) {
}
async function ladeUebersicht() {
const [stats, ki, preise, nodes, vergleich] = await Promise.all([
const [stats, ki, preise, nodes, vergleich, sched] = await Promise.all([
fetch('/api/stats').then(r=>r.json()),
fetch('/api/ki/latest').then(r=>r.json()),
fetch('/api/preise/heute').then(r=>r.json()),
fetch('/api/nodes').then(r=>r.json()),
fetch('/api/preise/vergleich').then(r=>r.json())
fetch('/api/preise/vergleich').then(r=>r.json()),
fetch('/api/schedule').then(r=>r.json())
]);
// Zeitplan-Anzeige
const flexHeute = sched.flex_heute ? ' 🔍 Flex-Lauf heute 23:30' : '';
document.getElementById('schedule-info').textContent =
`Scans: ${sched.taeglich.join(' · ')} Uhr${flexHeute} nächster Flex: ${sched.naechster_flex}`;
const minHeute = stats.min_heute;
document.getElementById('min-preis').textContent = minHeute ? Math.round(minHeute) : '';
document.getElementById('min-preis').style.color = (minHeute && minHeute < PLAUSI_GRENZE) ? '#fbbf24' : '#38bdf8';
@ -466,9 +474,28 @@ def api_logs():
return jsonify([dict(r) for r in rows])
@app.route("/api/schedule")
def api_schedule():
"""Zeigt nächste geplante Läufe."""
heute = datetime.now().strftime("%A") # Wochentag
return jsonify({
"taeglich": ["06:00", "11:00", "18:00", "23:00"],
"flex_lauf": "Di + Mi um 23:30 (±3 Tage Datumsfenster)",
"heute_ist": heute,
"flex_heute": datetime.now().weekday() in (1, 2),
"naechster_flex": "heute 23:30" if datetime.now().weekday() in (1, 2) else
("Di " if datetime.now().weekday() < 1 else
"nächsten Di") + " 23:30"
})
@app.route("/api/scrape/now", methods=["POST"])
def api_scrape_now():
threading.Thread(target=taeglich_scrapen, daemon=True).start()
threading.Thread(
target=scraping_lauf,
kwargs={"label": "Manuell"},
daemon=True
).start()
return jsonify({"message": "Scraping gestartet — läuft im Hintergrund"})
@ -640,13 +667,9 @@ if __name__ == "__main__":
init_db()
log("Flugpreisscanner Hub gestartet")
# Scheduler in eigenem Thread
# Scheduler läuft als eigener Container (scheduler.py)
def run_schedule():
import schedule as s
from scheduler import taeglich_scrapen
s.every().day.at("06:00").do(taeglich_scrapen)
while True:
s.run_pending()
pass # Scheduler-Container übernimmt die Zeitplanung
time.sleep(30)
threading.Thread(target=run_schedule, daemon=True).start()