homelab-brain/homelab-ai-bot/tools/savetv.py

650 lines
20 KiB
Python

"""Save.TV Online-Videorecorder — EPG Scanner + Film-Tipps + Aufnahme-Steuerung.
Architektur:
- EPG-Daten von Save.TV: TvProgrammFilm.cfm (3 Tage) + TvProgrammFilmHighlights.cfm (4 Wochen)
- Nur TVCATEGORYID 1 (Spielfilm), Spam-Genres rausgefiltert
- Seen-Cache: Nur neue Filme werden gemeldet (nicht erneut bei jedem Scan)
- Aufnahmen per tcJWriteRecord.cfm
"""
import re
import json
import logging
import requests
from datetime import datetime, timedelta
from pathlib import Path
log = logging.getLogger("savetv")
SAVETV_URL = "https://www.save.tv"
SAVETV_USER = ""
SAVETV_PASS = ""
_session = None
_session_ts = None
SESSION_MAX_AGE = 1800
EPG_PAGES = [
"/STV/M/obj/TVProgCtr/TvProgrammFilm.cfm",
"/STV/M/obj/TVProgCtr/TvProgrammFilmHighlights.cfm",
"/STV/M/obj/TVProgCtr/TvProgramm2015.cfm",
"/STV/M/obj/TVProgCtr/TvProgramm2215.cfm",
]
SEEN_CACHE = Path("/tmp/savetv_seen_ids.json")
SEEN_MAX_AGE_DAYS = 30
AUTO_RECORD_SCORE = 80
SUGGEST_SCORE = 60
SPAM_SUBCATEGORIES = {
"teleshop", "shopping", "dauerwerbesendung", "volksmusik",
"casting", "reality", "quiz/spiel", "comic", "zeichentrick",
"erotik", "kindersendung", "sonstige",
}
GOOD_SUBCATEGORIES = {
"action", "thriller", "krimi", "drama", "komödie", "komodie",
"comedy", "science fiction", "sci-fi", "fantasy", "abenteuer",
"horror", "western", "historienfilm", "animation", "mystery",
"romanze",
}
TOOLS = [
{
"type": "function",
"function": {
"name": "get_savetv_status",
"description": "Save.TV Status: Aufnahmen im Archiv, geplante Aufnahmen anzeigen.",
"parameters": {"type": "object", "properties": {}, "required": []},
},
},
{
"type": "function",
"function": {
"name": "get_savetv_tipps",
"description": "TV-Filmtipps: Sehenswerte Spielfilme der naechsten Tage/Wochen. "
"Nutze bei 'was laeuft', 'gute Filme', 'TV Tipps', 'Fernsehen', 'Save.TV'.",
"parameters": {"type": "object", "properties": {}, "required": []},
},
},
{
"type": "function",
"function": {
"name": "get_savetv_archive_filme",
"description": "Save.TV Archiv-Filme bewerten: Alle fertigen Aufnahmen holen, "
"nach Qualitaet bewerten, deduplizieren. Zeigt Top-Filme, dringende "
"(bald ablaufend) und weitere. Nutze bei 'gute Filme im Archiv', "
"'welche Filme habe ich', 'was ist sehenswert', 'Archiv bewerten'.",
"parameters": {"type": "object", "properties": {}, "required": []},
},
},
{
"type": "function",
"function": {
"name": "savetv_record",
"description": "Save.TV Aufnahme anlegen fuer eine bestimmte TelecastId. "
"Nutze wenn User sagt 'nimm auf', 'aufnehmen', 'record'.",
"parameters": {
"type": "object",
"properties": {
"telecast_id": {"type": "number", "description": "TelecastId der Sendung"}
},
"required": ["telecast_id"],
},
},
},
]
SYSTEM_PROMPT_EXTRA = """TV / Save.TV Tools:
- get_savetv_tipps: Zeigt sehenswerte Spielfilme der naechsten Tage/Wochen
- get_savetv_archive_filme: Bewertet alle fertigen Aufnahmen im Archiv nach Qualitaet
- savetv_record: Nimmt einen Film per TelecastId auf
- get_savetv_status: Zeigt Archiv und geplante Aufnahmen
Wenn der User nach Archiv-Filmen/Bewertung fragt, nutze get_savetv_archive_filme.
WICHTIG bei Archiv-Bewertung: Die Scores sind nur grobe Heuristiken (Sender, Highlight-Flag).
Nutze DEIN eigenes Filmwissen um die wirklich guten Filme zu identifizieren! Schau die
KOMPLETTE Liste durch — auch Filme mit Score 50-55 koennen Meisterwerke sein (z.B. bekannte
internationale Filme, Oscar-Gewinner, Klassiker). Sortiere nach DEINER Einschaetzung der
Filmqualitaet, nicht blind nach Score. Hebe besonders hervor: bald ablaufende gute Filme.
"""
def _init_creds():
global SAVETV_USER, SAVETV_PASS
if SAVETV_USER:
return
try:
from core import config
cfg = config.parse_config()
SAVETV_USER = cfg.raw.get("SAVETV_USER", "")
SAVETV_PASS = cfg.raw.get("SAVETV_PASS", "")
except Exception:
pass
def _get_session():
"""Login und Session cachen."""
global _session, _session_ts
_init_creds()
now = datetime.now()
if _session and _session_ts and (now - _session_ts).seconds < SESSION_MAX_AGE:
return _session
s = requests.Session()
s.headers.update({"User-Agent": "Mozilla/5.0 Hausmeister-Bot/1.0"})
try:
s.post(
SAVETV_URL + "/STV/M/Index.cfm?sk=PREMIUM",
data={"sUsername": SAVETV_USER, "sPassword": SAVETV_PASS, "value": "Login"},
allow_redirects=True,
timeout=15,
)
cookies = s.cookies.get_dict()
if not cookies.get("savetv_active_login"):
log.warning("Save.TV Login fehlgeschlagen")
return None
except Exception as e:
log.error("Save.TV Login Error: %s", e)
return None
_session = s
_session_ts = now
log.info("Save.TV Login erfolgreich")
return s
def _load_seen():
"""Lade gesehene TelecastIDs. Format: {id_str: 'YYYY-MM-DD'}."""
if not SEEN_CACHE.exists():
return {}
try:
data = json.loads(SEEN_CACHE.read_text())
cutoff = (datetime.now() - timedelta(days=SEEN_MAX_AGE_DAYS)).strftime("%Y-%m-%d")
return {k: v for k, v in data.items() if v >= cutoff}
except Exception:
return {}
def _save_seen(seen):
try:
SEEN_CACHE.write_text(json.dumps(seen))
except Exception as e:
log.error("Seen-Cache schreiben: %s", e)
def _get_archive(state=0, count=20):
"""Archiv abrufen (POST, wie Web-UI). state: 0=geplant, 1=fertig."""
s = _get_session()
if not s:
return {"error": "Login fehlgeschlagen"}
try:
end = datetime.now().strftime("%Y-%m-%d")
start = (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")
r = s.post(
SAVETV_URL + "/STV/M/obj/archive/JSON/VideoArchiveApi.cfm",
data={
"bAggregateEntries": "false",
"iEntriesPerPage": str(count),
"iRecordingState": str(state),
"dStartdate": start,
"dEnddate": end,
},
headers={"X-Requested-With": "XMLHttpRequest"},
timeout=15,
)
return r.json()
except Exception as e:
return {"error": str(e)}
def _get_full_archive():
"""Alle fertigen Aufnahmen paginiert holen."""
s = _get_session()
if not s:
return []
end = datetime.now().strftime("%Y-%m-%d")
start = (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")
all_entries = []
for page in range(1, 20):
try:
r = s.post(
SAVETV_URL + "/STV/M/obj/archive/JSON/VideoArchiveApi.cfm",
data={
"bAggregateEntries": "false",
"iEntriesPerPage": "100",
"iCurrentPage": str(page),
"iRecordingState": "1",
"dStartdate": start,
"dEnddate": end,
},
headers={"X-Requested-With": "XMLHttpRequest"},
timeout=15,
)
data = r.json()
entries = data.get("ARRVIDEOARCHIVEENTRIES", [])
if not entries:
break
all_entries.extend(entries)
except Exception as e:
log.error("Archive page %d: %s", page, e)
break
return all_entries
def _scrape_epg():
"""Holt Filme aus Save.TV Programmseiten (JSON im HTML).
Quellen:
- TvProgrammFilm.cfm: Alle Filme der naechsten 3 Tage (~35)
- TvProgrammFilmHighlights.cfm: Kuratierte Highlights 4 Wochen (~22)
- TvProgramm2015/2215.cfm: Primetime alle Genres (Filme rausfiltern)
"""
s = _get_session()
if not s:
return []
all_telecasts = []
seen_ids = set()
for page_path in EPG_PAGES:
try:
r = s.get(SAVETV_URL + page_path, timeout=15)
m = re.search(
r'model\s*=\s*(\{"TvCategoryId".*?"SortedTelecasts":\[.*?\]\})',
r.text,
re.DOTALL,
)
if not m:
log.warning("Kein model-JSON in %s", page_path)
continue
data = json.loads(m.group(1))
for tc in data.get("SortedTelecasts", []):
tid = int(tc.get("ITELECASTID", 0))
if tid and tid not in seen_ids:
seen_ids.add(tid)
all_telecasts.append(tc)
count = len(data.get("SortedTelecasts", []))
log.debug("EPG %s: %d Sendungen", page_path.split("/")[-1], count)
except Exception as e:
log.error("EPG Scrape %s: %s", page_path, e)
log.info("EPG gesamt: %d Sendungen aus %d Quellen", len(all_telecasts), len(EPG_PAGES))
return all_telecasts
def _filter_films(telecasts, only_new=False):
"""Filtert auf sehenswerte Spielfilme.
only_new=True: Nur Filme die noch nicht im Seen-Cache sind (fuer Cronjob).
"""
films = []
now = datetime.now()
seen = _load_seen() if only_new else {}
for tc in telecasts:
cat_id = tc.get("TVCATEGORYID", 0)
if cat_id != 1.0:
continue
title = tc.get("STITLE", "")
if not title or len(title) < 2:
continue
subcat = (tc.get("SSUBCATEGORYNAME") or "").lower()
if subcat in SPAM_SUBCATEGORIES:
continue
start_str = tc.get("DSTARTDATE", "")
try:
start_dt = datetime.strptime(start_str, "%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
continue
if start_dt < now:
continue
tid = str(int(tc.get("ITELECASTID", 0)))
if only_new and tid in seen:
continue
score = 50
if subcat in GOOD_SUBCATEGORIES:
score += 20
desc = tc.get("STHEMA") or tc.get("SFULLSUBTITLE") or ""
if len(desc) > 50:
score += 10
already_recorded = tc.get("BEXISTRECORD", False)
if already_recorded:
score -= 30
is_highlight = tc.get("BISTIPOFDAY", False)
if is_highlight:
score += 10
tc["_score"] = score
tc["_start_dt"] = start_dt
films.append(tc)
films.sort(key=lambda x: (-x["_score"], x["_start_dt"]))
return films
def _mark_seen(films):
"""Markiere Filme als gesehen im Cache."""
seen = _load_seen()
today = datetime.now().strftime("%Y-%m-%d")
for f in films:
tid = str(int(f.get("ITELECASTID", 0)))
if tid != "0":
seen[tid] = today
_save_seen(seen)
def _record_telecast(telecast_id):
"""Aufnahme anlegen."""
s = _get_session()
if not s:
return "Login fehlgeschlagen"
try:
r = s.post(
SAVETV_URL + "/STV/M/obj/TC/tcJWriteRecord.cfm",
data={"TelecastId": telecast_id, "iRecordingBuffer": 0},
headers={"X-Requested-With": "XMLHttpRequest"},
timeout=15,
)
data = r.json()
return data.get("SMESSAGE", "Unbekannte Antwort")
except Exception as e:
return "Fehler: " + str(e)
def _format_film(f, with_tid=True):
"""Formatiert einen Film als Text."""
title = f.get("STITLE", "?")
station = f.get("STVSTATIONNAME", "?")
start = f.get("DSTARTDATE", "?")[:16]
subcat = f.get("SSUBCATEGORYNAME", "")
desc = (f.get("STHEMA") or f.get("SFULLSUBTITLE") or "")[:120]
tid = int(f.get("ITELECASTID", 0))
recorded = " [geplant]" if f.get("BEXISTRECORD") else ""
days_until = ""
try:
start_dt = datetime.strptime(f.get("DSTARTDATE", ""), "%Y-%m-%d %H:%M:%S")
delta = (start_dt.date() - datetime.now().date()).days
if delta == 0:
days_until = " (heute)"
elif delta == 1:
days_until = " (morgen)"
else:
days_until = " (in " + str(delta) + " Tagen)"
except (ValueError, TypeError):
pass
lines = [" " + title + recorded + days_until]
lines.append(" " + station + " | " + start + " | " + subcat)
if desc:
lines.append(" " + desc + "...")
if with_tid:
lines.append(" TelecastId " + str(tid))
return "\n".join(lines)
DOKU_KEYWORDS = {
"schlangen", "giftig", "gefährlich", "tiere", "tierwelt",
"wildtiere", "safari", "ozean", "meer", "ozeane",
"doku", "dokumentation", "reportage", "magazin",
"botschafter der meere", "dynastie", "leoparden",
"schildkröten", "wale", "australien", "afrika",
"gehirn unter strom",
}
def _score_archive_film(title, station, highlight, subtitle="", thema=""):
"""Bewertet einen Archiv-Film heuristisch (0-100)."""
t = title.lower()
s = station.lower()
if "programmänderung" in t:
return -1
for kw in DOKU_KEYWORDS:
if kw in t:
return -1
score = 50
premium_stations = {"arte", "zdf", "das erste", "mdr", "swr", "ndr", "wdr", "br"}
action_stations = {"prosieben", "sat.1", "kabel 1", "vox", "rtl", "tele 5", "zdf_neo"}
if s in premium_stations:
score += 5
elif s in action_stations:
score += 3
if highlight:
score += 15
desc = (thema or subtitle or "").lower()
if len(desc) > 30:
score += 5
quality_hints = [
"oscar", "golden globe", "cannes", "berlinale", "venedig",
"preisgekrönt", "meisterwerk", "bestseller", "basiert auf",
]
for hint in quality_hints:
if hint in desc or hint in t:
score += 10
break
if any(c.isascii() and c.isalpha() for c in title) and not all(c.isascii() for c in title if c.isalpha()):
pass
elif re.search(r'[A-Z][a-z]+ [A-Z][a-z]+', title) and not re.search(r'[äöüÄÖÜß]', title):
score += 8
return score
def handle_get_savetv_archive_filme(**kw):
"""Alle fertigen Archiv-Filme holen, bewerten, deduplizieren, sortiert ausgeben."""
entries = _get_full_archive()
if not entries:
return "Keine Archiv-Eintraege gefunden."
films = []
seen_titles = {}
series_count = 0
for e in entries:
tc = e.get("STRTELECASTENTRY", {})
episode = tc.get("SFOLGE", "")
if episode:
series_count += 1
continue
title = tc.get("STITLE", "?")
station = tc.get("STVSTATIONNAME", "?")
highlight = tc.get("BISHIGHLIGHT", False)
subtitle = tc.get("SSUBTITLE", "")
thema = tc.get("STHEMA", "")
date = tc.get("DSTARTDATE", "?")[:10]
days_left = int(tc.get("IDAYSLEFTBEFOREDELETE", 0))
tid = int(tc.get("ITELECASTID", 0))
score = _score_archive_film(title, station, highlight, subtitle, thema)
if score < 0:
continue
key = title.lower().strip()
if key in seen_titles:
if days_left > seen_titles[key]["days_left"]:
seen_titles[key]["days_left"] = days_left
seen_titles[key]["date"] = date
seen_titles[key]["tid"] = tid
continue
seen_titles[key] = {
"title": title, "station": station, "date": date,
"days_left": days_left, "score": score, "tid": tid,
"highlight": highlight,
}
films = sorted(seen_titles.values(), key=lambda x: (-x["score"], x["days_left"]))
total_archive = len(entries)
urgent = sorted(
[f for f in films if f["days_left"] <= 7],
key=lambda x: (x["days_left"], -x["score"]),
)
lines = [
f"Save.TV Archiv-Bewertung: {len(films)} Filme "
f"(von {total_archive} Aufnahmen, {series_count} Serien-Episoden gefiltert)\n"
]
if urgent:
lines.append(f"DRINGEND — {len(urgent)} Filme laufen in <=7 Tagen ab:")
for f in urgent:
lines.append(
f" [{f['days_left']}d] {f['title'][:50]} | {f['station']} | TID {f['tid']}"
)
lines.append("")
safe = [f for f in films if f["days_left"] > 7]
if safe:
lines.append(
f"ALLE FILME IM ARCHIV ({len(safe)}) — nutze dein Filmwissen "
f"um die besten zu identifizieren:"
)
for f in safe:
lines.append(
f" {f['title'][:50]} | {f['station']} | {f['days_left']}d"
)
return "\n".join(lines)
def handle_get_savetv_status(**kw):
archive = _get_archive(state=1, count=20)
planned = _get_archive(state=0, count=20)
if "error" in archive:
return "Save.TV Fehler: " + archive["error"]
lines = ["Save.TV Status\n"]
total = int(archive.get("ITOTALENTRIESINARCHIVE", 0))
fertig_total = int(archive.get("ITOTALENTRIES", 0))
lines.append(f"Archiv: {total} Aufnahmen gesamt, {fertig_total} fertig")
fertig = archive.get("ARRVIDEOARCHIVEENTRIES", [])
if fertig:
lines.append("\nLetzte fertige Aufnahmen:")
for e in fertig[:10]:
tc = e.get("STRTELECASTENTRY", {})
lines.append(
" " + tc.get("STITLE", "?")[:40] + " | "
+ tc.get("DSTARTDATE", "?")[:10] + " | "
+ tc.get("STVSTATIONNAME", "?")
)
geplant = planned.get("ARRVIDEOARCHIVEENTRIES", [])
plan_total = int(planned.get("ITOTALENTRIES", 0))
if geplant:
lines.append(f"\nGeplante Aufnahmen ({plan_total}):")
for e in geplant[:10]:
tc = e.get("STRTELECASTENTRY", {})
lines.append(
" " + tc.get("STITLE", "?")[:40] + " | "
+ tc.get("DSTARTDATE", "?")[:16] + " | "
+ tc.get("STVSTATIONNAME", "?")
)
return "\n".join(lines)
def handle_get_savetv_tipps(**kw):
telecasts = _scrape_epg()
if not telecasts:
return "Konnte keine Programmdaten von Save.TV laden."
films = _filter_films(telecasts, only_new=False)
if not films:
return "Keine sehenswerten Spielfilme in den naechsten Tagen gefunden."
lines = ["TV-Filmtipps\n"]
for f in films[:10]:
lines.append(_format_film(f))
lines.append("")
lines.append("Sage 'Nimm [Filmname] auf' oder nenne die TelecastId")
return "\n".join(lines)
def get_new_films():
"""Fuer den Cronjob: Neue Filme scannen, Top-Filme automatisch aufnehmen.
Returns: (auto_recorded, suggestions)
auto_recorded: Filme die automatisch aufgenommen wurden (Score >= 85)
suggestions: Filme die dem User vorgeschlagen werden (Score 60-84)
"""
telecasts = _scrape_epg()
if not telecasts:
return [], []
films = _filter_films(telecasts, only_new=True)
_mark_seen(films)
auto_recorded = []
suggestions = []
for f in films:
score = f["_score"]
if score < SUGGEST_SCORE:
continue
if f.get("BEXISTRECORD"):
continue
if score >= AUTO_RECORD_SCORE:
tid = int(f.get("ITELECASTID", 0))
result = _record_telecast(tid)
f["_record_result"] = result
auto_recorded.append(f)
log.info("Auto-Aufnahme: %s (Score %d) -> %s",
f.get("STITLE"), score, result)
else:
suggestions.append(f)
return auto_recorded, suggestions
def handle_savetv_record(telecast_id=0, **kw):
if not telecast_id:
return "Keine TelecastId angegeben."
tid = int(telecast_id)
telecasts = _scrape_epg()
title = "ID " + str(tid)
for tc in telecasts:
if int(tc.get("ITELECASTID", 0)) == tid:
title = tc.get("STITLE", title)
break
result = _record_telecast(tid)
return "Save.TV: " + result + "\nSendung: " + title
HANDLERS = {
"get_savetv_status": handle_get_savetv_status,
"get_savetv_tipps": handle_get_savetv_tipps,
"get_savetv_archive_filme": handle_get_savetv_archive_filme,
"savetv_record": handle_savetv_record,
}