feat: EPG-Scanner erweitert — 4 Wochen voraus + Seen-Cache fuer nur neue Filme

This commit is contained in:
root 2026-03-16 22:17:33 +07:00
parent 7ab3d5e368
commit 0dcb0fd35a

View file

@ -1,17 +1,18 @@
"""Save.TV Online-Videorecorder — EPG Scanner + Film-Tipps + Aufnahme-Steuerung. """Save.TV Online-Videorecorder — EPG Scanner + Film-Tipps + Aufnahme-Steuerung.
Architektur: Architektur:
- EPG-Daten kommen von Save.TV TvProgramm-Seiten (eingebettetes JSON) - EPG-Daten von Save.TV: TvProgrammFilm.cfm (3 Tage) + TvProgrammFilmHighlights.cfm (4 Wochen)
- Nur TVCATEGORYID 1 (Spielfilm) wird beachtet - Nur TVCATEGORYID 1 (Spielfilm), Spam-Genres rausgefiltert
- LLM bewertet Filme per Titel + Beschreibung + Genre - Seen-Cache: Nur neue Filme werden gemeldet (nicht erneut bei jedem Scan)
- Aufnahmen werden per tcJWriteRecord.cfm angelegt - Aufnahmen per tcJWriteRecord.cfm
""" """
import re import re
import json import json
import logging import logging
import requests import requests
from datetime import datetime from datetime import datetime, timedelta
from pathlib import Path
log = logging.getLogger("savetv") log = logging.getLogger("savetv")
@ -24,20 +25,26 @@ _session_ts = None
SESSION_MAX_AGE = 1800 SESSION_MAX_AGE = 1800
EPG_PAGES = [ EPG_PAGES = [
"/STV/M/obj/TVProgCtr/TvProgrammFilm.cfm",
"/STV/M/obj/TVProgCtr/TvProgrammFilmHighlights.cfm",
"/STV/M/obj/TVProgCtr/TvProgramm2015.cfm", "/STV/M/obj/TVProgCtr/TvProgramm2015.cfm",
"/STV/M/obj/TVProgCtr/TvProgramm2215.cfm", "/STV/M/obj/TVProgCtr/TvProgramm2215.cfm",
] ]
SEEN_CACHE = Path("/tmp/savetv_seen_ids.json")
SEEN_MAX_AGE_DAYS = 30
SPAM_SUBCATEGORIES = { SPAM_SUBCATEGORIES = {
"teleshop", "shopping", "dauerwerbesendung", "volksmusik", "teleshop", "shopping", "dauerwerbesendung", "volksmusik",
"casting", "reality", "quiz/spiel", "comic", "zeichentrick", "casting", "reality", "quiz/spiel", "comic", "zeichentrick",
"erotik", "kindersendung", "erotik", "kindersendung", "sonstige",
} }
GOOD_SUBCATEGORIES = { GOOD_SUBCATEGORIES = {
"action", "thriller", "krimi", "drama", "komödie", "komodie", "action", "thriller", "krimi", "drama", "komödie", "komodie",
"science fiction", "sci-fi", "fantasy", "abenteuer", "horror", "comedy", "science fiction", "sci-fi", "fantasy", "abenteuer",
"western", "historienfilm", "animation", "mystery", "horror", "western", "historienfilm", "animation", "mystery",
"romanze",
} }
TOOLS = [ TOOLS = [
@ -53,8 +60,8 @@ TOOLS = [
"type": "function", "type": "function",
"function": { "function": {
"name": "get_savetv_tipps", "name": "get_savetv_tipps",
"description": "TV-Filmtipps: Sehenswerte Spielfilme aus dem heutigen TV-Programm. " "description": "TV-Filmtipps: Sehenswerte Spielfilme der naechsten Tage/Wochen. "
"Nutze bei 'was laeuft heute', 'gute Filme', 'TV Tipps', 'Fernsehen', 'Save.TV'.", "Nutze bei 'was laeuft', 'gute Filme', 'TV Tipps', 'Fernsehen', 'Save.TV'.",
"parameters": {"type": "object", "properties": {}, "required": []}, "parameters": {"type": "object", "properties": {}, "required": []},
}, },
}, },
@ -76,7 +83,7 @@ TOOLS = [
] ]
SYSTEM_PROMPT_EXTRA = """TV / Save.TV Tools: SYSTEM_PROMPT_EXTRA = """TV / Save.TV Tools:
- get_savetv_tipps: Zeigt sehenswerte Spielfilme aus dem heutigen TV-Programm - get_savetv_tipps: Zeigt sehenswerte Spielfilme der naechsten Tage/Wochen
- savetv_record: Nimmt einen Film per TelecastId auf - savetv_record: Nimmt einen Film per TelecastId auf
- get_savetv_status: Zeigt Archiv und geplante Aufnahmen - get_savetv_status: Zeigt Archiv und geplante Aufnahmen
Wenn der User einen Film aufnehmen will, nutze savetv_record mit der TelecastId. Wenn der User einen Film aufnehmen will, nutze savetv_record mit der TelecastId.
@ -96,7 +103,7 @@ def _init_creds():
pass pass
def _get_session() -> requests.Session | None: def _get_session():
"""Login und Session cachen.""" """Login und Session cachen."""
global _session, _session_ts global _session, _session_ts
_init_creds() _init_creds()
@ -109,15 +116,15 @@ def _get_session() -> requests.Session | None:
s.headers.update({"User-Agent": "Mozilla/5.0 Hausmeister-Bot/1.0"}) s.headers.update({"User-Agent": "Mozilla/5.0 Hausmeister-Bot/1.0"})
try: try:
r = s.post( s.post(
f"{SAVETV_URL}/STV/M/Index.cfm?sk=PREMIUM", SAVETV_URL + "/STV/M/Index.cfm?sk=PREMIUM",
data={"sUsername": SAVETV_USER, "sPassword": SAVETV_PASS, "value": "Login"}, data={"sUsername": SAVETV_USER, "sPassword": SAVETV_PASS, "value": "Login"},
allow_redirects=True, allow_redirects=True,
timeout=15, timeout=15,
) )
cookies = s.cookies.get_dict() cookies = s.cookies.get_dict()
if not cookies.get("savetv_active_login"): if not cookies.get("savetv_active_login"):
log.warning("Save.TV Login fehlgeschlagen (kein savetv_active_login Cookie)") log.warning("Save.TV Login fehlgeschlagen")
return None return None
except Exception as e: except Exception as e:
log.error("Save.TV Login Error: %s", e) log.error("Save.TV Login Error: %s", e)
@ -129,14 +136,33 @@ def _get_session() -> requests.Session | None:
return s return s
def _get_archive(state: int = 0, count: int = 20) -> dict: def _load_seen():
"""Lade gesehene TelecastIDs. Format: {id_str: 'YYYY-MM-DD'}."""
if not SEEN_CACHE.exists():
return {}
try:
data = json.loads(SEEN_CACHE.read_text())
cutoff = (datetime.now() - timedelta(days=SEEN_MAX_AGE_DAYS)).strftime("%Y-%m-%d")
return {k: v for k, v in data.items() if v >= cutoff}
except Exception:
return {}
def _save_seen(seen):
try:
SEEN_CACHE.write_text(json.dumps(seen))
except Exception as e:
log.error("Seen-Cache schreiben: %s", e)
def _get_archive(state=0, count=20):
"""Archiv abrufen. state: 0=geplant, 1=fertig.""" """Archiv abrufen. state: 0=geplant, 1=fertig."""
s = _get_session() s = _get_session()
if not s: if not s:
return {"error": "Login fehlgeschlagen"} return {"error": "Login fehlgeschlagen"}
try: try:
r = s.get( r = s.get(
f"{SAVETV_URL}/STV/M/obj/archive/JSON/VideoArchiveApi.cfm", SAVETV_URL + "/STV/M/obj/archive/JSON/VideoArchiveApi.cfm",
params={ params={
"bAggregateEntries": "false", "bAggregateEntries": "false",
"iEntriesPerPage": str(count), "iEntriesPerPage": str(count),
@ -150,8 +176,14 @@ def _get_archive(state: int = 0, count: int = 20) -> dict:
return {"error": str(e)} return {"error": str(e)}
def _scrape_epg() -> list[dict]: def _scrape_epg():
"""Holt Filme aus den Save.TV Programmseiten (JSON im HTML).""" """Holt Filme aus Save.TV Programmseiten (JSON im HTML).
Quellen:
- TvProgrammFilm.cfm: Alle Filme der naechsten 3 Tage (~35)
- TvProgrammFilmHighlights.cfm: Kuratierte Highlights 4 Wochen (~22)
- TvProgramm2015/2215.cfm: Primetime alle Genres (Filme rausfiltern)
"""
s = _get_session() s = _get_session()
if not s: if not s:
return [] return []
@ -161,7 +193,7 @@ def _scrape_epg() -> list[dict]:
for page_path in EPG_PAGES: for page_path in EPG_PAGES:
try: try:
r = s.get(f"{SAVETV_URL}{page_path}", timeout=15) r = s.get(SAVETV_URL + page_path, timeout=15)
m = re.search( m = re.search(
r'model\s*=\s*(\{"TvCategoryId".*?"SortedTelecasts":\[.*?\]\})', r'model\s*=\s*(\{"TvCategoryId".*?"SortedTelecasts":\[.*?\]\})',
r.text, r.text,
@ -177,17 +209,24 @@ def _scrape_epg() -> list[dict]:
if tid and tid not in seen_ids: if tid and tid not in seen_ids:
seen_ids.add(tid) seen_ids.add(tid)
all_telecasts.append(tc) all_telecasts.append(tc)
count = len(data.get("SortedTelecasts", []))
log.debug("EPG %s: %d Sendungen", page_path.split("/")[-1], count)
except Exception as e: except Exception as e:
log.error("EPG Scrape %s: %s", page_path, e) log.error("EPG Scrape %s: %s", page_path, e)
log.info("EPG: %d Sendungen gesamt", len(all_telecasts)) log.info("EPG gesamt: %d Sendungen aus %d Quellen", len(all_telecasts), len(EPG_PAGES))
return all_telecasts return all_telecasts
def _filter_films(telecasts: list[dict]) -> list[dict]: def _filter_films(telecasts, only_new=False):
"""Filtert auf Spielfilme und bewertet sie.""" """Filtert auf sehenswerte Spielfilme.
only_new=True: Nur Filme die noch nicht im Seen-Cache sind (fuer Cronjob).
"""
films = [] films = []
now = datetime.now() now = datetime.now()
seen = _load_seen() if only_new else {}
for tc in telecasts: for tc in telecasts:
cat_id = tc.get("TVCATEGORYID", 0) cat_id = tc.get("TVCATEGORYID", 0)
@ -211,6 +250,10 @@ def _filter_films(telecasts: list[dict]) -> list[dict]:
if start_dt < now: if start_dt < now:
continue continue
tid = str(int(tc.get("ITELECASTID", 0)))
if only_new and tid in seen:
continue
score = 50 score = 50
if subcat in GOOD_SUBCATEGORIES: if subcat in GOOD_SUBCATEGORIES:
score += 20 score += 20
@ -220,6 +263,8 @@ def _filter_films(telecasts: list[dict]) -> list[dict]:
score += 15 score += 15
elif 14 <= hour <= 19: elif 14 <= hour <= 19:
score += 5 score += 5
elif hour < 6:
score -= 10
desc = tc.get("STHEMA") or tc.get("SFULLSUBTITLE") or "" desc = tc.get("STHEMA") or tc.get("SFULLSUBTITLE") or ""
if len(desc) > 50: if len(desc) > 50:
@ -229,6 +274,10 @@ def _filter_films(telecasts: list[dict]) -> list[dict]:
if already_recorded: if already_recorded:
score -= 30 score -= 30
is_highlight = tc.get("BISTIPOFDAY", False)
if is_highlight:
score += 10
tc["_score"] = score tc["_score"] = score
tc["_start_dt"] = start_dt tc["_start_dt"] = start_dt
films.append(tc) films.append(tc)
@ -237,14 +286,25 @@ def _filter_films(telecasts: list[dict]) -> list[dict]:
return films return films
def _mark_seen(films):
    """Record the given films in the seen-cache under today's date.

    Loads the current cache, stores today's date for the ``ITELECASTID`` of
    every film and writes the cache back. Films with a missing, zero or
    non-numeric ID are skipped so that one malformed entry does not prevent
    the remaining films from being saved.

    Args:
        films: iterable of telecast dicts.
    """
    seen = _load_seen()
    today = datetime.now().strftime("%Y-%m-%d")
    for film in films:
        try:
            tid = str(int(film.get("ITELECASTID") or 0))
        except (TypeError, ValueError, OverflowError):
            continue
        if tid != "0":
            seen[tid] = today
    _save_seen(seen)
def _record_telecast(telecast_id):
"""Aufnahme anlegen.""" """Aufnahme anlegen."""
s = _get_session() s = _get_session()
if not s: if not s:
return "Login fehlgeschlagen" return "Login fehlgeschlagen"
try: try:
r = s.post( r = s.post(
f"{SAVETV_URL}/STV/M/obj/TC/tcJWriteRecord.cfm", SAVETV_URL + "/STV/M/obj/TC/tcJWriteRecord.cfm",
data={"TelecastId": telecast_id, "iRecordingBuffer": 0}, data={"TelecastId": telecast_id, "iRecordingBuffer": 0},
headers={"X-Requested-With": "XMLHttpRequest"}, headers={"X-Requested-With": "XMLHttpRequest"},
timeout=15, timeout=15,
@ -252,7 +312,39 @@ def _record_telecast(telecast_id: int) -> str:
data = r.json() data = r.json()
return data.get("SMESSAGE", "Unbekannte Antwort") return data.get("SMESSAGE", "Unbekannte Antwort")
except Exception as e: except Exception as e:
return f"Fehler: {e}" return "Fehler: " + str(e)
def _format_film(f, with_tid=True):
"""Formatiert einen Film als Text."""
title = f.get("STITLE", "?")
station = f.get("STVSTATIONNAME", "?")
start = f.get("DSTARTDATE", "?")[:16]
subcat = f.get("SSUBCATEGORYNAME", "")
desc = (f.get("STHEMA") or f.get("SFULLSUBTITLE") or "")[:120]
tid = int(f.get("ITELECASTID", 0))
recorded = " [geplant]" if f.get("BEXISTRECORD") else ""
days_until = ""
try:
start_dt = datetime.strptime(f.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S")
delta = (start_dt.date() - datetime.now().date()).days
if delta == 0:
days_until = " (heute)"
elif delta == 1:
days_until = " (morgen)"
else:
days_until = " (in " + str(delta) + " Tagen)"
except (ValueError, TypeError):
pass
lines = [" " + title + recorded + days_until]
lines.append(" " + station + " | " + start + " | " + subcat)
if desc:
lines.append(" " + desc + "...")
if with_tid:
lines.append(" TelecastId " + str(tid))
return "\n".join(lines)
def handle_get_savetv_status(**kw): def handle_get_savetv_status(**kw):
@ -260,34 +352,34 @@ def handle_get_savetv_status(**kw):
planned = _get_archive(state=0, count=10) planned = _get_archive(state=0, count=10)
if "error" in archive: if "error" in archive:
return f"Save.TV Fehler: {archive['error']}" return "Save.TV Fehler: " + archive["error"]
lines = ["📺 Save.TV Status\n"] lines = ["Save.TV Status\n"]
total = int(archive.get("ITOTALENTRIESINARCHIVE", 0)) total = int(archive.get("ITOTALENTRIESINARCHIVE", 0))
lines.append(f"Archiv: {total} Aufnahmen gesamt") lines.append("Archiv: " + str(total) + " Aufnahmen gesamt")
fertig = archive.get("ARRVIDEOARCHIVEENTRIES", []) fertig = archive.get("ARRVIDEOARCHIVEENTRIES", [])
if fertig: if fertig:
lines.append("\n🎬 Letzte fertige Aufnahmen:") lines.append("\nLetzte fertige Aufnahmen:")
for e in fertig[:5]: for e in fertig[:5]:
tc = e.get("STRTELECASTENTRY", {}) tc = e.get("STRTELECASTENTRY", {})
lines.append( lines.append(
f"{tc.get('STITLE', '?')[:40]} | " " " + tc.get("STITLE", "?")[:40] + " | "
f"{tc.get('DSTARTDATE', '?')[:10]} | " + tc.get("DSTARTDATE", "?")[:10] + " | "
f"{tc.get('STVSTATIONNAME', '?')}" + tc.get("STVSTATIONNAME", "?")
) )
geplant = planned.get("ARRVIDEOARCHIVEENTRIES", []) geplant = planned.get("ARRVIDEOARCHIVEENTRIES", [])
plan_total = int(planned.get("ITOTALENTRIES", 0)) plan_total = int(planned.get("ITOTALENTRIES", 0))
if geplant: if geplant:
lines.append(f"\n⏰ Geplante Aufnahmen ({plan_total}):") lines.append("\nGeplante Aufnahmen (" + str(plan_total) + "):")
for e in geplant[:10]: for e in geplant[:10]:
tc = e.get("STRTELECASTENTRY", {}) tc = e.get("STRTELECASTENTRY", {})
lines.append( lines.append(
f"{tc.get('STITLE', '?')[:40]} | " " " + tc.get("STITLE", "?")[:40] + " | "
f"{tc.get('DSTARTDATE', '?')[:16]} | " + tc.get("DSTARTDATE", "?")[:16] + " | "
f"{tc.get('STVSTATIONNAME', '?')}" + tc.get("STVSTATIONNAME", "?")
) )
return "\n".join(lines) return "\n".join(lines)
@ -298,48 +390,46 @@ def handle_get_savetv_tipps(**kw):
if not telecasts: if not telecasts:
return "Konnte keine Programmdaten von Save.TV laden." return "Konnte keine Programmdaten von Save.TV laden."
films = _filter_films(telecasts) films = _filter_films(telecasts, only_new=False)
if not films: if not films:
return "Keine sehenswerten Spielfilme im heutigen Programm gefunden." return "Keine sehenswerten Spielfilme in den naechsten Tagen gefunden."
lines = ["🎬 TV-Filmtipps heute\n"] lines = ["TV-Filmtipps\n"]
for f in films[:8]: for f in films[:10]:
subcat = f.get("SSUBCATEGORYNAME", "") lines.append(_format_film(f))
station = f.get("STVSTATIONNAME", "?")
start = f.get("DSTARTDATE", "?")[:16]
title = f.get("STITLE", "?")
subtitle = f.get("SFULLSUBTITLE") or f.get("SSUBTITLE") or ""
desc = f.get("STHEMA") or ""
tid = int(f.get("ITELECASTID", 0))
recorded = "" if f.get("BEXISTRECORD") else ""
lines.append(f"🎬 {title} {recorded}")
if subtitle and subtitle != title:
lines.append(f" {subtitle[:60]}")
lines.append(f" 📺 {station} | ⏰ {start} | 🎭 {subcat}")
if desc and len(desc) > 10:
lines.append(f" {desc[:120]}...")
lines.append(f" → Aufnahme: TelecastId {tid}")
lines.append("") lines.append("")
lines.append("💡 Sage 'Nimm [Filmname] auf' oder 'Aufnahme TelecastId XXXXX'") lines.append("Sage 'Nimm [Filmname] auf' oder nenne die TelecastId")
return "\n".join(lines) return "\n".join(lines)
def get_new_films(min_score=60):
    """Return films that are new since the last scan (cronjob entry point).

    Scrapes the EPG, keeps only films that are not yet in the seen-cache and
    then marks every one of those films as seen, including the ones below
    the score threshold, so they are not evaluated again on the next scan.

    Args:
        min_score: minimum ``_score`` a film needs to be returned.
            Defaults to 60.

    Returns:
        List of telecast dicts with ``_score >= min_score``; an empty list
        when no programme data could be loaded.
    """
    telecasts = _scrape_epg()
    if not telecasts:
        return []

    new_films = _filter_films(telecasts, only_new=True)
    good_films = [f for f in new_films if f.get("_score", 0) >= min_score]
    _mark_seen(new_films)
    return good_films
def handle_savetv_record(telecast_id=0, **kw):
    """Tool handler: schedule a recording for the given TelecastId.

    Looks the ID up in the scraped EPG to show a title (falling back to
    ``"ID <n>"``) and then requests the recording.

    Args:
        telecast_id: numeric telecast ID (int, float or numeric string).
        **kw: ignored extra tool arguments.

    Returns:
        A user-facing message: an error text for a missing or non-numeric
        ID, otherwise the Save.TV response plus the telecast title.
    """
    if not telecast_id:
        return "Keine TelecastId angegeben."
    try:
        tid = int(telecast_id)
    except (TypeError, ValueError, OverflowError):
        # Tool arguments may be arbitrary text; answer instead of crashing.
        return "Ungueltige TelecastId: " + repr(telecast_id)

    title = "ID " + str(tid)
    for tc in _scrape_epg():
        try:
            is_match = int(tc.get("ITELECASTID") or 0) == tid
        except (TypeError, ValueError, OverflowError):
            continue
        if is_match:
            # STITLE may be present but empty/None; keep the fallback then.
            title = tc.get("STITLE") or title
            break

    result = _record_telecast(tid)
    return "Save.TV: " + str(result) + "\nSendung: " + title
HANDLERS = { HANDLERS = {