homelab-brain/homelab-ai-bot/tools/savetv.py

459 lines
13 KiB
Python

"""Save.TV Online-Videorecorder — EPG Scanner + Film-Tipps + Aufnahme-Steuerung.
Architektur:
- EPG-Daten von Save.TV: TvProgrammFilm.cfm (3 Tage) + TvProgrammFilmHighlights.cfm (4 Wochen)
- Nur TVCATEGORYID 1 (Spielfilm), Spam-Genres rausgefiltert
- Seen-Cache: Nur neue Filme werden gemeldet (nicht erneut bei jedem Scan)
- Aufnahmen per tcJWriteRecord.cfm
"""
import re
import json
import logging
import requests
from datetime import datetime, timedelta
from pathlib import Path
log = logging.getLogger("savetv")
SAVETV_URL = "https://www.save.tv"
SAVETV_USER = ""
SAVETV_PASS = ""
_session = None
_session_ts = None
SESSION_MAX_AGE = 1800
EPG_PAGES = [
"/STV/M/obj/TVProgCtr/TvProgrammFilm.cfm",
"/STV/M/obj/TVProgCtr/TvProgrammFilmHighlights.cfm",
"/STV/M/obj/TVProgCtr/TvProgramm2015.cfm",
"/STV/M/obj/TVProgCtr/TvProgramm2215.cfm",
]
SEEN_CACHE = Path("/tmp/savetv_seen_ids.json")
SEEN_MAX_AGE_DAYS = 30
AUTO_RECORD_SCORE = 80
SUGGEST_SCORE = 60
SPAM_SUBCATEGORIES = {
"teleshop", "shopping", "dauerwerbesendung", "volksmusik",
"casting", "reality", "quiz/spiel", "comic", "zeichentrick",
"erotik", "kindersendung", "sonstige",
}
GOOD_SUBCATEGORIES = {
"action", "thriller", "krimi", "drama", "komödie", "komodie",
"comedy", "science fiction", "sci-fi", "fantasy", "abenteuer",
"horror", "western", "historienfilm", "animation", "mystery",
"romanze",
}
TOOLS = [
{
"type": "function",
"function": {
"name": "get_savetv_status",
"description": "Save.TV Status: Aufnahmen im Archiv, geplante Aufnahmen anzeigen.",
"parameters": {"type": "object", "properties": {}, "required": []},
},
},
{
"type": "function",
"function": {
"name": "get_savetv_tipps",
"description": "TV-Filmtipps: Sehenswerte Spielfilme der naechsten Tage/Wochen. "
"Nutze bei 'was laeuft', 'gute Filme', 'TV Tipps', 'Fernsehen', 'Save.TV'.",
"parameters": {"type": "object", "properties": {}, "required": []},
},
},
{
"type": "function",
"function": {
"name": "savetv_record",
"description": "Save.TV Aufnahme anlegen fuer eine bestimmte TelecastId. "
"Nutze wenn User sagt 'nimm auf', 'aufnehmen', 'record'.",
"parameters": {
"type": "object",
"properties": {
"telecast_id": {"type": "number", "description": "TelecastId der Sendung"}
},
"required": ["telecast_id"],
},
},
},
]
SYSTEM_PROMPT_EXTRA = """TV / Save.TV Tools:
- get_savetv_tipps: Zeigt sehenswerte Spielfilme der naechsten Tage/Wochen
- savetv_record: Nimmt einen Film per TelecastId auf
- get_savetv_status: Zeigt Archiv und geplante Aufnahmen
Wenn der User einen Film aufnehmen will, nutze savetv_record mit der TelecastId.
"""
def _init_creds():
global SAVETV_USER, SAVETV_PASS
if SAVETV_USER:
return
try:
from core import config
cfg = config.parse_config()
SAVETV_USER = cfg.raw.get("SAVETV_USER", "")
SAVETV_PASS = cfg.raw.get("SAVETV_PASS", "")
except Exception:
pass
def _get_session():
"""Login und Session cachen."""
global _session, _session_ts
_init_creds()
now = datetime.now()
if _session and _session_ts and (now - _session_ts).seconds < SESSION_MAX_AGE:
return _session
s = requests.Session()
s.headers.update({"User-Agent": "Mozilla/5.0 Hausmeister-Bot/1.0"})
try:
s.post(
SAVETV_URL + "/STV/M/Index.cfm?sk=PREMIUM",
data={"sUsername": SAVETV_USER, "sPassword": SAVETV_PASS, "value": "Login"},
allow_redirects=True,
timeout=15,
)
cookies = s.cookies.get_dict()
if not cookies.get("savetv_active_login"):
log.warning("Save.TV Login fehlgeschlagen")
return None
except Exception as e:
log.error("Save.TV Login Error: %s", e)
return None
_session = s
_session_ts = now
log.info("Save.TV Login erfolgreich")
return s
def _load_seen():
"""Lade gesehene TelecastIDs. Format: {id_str: 'YYYY-MM-DD'}."""
if not SEEN_CACHE.exists():
return {}
try:
data = json.loads(SEEN_CACHE.read_text())
cutoff = (datetime.now() - timedelta(days=SEEN_MAX_AGE_DAYS)).strftime("%Y-%m-%d")
return {k: v for k, v in data.items() if v >= cutoff}
except Exception:
return {}
def _save_seen(seen):
try:
SEEN_CACHE.write_text(json.dumps(seen))
except Exception as e:
log.error("Seen-Cache schreiben: %s", e)
def _get_archive(state=0, count=20):
"""Archiv abrufen. state: 0=geplant, 1=fertig."""
s = _get_session()
if not s:
return {"error": "Login fehlgeschlagen"}
try:
r = s.get(
SAVETV_URL + "/STV/M/obj/archive/JSON/VideoArchiveApi.cfm",
params={
"bAggregateEntries": "false",
"iEntriesPerPage": str(count),
"iRecordingState": str(state),
},
headers={"X-Requested-With": "XMLHttpRequest"},
timeout=15,
)
return r.json()
except Exception as e:
return {"error": str(e)}
def _scrape_epg():
"""Holt Filme aus Save.TV Programmseiten (JSON im HTML).
Quellen:
- TvProgrammFilm.cfm: Alle Filme der naechsten 3 Tage (~35)
- TvProgrammFilmHighlights.cfm: Kuratierte Highlights 4 Wochen (~22)
- TvProgramm2015/2215.cfm: Primetime alle Genres (Filme rausfiltern)
"""
s = _get_session()
if not s:
return []
all_telecasts = []
seen_ids = set()
for page_path in EPG_PAGES:
try:
r = s.get(SAVETV_URL + page_path, timeout=15)
m = re.search(
r'model\s*=\s*(\{"TvCategoryId".*?"SortedTelecasts":\[.*?\]\})',
r.text,
re.DOTALL,
)
if not m:
log.warning("Kein model-JSON in %s", page_path)
continue
data = json.loads(m.group(1))
for tc in data.get("SortedTelecasts", []):
tid = int(tc.get("ITELECASTID", 0))
if tid and tid not in seen_ids:
seen_ids.add(tid)
all_telecasts.append(tc)
count = len(data.get("SortedTelecasts", []))
log.debug("EPG %s: %d Sendungen", page_path.split("/")[-1], count)
except Exception as e:
log.error("EPG Scrape %s: %s", page_path, e)
log.info("EPG gesamt: %d Sendungen aus %d Quellen", len(all_telecasts), len(EPG_PAGES))
return all_telecasts
def _filter_films(telecasts, only_new=False):
"""Filtert auf sehenswerte Spielfilme.
only_new=True: Nur Filme die noch nicht im Seen-Cache sind (fuer Cronjob).
"""
films = []
now = datetime.now()
seen = _load_seen() if only_new else {}
for tc in telecasts:
cat_id = tc.get("TVCATEGORYID", 0)
if cat_id != 1.0:
continue
title = tc.get("STITLE", "")
if not title or len(title) < 2:
continue
subcat = (tc.get("SSUBCATEGORYNAME") or "").lower()
if subcat in SPAM_SUBCATEGORIES:
continue
start_str = tc.get("DSTARTDATE", "")
try:
start_dt = datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
continue
if start_dt < now:
continue
tid = str(int(tc.get("ITELECASTID", 0)))
if only_new and tid in seen:
continue
score = 50
if subcat in GOOD_SUBCATEGORIES:
score += 20
desc = tc.get("STHEMA") or tc.get("SFULLSUBTITLE") or ""
if len(desc) > 50:
score += 10
already_recorded = tc.get("BEXISTRECORD", False)
if already_recorded:
score -= 30
is_highlight = tc.get("BISTIPOFDAY", False)
if is_highlight:
score += 10
tc["_score"] = score
tc["_start_dt"] = start_dt
films.append(tc)
films.sort(key=lambda x: (-x["_score"], x["_start_dt"]))
return films
def _mark_seen(films):
"""Markiere Filme als gesehen im Cache."""
seen = _load_seen()
today = datetime.now().strftime("%Y-%m-%d")
for f in films:
tid = str(int(f.get("ITELECASTID", 0)))
if tid != "0":
seen[tid] = today
_save_seen(seen)
def _record_telecast(telecast_id):
"""Aufnahme anlegen."""
s = _get_session()
if not s:
return "Login fehlgeschlagen"
try:
r = s.post(
SAVETV_URL + "/STV/M/obj/TC/tcJWriteRecord.cfm",
data={"TelecastId": telecast_id, "iRecordingBuffer": 0},
headers={"X-Requested-With": "XMLHttpRequest"},
timeout=15,
)
data = r.json()
return data.get("SMESSAGE", "Unbekannte Antwort")
except Exception as e:
return "Fehler: " + str(e)
def _format_film(f, with_tid=True):
"""Formatiert einen Film als Text."""
title = f.get("STITLE", "?")
station = f.get("STVSTATIONNAME", "?")
start = f.get("DSTARTDATE", "?")[:16]
subcat = f.get("SSUBCATEGORYNAME", "")
desc = (f.get("STHEMA") or f.get("SFULLSUBTITLE") or "")[:120]
tid = int(f.get("ITELECASTID", 0))
recorded = " [geplant]" if f.get("BEXISTRECORD") else ""
days_until = ""
try:
start_dt = datetime.strptime(f.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S")
delta = (start_dt.date() - datetime.now().date()).days
if delta == 0:
days_until = " (heute)"
elif delta == 1:
days_until = " (morgen)"
else:
days_until = " (in " + str(delta) + " Tagen)"
except (ValueError, TypeError):
pass
lines = [" " + title + recorded + days_until]
lines.append(" " + station + " | " + start + " | " + subcat)
if desc:
lines.append(" " + desc + "...")
if with_tid:
lines.append(" TelecastId " + str(tid))
return "\n".join(lines)
def handle_get_savetv_status(**kw):
archive = _get_archive(state=1, count=5)
planned = _get_archive(state=0, count=10)
if "error" in archive:
return "Save.TV Fehler: " + archive["error"]
lines = ["Save.TV Status\n"]
total = int(archive.get("ITOTALENTRIESINARCHIVE", 0))
lines.append("Archiv: " + str(total) + " Aufnahmen gesamt")
fertig = archive.get("ARRVIDEOARCHIVEENTRIES", [])
if fertig:
lines.append("\nLetzte fertige Aufnahmen:")
for e in fertig[:5]:
tc = e.get("STRTELECASTENTRY", {})
lines.append(
" " + tc.get("STITLE", "?")[:40] + " | "
+ tc.get("DSTARTDATE", "?")[:10] + " | "
+ tc.get("STVSTATIONNAME", "?")
)
geplant = planned.get("ARRVIDEOARCHIVEENTRIES", [])
plan_total = int(planned.get("ITOTALENTRIES", 0))
if geplant:
lines.append("\nGeplante Aufnahmen (" + str(plan_total) + "):")
for e in geplant[:10]:
tc = e.get("STRTELECASTENTRY", {})
lines.append(
" " + tc.get("STITLE", "?")[:40] + " | "
+ tc.get("DSTARTDATE", "?")[:16] + " | "
+ tc.get("STVSTATIONNAME", "?")
)
return "\n".join(lines)
def handle_get_savetv_tipps(**kw):
telecasts = _scrape_epg()
if not telecasts:
return "Konnte keine Programmdaten von Save.TV laden."
films = _filter_films(telecasts, only_new=False)
if not films:
return "Keine sehenswerten Spielfilme in den naechsten Tagen gefunden."
lines = ["TV-Filmtipps\n"]
for f in films[:10]:
lines.append(_format_film(f))
lines.append("")
lines.append("Sage 'Nimm [Filmname] auf' oder nenne die TelecastId")
return "\n".join(lines)
def get_new_films():
"""Fuer den Cronjob: Neue Filme scannen, Top-Filme automatisch aufnehmen.
Returns: (auto_recorded, suggestions)
auto_recorded: Filme die automatisch aufgenommen wurden (Score >= 85)
suggestions: Filme die dem User vorgeschlagen werden (Score 60-84)
"""
telecasts = _scrape_epg()
if not telecasts:
return [], []
films = _filter_films(telecasts, only_new=True)
_mark_seen(films)
auto_recorded = []
suggestions = []
for f in films:
score = f["_score"]
if score < SUGGEST_SCORE:
continue
if f.get("BEXISTRECORD"):
continue
if score >= AUTO_RECORD_SCORE:
tid = int(f.get("ITELECASTID", 0))
result = _record_telecast(tid)
f["_record_result"] = result
auto_recorded.append(f)
log.info("Auto-Aufnahme: %s (Score %d) -> %s",
f.get("STITLE"), score, result)
else:
suggestions.append(f)
return auto_recorded, suggestions
def handle_savetv_record(telecast_id=0, **kw):
if not telecast_id:
return "Keine TelecastId angegeben."
tid = int(telecast_id)
telecasts = _scrape_epg()
title = "ID " + str(tid)
for tc in telecasts:
if int(tc.get("ITELECASTID", 0)) == tid:
title = tc.get("STITLE", title)
break
result = _record_telecast(tid)
return "Save.TV: " + result + "\nSendung: " + title
HANDLERS = {
"get_savetv_status": handle_get_savetv_status,
"get_savetv_tipps": handle_get_savetv_tipps,
"savetv_record": handle_savetv_record,
}