homelab-brain/homelab-ai-bot/tools/savetv.py

677 lines
21 KiB
Python

"""Save.TV Online-Videorecorder — EPG Scanner + Film-Tipps + Aufnahme-Steuerung.
Architektur:
- EPG-Daten von Save.TV: TvProgrammFilm.cfm (3 Tage) + TvProgrammFilmHighlights.cfm (4 Wochen)
- Nur TVCATEGORYID 1 (Spielfilm), Spam-Genres rausgefiltert
- Seen-Cache: Nur neue Filme werden gemeldet (nicht erneut bei jedem Scan)
- Aufnahmen per tcJWriteRecord.cfm
"""
import re
import json
import logging
import requests
from datetime import datetime, timedelta
from pathlib import Path
# Module logger; every function in this file logs through it.
log = logging.getLogger("savetv")

# --- Save.TV account / session state -------------------------------------
SAVETV_URL = "https://www.save.tv"
# Credentials start empty and are filled on first use by _init_creds().
SAVETV_USER = ""
SAVETV_PASS = ""
# Cached login session and the time it was created (see _get_session()).
_session = None
_session_ts = None
# Maximum age of the cached session, in seconds.
SESSION_MAX_AGE = 30 * 60

# --- EPG sources ----------------------------------------------------------
# Programme pages (relative to SAVETV_URL) that _scrape_epg() reads.
EPG_PAGES = [
    "/STV/M/obj/TVProgCtr/TvProgrammFilm.cfm",
    "/STV/M/obj/TVProgCtr/TvProgrammFilmHighlights.cfm",
    "/STV/M/obj/TVProgCtr/TvProgramm2015.cfm",
    "/STV/M/obj/TVProgCtr/TvProgramm2215.cfm",
]

# --- Seen cache -----------------------------------------------------------
# JSON file mapping TelecastId -> date the film was first reported.
SEEN_CACHE = Path("/tmp/savetv_seen_ids.json")
# Entries older than this many days are dropped when the cache is loaded.
SEEN_MAX_AGE_DAYS = 30

# --- Scoring thresholds used by get_new_films() ---------------------------
AUTO_RECORD_SCORE = 80
SUGGEST_SCORE = 60

# Sub-category names (lower-case) that are never reported.
SPAM_SUBCATEGORIES = {
    "casting",
    "comic",
    "dauerwerbesendung",
    "erotik",
    "kindersendung",
    "quiz/spiel",
    "reality",
    "shopping",
    "sonstige",
    "teleshop",
    "volksmusik",
    "zeichentrick",
}

# Sub-category names (lower-case) that earn a score bonus in _filter_films().
GOOD_SUBCATEGORIES = {
    "abenteuer",
    "action",
    "animation",
    "comedy",
    "drama",
    "fantasy",
    "historienfilm",
    "horror",
    "komodie",
    "komödie",
    "krimi",
    "mystery",
    "romanze",
    "sci-fi",
    "science fiction",
    "thriller",
    "western",
}
def _tool_spec(name, description, properties=None, required=None):
    """Build one function-tool declaration; each call creates fresh containers."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {} if properties is None else properties,
                "required": [] if required is None else required,
            },
        },
    }


# Tool declarations offered to the model; the names match the keys of HANDLERS.
TOOLS = [
    _tool_spec(
        "get_savetv_status",
        "Save.TV Status: Aufnahmen im Archiv, geplante Aufnahmen anzeigen.",
    ),
    _tool_spec(
        "get_savetv_tipps",
        "TV-Filmtipps: Sehenswerte Spielfilme der naechsten Tage/Wochen. "
        "Nutze bei 'was laeuft', 'gute Filme', 'TV Tipps', 'Fernsehen', 'Save.TV'.",
    ),
    _tool_spec(
        "get_savetv_archive_filme",
        "Save.TV Archiv-Filme bewerten: Alle fertigen Aufnahmen holen, "
        "nach Qualitaet bewerten, deduplizieren. Zeigt Top-Filme, dringende "
        "(bald ablaufend) und weitere. Nutze bei 'gute Filme im Archiv', "
        "'welche Filme habe ich', 'was ist sehenswert', 'Archiv bewerten'.",
    ),
    _tool_spec(
        "savetv_record",
        "Save.TV Aufnahme anlegen fuer eine bestimmte TelecastId. "
        "Nutze wenn User sagt 'nimm auf', 'aufnehmen', 'record'.",
        properties={
            "telecast_id": {"type": "number", "description": "TelecastId der Sendung"}
        },
        required=["telecast_id"],
    ),
]
# Prompt text describing the four Save.TV tools and how archive ratings should
# be presented (cinema highlights first, urgent expiries emphasised).
# NOTE(review): presumably appended to the bot's system prompt by the code that
# loads this module; the consumer is not visible in this file.
SYSTEM_PROMPT_EXTRA = """TV / Save.TV Tools:
- get_savetv_tipps: Zeigt sehenswerte Spielfilme der naechsten Tage/Wochen
- get_savetv_archive_filme: Bewertet alle fertigen Aufnahmen im Archiv nach Qualitaet
- savetv_record: Nimmt einen Film per TelecastId auf
- get_savetv_status: Zeigt Archiv und geplante Aufnahmen
Wenn der User nach Archiv-Filmen/Bewertung fragt, nutze get_savetv_archive_filme.
WICHTIG bei Archiv-Bewertung: Das Tool liefert KINO-HIGHLIGHTS (echte Kinofilme, Klassiker,
preisgekroente Filme) getrennt von deutschem Fernsehprogramm. Praesentiere dem User die
KINO-HIGHLIGHTS zuerst und erklaere kurz warum jeder Film sehenswert ist (Regisseur, Preise,
Stars). Hebe DRINGEND ablaufende Kino-Highlights besonders hervor — die muss er schnell sichern.
"""
def _init_creds():
    """Load the Save.TV credentials from the project config on first use.

    Fills the module-level ``SAVETV_USER`` / ``SAVETV_PASS`` from
    ``core.config.parse_config().raw``. Does nothing once ``SAVETV_USER`` is
    non-empty.

    Loading stays best-effort (no exception escapes), but a failure is now
    logged instead of being swallowed, so a broken config does not only show
    up later as an unexplained failed login.
    """
    global SAVETV_USER, SAVETV_PASS
    if SAVETV_USER:
        return
    try:
        # Imported lazily so this module can be loaded without the bot core.
        from core import config

        raw = config.parse_config().raw
        user = raw.get("SAVETV_USER", "")
        password = raw.get("SAVETV_PASS", "")
    except Exception as e:
        log.warning("Save.TV Zugangsdaten konnten nicht geladen werden: %s", e)
        return
    # Assign both together so a failure never leaves a half-set pair.
    SAVETV_USER, SAVETV_PASS = user, password
def _get_session():
    """Return a logged-in ``requests.Session``, reusing the cached one if fresh.

    The cached session is reused while it is younger than ``SESSION_MAX_AGE``
    seconds. Otherwise a new session posts the login form and is cached when
    the ``savetv_active_login`` cookie is present afterwards.

    Returns:
        The session, or ``None`` if the login request raised or the login
        cookie is missing.
    """
    global _session, _session_ts
    _init_creds()
    now = datetime.now()
    if _session and _session_ts:
        # total_seconds() is the full age. timedelta.seconds is only the
        # remainder within a day, so a session older than 24h could look fresh.
        age = (now - _session_ts).total_seconds()
        if 0 <= age < SESSION_MAX_AGE:
            return _session
    s = requests.Session()
    s.headers.update({"User-Agent": "Mozilla/5.0 Hausmeister-Bot/1.0"})
    try:
        s.post(
            SAVETV_URL + "/STV/M/Index.cfm?sk=PREMIUM",
            data={"sUsername": SAVETV_USER, "sPassword": SAVETV_PASS, "value": "Login"},
            allow_redirects=True,
            timeout=15,
        )
        cookies = s.cookies.get_dict()
        if not cookies.get("savetv_active_login"):
            log.warning("Save.TV Login fehlgeschlagen")
            return None
    except Exception as e:
        log.error("Save.TV Login Error: %s", e)
        return None
    _session = s
    _session_ts = now
    log.info("Save.TV Login erfolgreich")
    return s
def _load_seen():
    """Load the seen-cache of TelecastIds, dropping expired entries.

    Returns:
        dict mapping TelecastId (str) to the 'YYYY-MM-DD' date it was marked
        seen. Entries older than ``SEEN_MAX_AGE_DAYS`` days are removed.
        An empty dict is returned when the cache file is missing, unreadable
        or not a JSON object.
    """
    try:
        data = json.loads(SEEN_CACHE.read_text())
    except FileNotFoundError:
        # No cache yet is the normal first-run case, not worth a log line.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        log.warning("Seen-Cache lesen: %s", e)
        return {}
    if not isinstance(data, dict):
        log.warning("Seen-Cache hat unerwartetes Format: %s", type(data).__name__)
        return {}
    cutoff = (datetime.now() - timedelta(days=SEEN_MAX_AGE_DAYS)).strftime("%Y-%m-%d")
    # ISO dates compare correctly as strings. Non-string values are skipped so
    # one malformed entry no longer discards the whole cache.
    return {k: v for k, v in data.items() if isinstance(v, str) and v >= cutoff}
def _save_seen(seen):
    """Write the seen-cache mapping to SEEN_CACHE as JSON.

    Serialisation and write errors are logged and not raised.
    """
    try:
        payload = json.dumps(seen)
        SEEN_CACHE.write_text(payload)
    except Exception as err:
        log.error("Seen-Cache schreiben: %s", err)
def _get_archive(state=0, count=20):
    """Fetch one page of the video archive via POST, as the web UI does.

    Args:
        state: recording state filter; 0 = planned, 1 = finished.
        count: number of entries per page.

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` when the login
        fails or the request / decoding raises. The query window is the last
        60 days up to today.
    """
    session = _get_session()
    if not session:
        return {"error": "Login fehlgeschlagen"}
    try:
        today = datetime.now()
        date_from = (today - timedelta(days=60)).strftime("%Y-%m-%d")
        date_to = today.strftime("%Y-%m-%d")
        form = {
            "bAggregateEntries": "false",
            "iEntriesPerPage": str(count),
            "iRecordingState": str(state),
            "dStartdate": date_from,
            "dEnddate": date_to,
        }
        response = session.post(
            SAVETV_URL + "/STV/M/obj/archive/JSON/VideoArchiveApi.cfm",
            data=form,
            headers={"X-Requested-With": "XMLHttpRequest"},
            timeout=15,
        )
        return response.json()
    except Exception as err:
        return {"error": str(err)}
def _get_full_archive():
    """Fetch all finished recordings of the last 60 days, page by page.

    Requests pages 1..19 with 100 entries each and stops at the first empty
    page or the first error (which is logged).

    Returns:
        List of archive entries collected so far; empty when the login fails.
    """
    session = _get_session()
    if not session:
        return []
    today = datetime.now()
    date_to = today.strftime("%Y-%m-%d")
    date_from = (today - timedelta(days=60)).strftime("%Y-%m-%d")
    collected = []
    for page_no in range(1, 20):
        form = {
            "bAggregateEntries": "false",
            "iEntriesPerPage": "100",
            "iCurrentPage": str(page_no),
            "iRecordingState": "1",
            "dStartdate": date_from,
            "dEnddate": date_to,
        }
        try:
            payload = session.post(
                SAVETV_URL + "/STV/M/obj/archive/JSON/VideoArchiveApi.cfm",
                data=form,
                headers={"X-Requested-With": "XMLHttpRequest"},
                timeout=15,
            ).json()
            batch = payload.get("ARRVIDEOARCHIVEENTRIES", [])
            if not batch:
                break
            collected.extend(batch)
        except Exception as err:
            log.error("Archive page %d: %s", page_no, err)
            break
    return collected
def _extract_epg_model(html):
    """Return the first embedded ``model = {...}`` object with telecasts, else None."""
    decoder = json.JSONDecoder()
    # The lookahead leaves match.end() exactly on the opening brace.
    for hit in re.finditer(r'model\s*=\s*(?=\{"TvCategoryId")', html):
        try:
            # raw_decode parses one complete JSON value and ignores what
            # follows it. A lazy regex would stop at the first "]}", which
            # truncates the object when a telecast ends with a nested array.
            obj, _end = decoder.raw_decode(html, hit.end())
        except ValueError:
            continue
        if isinstance(obj, dict) and isinstance(obj.get("SortedTelecasts"), list):
            return obj
    return None


def _scrape_epg():
    """Collect telecasts from the Save.TV programme pages (JSON inside HTML).

    Sources (``EPG_PAGES``):
    - TvProgrammFilm.cfm: all films of the next 3 days (~35)
    - TvProgrammFilmHighlights.cfm: curated highlights, 4 weeks (~22)
    - TvProgramm2015/2215.cfm: prime time, all genres (films are filtered
      later by ``_filter_films``)

    Returns:
        List of telecast dicts, de-duplicated by ``ITELECASTID`` across all
        pages. Empty when the login fails. A page that cannot be fetched or
        parsed is logged and skipped.
    """
    s = _get_session()
    if not s:
        return []
    all_telecasts = []
    seen_ids = set()
    for page_path in EPG_PAGES:
        try:
            r = s.get(SAVETV_URL + page_path, timeout=15)
            data = _extract_epg_model(r.text)
        except Exception as e:
            log.error("EPG Scrape %s: %s", page_path, e)
            continue
        if data is None:
            log.warning("Kein model-JSON in %s", page_path)
            continue
        telecasts = data["SortedTelecasts"]
        for tc in telecasts:
            try:
                tid = int(tc.get("ITELECASTID", 0))
            except (AttributeError, TypeError, ValueError):
                # Skip only the malformed entry, not the rest of the page.
                continue
            if tid and tid not in seen_ids:
                seen_ids.add(tid)
                all_telecasts.append(tc)
        log.debug("EPG %s: %d Sendungen", page_path.split("/")[-1], len(telecasts))
    log.info("EPG gesamt: %d Sendungen aus %d Quellen", len(all_telecasts), len(EPG_PAGES))
    return all_telecasts
def _filter_films(telecasts, only_new=False):
    """Keep upcoming feature films and rank them by score.

    A telecast is kept when its category id is 1, it has a title of at least
    two characters, its sub-category is not in SPAM_SUBCATEGORIES and its
    start time parses and lies in the future.

    Score: 50 base, +20 for a GOOD_SUBCATEGORIES genre, +10 for a description
    longer than 50 characters, -30 if already scheduled, +10 for a tip of the
    day. Each kept dict gains ``_score`` and ``_start_dt``.

    Args:
        telecasts: iterable of telecast dicts.
        only_new: when True, skip films already in the seen-cache (cron job).

    Returns:
        List sorted by descending score, then ascending start time.
    """
    reference_time = datetime.now()
    already_seen = _load_seen() if only_new else {}
    selected = []
    for entry in telecasts:
        if entry.get("TVCATEGORYID", 0) != 1.0:
            continue
        name = entry.get("STITLE", "")
        if not (name and len(name) >= 2):
            continue
        genre = (entry.get("SSUBCATEGORYNAME") or "").lower()
        if genre in SPAM_SUBCATEGORIES:
            continue
        try:
            begins = datetime.strptime(entry.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            continue
        if begins < reference_time:
            continue
        telecast_key = str(int(entry.get("ITELECASTID", 0)))
        if only_new and telecast_key in already_seen:
            continue

        blurb = entry.get("STHEMA") or entry.get("SFULLSUBTITLE") or ""
        rating = 50
        rating += 20 if genre in GOOD_SUBCATEGORIES else 0
        rating += 10 if len(blurb) > 50 else 0
        rating -= 30 if entry.get("BEXISTRECORD", False) else 0
        rating += 10 if entry.get("BISTIPOFDAY", False) else 0

        entry["_score"] = rating
        entry["_start_dt"] = begins
        selected.append(entry)
    return sorted(selected, key=lambda film: (-film["_score"], film["_start_dt"]))
def _mark_seen(films):
    """Record today's date for every film's TelecastId in the seen-cache.

    Films whose id converts to 0 are ignored. The cache is loaded, updated
    and written back in one go.
    """
    cache = _load_seen()
    stamp = datetime.now().strftime("%Y-%m-%d")
    telecast_keys = (str(int(film.get("ITELECASTID", 0))) for film in films)
    cache.update((key, stamp) for key in telecast_keys if key != "0")
    _save_seen(cache)
def _record_telecast(telecast_id):
    """Schedule a recording for ``telecast_id`` via tcJWriteRecord.cfm.

    Returns:
        The ``SMESSAGE`` text of the JSON reply ("Unbekannte Antwort" if it is
        absent), "Login fehlgeschlagen" without a session, or
        "Fehler: <message>" when the request or decoding raises.
    """
    session = _get_session()
    if not session:
        return "Login fehlgeschlagen"
    form = {"TelecastId": telecast_id, "iRecordingBuffer": 0}
    try:
        reply = session.post(
            SAVETV_URL + "/STV/M/obj/TC/tcJWriteRecord.cfm",
            data=form,
            headers={"X-Requested-With": "XMLHttpRequest"},
            timeout=15,
        ).json()
        message = reply.get("SMESSAGE", "Unbekannte Antwort")
    except Exception as err:
        message = "Fehler: " + str(err)
    return message
def _format_film(f, with_tid=True):
    """Format one telecast dict as a short multi-line text block.

    Fields that are missing or ``None`` fall back to a placeholder, so a
    sparse telecast no longer raises ``TypeError``.

    Args:
        f: telecast dict (keys such as STITLE, STVSTATIONNAME, DSTARTDATE,
            SSUBCATEGORYNAME, STHEMA / SFULLSUBTITLE, ITELECASTID,
            BEXISTRECORD).
        with_tid: append a "TelecastId <n>" line when True.

    Returns:
        Lines joined by newlines: title (with "[geplant]" and a relative-day
        hint), station | start | genre, optional description, optional id.
    """
    title = f.get("STITLE") or "?"
    station = f.get("STVSTATIONNAME") or "?"
    start_raw = str(f.get("DSTARTDATE") or "?")
    subcat = f.get("SSUBCATEGORYNAME") or ""
    full_desc = f.get("STHEMA") or f.get("SFULLSUBTITLE") or ""
    tid = int(f.get("ITELECASTID") or 0)
    recorded = " [geplant]" if f.get("BEXISTRECORD") else ""

    days_until = ""
    try:
        start_dt = datetime.strptime(start_raw, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        pass  # unparseable start: simply omit the relative-day hint
    else:
        delta = (start_dt.date() - datetime.now().date()).days
        if delta == 0:
            days_until = " (heute)"
        elif delta == 1:
            days_until = " (morgen)"
        elif delta > 1:
            days_until = " (in " + str(delta) + " Tagen)"
        # Past dates get no hint rather than "in -N Tagen".

    lines = [" " + title + recorded + days_until]
    lines.append(" " + station + " | " + start_raw[:16] + " | " + subcat)
    if full_desc:
        # Only mark the text as shortened when it really was cut.
        ellipsis = "..." if len(full_desc) > 120 else ""
        lines.append(" " + full_desc[:120] + ellipsis)
    if with_tid:
        lines.append(" TelecastId " + str(tid))
    return "\n".join(lines)
# Lower-case substrings that mark a title as documentary/magazine content;
# matched anywhere in the title by _is_excluded(). Kept alphabetical.
DOKU_KEYWORDS = {
    "afrika",
    "australien",
    "botschafter der meere",
    "doku",
    "dokumentation",
    "dynastie",
    "gefährlich",
    "gehirn unter strom",
    "giftig",
    "leoparden",
    "magazin",
    "meer",
    "ozean",
    "ozeane",
    "reportage",
    "safari",
    "schildkröten",
    "schlangen",
    "tiere",
    "tierwelt",
    "wale",
    "wildtiere",
}

# Lower-case title prefixes treated as cinema films by _is_known_cinema().
# Kept alphabetical so duplicates are easy to spot.
KNOWN_CINEMA = {
    "12 monkeys",
    "albert nobbs",
    "anatomie eines falls",
    "aviator",
    "bad director",
    "blind willow",
    "blood diamond",
    "bullet train",
    "children of men",
    "crazy heart",
    "das lehrerzimmer",
    "das massaker von katyn",
    "der dritte",
    "der mit dem wolf tanzt",
    "deutschstunde",
    "die aussprache",
    "die fabelhafte welt der amelie",
    "die kinder der seidenstraße",
    "die reise zum mittelpunkt der erde",
    "die üblichen verdächtigen",
    "die vögel",
    "die wannseekonferenz",
    "disco boy",
    "dunkirk",
    "fallende blätter",
    "führer und verführer",
    "gosford park",
    "gravity",
    "hi-lo country",
    "hot summer nights",
    "i, tonya",
    "la coda del diavolo",
    "ladykillers",
    "lion",
    "local hero",
    "mile 22",
    "mord im orient express",
    "nightwatch",
    "no turning back",
    "nur fliegen ist schöner",
    "old henry",
    "open range",
    "parker",
    "powder girl",
    "salt",
    "shooter",
    "stirb langsam",
    "tagebuch einer kammerzofe",
    "talk to me",
    "the good neighbor",
    "the informer",
    "the revenant",
    "the suicide squad",
    "undisputed",
    "unternehmen petticoat",
    "valerian",
    "yalda",
    "z for zachariah",
    "zeugin der anklage",
}

# Common English words; _is_known_cinema() counts how many occur in a title.
ENGLISH_WORDS = {
    "and", "at", "black", "boy", "club", "day", "dead", "fallen", "for",
    "from", "girl", "good", "has", "in", "international", "is", "kill",
    "last", "man", "men", "new", "night", "no", "of", "old", "on", "one",
    "out", "redemption", "revenge", "road", "spirit", "the", "to", "two",
    "way", "with",
}
def _is_known_cinema(title):
    """Heuristically decide whether a title is a cinema film.

    True when the lower-cased, stripped title starts with an entry of
    KNOWN_CINEMA, or when the title uses only the allowed ASCII characters
    and contains at least two distinct ENGLISH_WORDS.
    """
    lowered = title.lower().strip()
    # str.startswith accepts a tuple of prefixes; an exact match is a prefix too.
    if lowered.startswith(tuple(KNOWN_CINEMA)):
        return True
    if re.match(r'^[A-Za-z0-9:,\'\-\.\! ]+$', title) is None:
        return False
    english_hits = ENGLISH_WORDS.intersection(re.findall(r'[a-z]+', lowered))
    return len(english_hits) >= 2
def _is_excluded(title):
    """Return True for programme-change notices and documentary-style titles.

    The check is a case-insensitive substring match against
    "programmänderung" and every entry of DOKU_KEYWORDS.
    """
    lowered = title.lower()
    return "programmänderung" in lowered or any(
        keyword in lowered for keyword in DOKU_KEYWORDS
    )
def handle_get_savetv_archive_filme(**kw):
    """Report finished archive films, split into cinema and TV films.

    Steps:
    1. Fetch all finished recordings.
    2. Skip entries with an episode marker (``SFOLGE``) and titles rejected
       by ``_is_excluded``; both are counted for the header line.
    3. De-duplicate by lower-cased title, keeping the recording with the most
       days left. Station and TelecastId now follow the kept recording;
       previously the station of the first copy stayed attached to it.
    4. List cinema films (urgent = 7 days or fewer left first), then urgent
       TV films and a count of the remaining ones.

    Missing or ``None`` fields fall back to "?" / 0 instead of raising.

    Args:
        **kw: ignored; accepted so the tool dispatcher can pass any arguments.

    Returns:
        The report as a single newline-joined string.
    """
    entries = _get_full_archive()
    if not entries:
        return "Keine Archiv-Eintraege gefunden."

    by_title = {}
    series_count = 0
    excluded_count = 0
    for e in entries:
        tc = e.get("STRTELECASTENTRY") or {}
        if tc.get("SFOLGE"):
            series_count += 1
            continue
        title = tc.get("STITLE") or "?"
        if _is_excluded(title):
            excluded_count += 1
            continue
        station = tc.get("STVSTATIONNAME") or "?"
        days_left = int(tc.get("IDAYSLEFTBEFOREDELETE") or 0)
        tid = int(tc.get("ITELECASTID") or 0)
        key = title.lower().strip()
        known = by_title.get(key)
        if known is None:
            by_title[key] = {
                "title": title, "station": station,
                "days_left": days_left, "tid": tid,
                "cinema": _is_known_cinema(title),
            }
        elif days_left > known["days_left"]:
            # Keep the copy that expires last, with its own station and id.
            known.update(days_left=days_left, tid=tid, station=station)

    all_films = list(by_title.values())
    cinema = sorted(
        (f for f in all_films if f["cinema"]),
        key=lambda x: x["days_left"],
    )
    tv_films = sorted(
        (f for f in all_films if not f["cinema"]),
        key=lambda x: x["days_left"],
    )

    lines = [
        f"Save.TV Archiv: {len(all_films)} Filme "
        f"({series_count} Serien, {excluded_count} Dokus/Spam gefiltert)\n"
    ]
    cinema_urgent = [f for f in cinema if f["days_left"] <= 7]
    cinema_safe = [f for f in cinema if f["days_left"] > 7]
    if cinema_urgent:
        lines.append("KINO-HIGHLIGHTS DRINGEND — laufen bald ab, JETZT sichern:")
        lines.extend(
            f" [{f['days_left']}d] {f['title']} ({f['station']})"
            for f in cinema_urgent
        )
        lines.append("")
    if cinema_safe:
        lines.append(f"KINO-HIGHLIGHTS ({len(cinema_safe)} Filme):")
        lines.extend(
            f" {f['title']} ({f['station']}, {f['days_left']}d)"
            for f in cinema_safe
        )
        lines.append("")
    if tv_films:
        tv_urgent = [f for f in tv_films if f["days_left"] <= 7]
        tv_safe = [f for f in tv_films if f["days_left"] > 7]
        lines.append(
            f"DEUTSCHE TV-FILME ({len(tv_films)}, "
            f"davon {len(tv_urgent)} bald ablaufend):"
        )
        lines.extend(
            f" [{f['days_left']}d] {f['title']} ({f['station']})"
            for f in tv_urgent
        )
        if tv_safe:
            lines.append(f" ... und {len(tv_safe)} weitere mit >7 Tagen")
    return "\n".join(lines)
def handle_get_savetv_status(**kw):
    """Summarise the archive and the planned recordings as text.

    Shows the archive totals, up to 10 finished recordings and up to 10
    planned recordings. A failure while fetching the planned recordings is
    now reported in the output; before, it looked like "nothing planned".
    The planned list is only requested once the archive request succeeded.

    Args:
        **kw: ignored; accepted so the tool dispatcher can pass any arguments.

    Returns:
        The status text, or "Save.TV Fehler: <message>" when the archive
        request fails.
    """
    archive = _get_archive(state=1, count=20)
    if "error" in archive:
        return "Save.TV Fehler: " + str(archive["error"])
    planned = _get_archive(state=0, count=20)

    def _row(entry, date_chars):
        # One "title | start | station" line; None fields fall back to "?".
        tc = entry.get("STRTELECASTENTRY") or {}
        return (
            " " + (tc.get("STITLE") or "?")[:40] + " | "
            + (tc.get("DSTARTDATE") or "?")[:date_chars] + " | "
            + (tc.get("STVSTATIONNAME") or "?")
        )

    lines = ["Save.TV Status\n"]
    total = int(archive.get("ITOTALENTRIESINARCHIVE") or 0)
    fertig_total = int(archive.get("ITOTALENTRIES") or 0)
    lines.append(f"Archiv: {total} Aufnahmen gesamt, {fertig_total} fertig")

    fertig = archive.get("ARRVIDEOARCHIVEENTRIES") or []
    if fertig:
        lines.append("\nLetzte fertige Aufnahmen:")
        lines.extend(_row(e, 10) for e in fertig[:10])  # date only

    if "error" in planned:
        lines.append(
            "\nGeplante Aufnahmen: Fehler beim Abruf (" + str(planned["error"]) + ")"
        )
        return "\n".join(lines)

    geplant = planned.get("ARRVIDEOARCHIVEENTRIES") or []
    plan_total = int(planned.get("ITOTALENTRIES") or 0)
    if geplant:
        lines.append(f"\nGeplante Aufnahmen ({plan_total}):")
        lines.extend(_row(e, 16) for e in geplant[:10])  # date and time
    return "\n".join(lines)
def handle_get_savetv_tipps(**kw):
    """Return the ten best-scored upcoming films as a text list.

    Scrapes the EPG, filters it without consulting the seen-cache and formats
    each film followed by a blank line. Returns a short message instead when
    no programme data or no film is available.
    """
    telecasts = _scrape_epg()
    if not telecasts:
        return "Konnte keine Programmdaten von Save.TV laden."
    ranked = _filter_films(telecasts, only_new=False)
    if not ranked:
        return "Keine sehenswerten Spielfilme in den naechsten Tagen gefunden."
    out = ["TV-Filmtipps\n"]
    for film in ranked[:10]:
        out.extend((_format_film(film), ""))
    out.append("Sage 'Nimm [Filmname] auf' oder nenne die TelecastId")
    return "\n".join(out)
def get_new_films():
    """Cron entry point: scan for new films and auto-record the best ones.

    All newly found films are marked as seen before any recording is made.
    Films below SUGGEST_SCORE or already scheduled are dropped. Films at or
    above AUTO_RECORD_SCORE are recorded immediately and get the recorder's
    reply stored under ``_record_result``.

    Returns:
        (auto_recorded, suggestions): films for which a recording was
        requested (score >= AUTO_RECORD_SCORE), and films to propose to the
        user (SUGGEST_SCORE <= score < AUTO_RECORD_SCORE). Both are empty
        when no programme data could be loaded.
    """
    telecasts = _scrape_epg()
    if not telecasts:
        return [], []
    fresh = _filter_films(telecasts, only_new=True)
    _mark_seen(fresh)
    recorded, proposed = [], []
    for film in fresh:
        rating = film["_score"]
        if rating < SUGGEST_SCORE or film.get("BEXISTRECORD"):
            continue
        if rating < AUTO_RECORD_SCORE:
            proposed.append(film)
            continue
        outcome = _record_telecast(int(film.get("ITELECASTID", 0)))
        film["_record_result"] = outcome
        recorded.append(film)
        log.info("Auto-Aufnahme: %s (Score %d) -> %s",
                 film.get("STITLE"), rating, outcome)
    return recorded, proposed
def handle_savetv_record(telecast_id=0, **kw):
    """Schedule a recording for the given TelecastId and report the result.

    The id arrives from a tool call and may be a float or a string. Input
    that is non-numeric or not positive now yields a message instead of an
    exception or a request for id 0. The EPG is scraped only to show the
    title; if the id is not found there, "ID <n>" is shown.

    Args:
        telecast_id: TelecastId of the broadcast to record.
        **kw: ignored; accepted so the tool dispatcher can pass any arguments.

    Returns:
        "Save.TV: <recorder reply>" plus the broadcast title, or an error
        message for a missing or invalid id.
    """
    if not telecast_id:
        return "Keine TelecastId angegeben."
    try:
        tid = int(telecast_id)
    except (TypeError, ValueError, OverflowError):
        return "Ungueltige TelecastId: " + repr(telecast_id)
    if tid <= 0:
        return "Ungueltige TelecastId: " + repr(telecast_id)

    title = "ID " + str(tid)
    for tc in _scrape_epg():
        if int(tc.get("ITELECASTID", 0)) == tid:
            # A null title in the EPG keeps the "ID <n>" fallback.
            title = tc.get("STITLE") or title
            break
    result = _record_telecast(tid)
    return "Save.TV: " + result + "\nSendung: " + title
# Dispatch table: maps each tool name declared in TOOLS to its handler function.
HANDLERS = {
"get_savetv_status": handle_get_savetv_status,
"get_savetv_tipps": handle_get_savetv_tipps,
"get_savetv_archive_filme": handle_get_savetv_archive_filme,
"savetv_record": handle_savetv_record,
}