"""Save.TV online video recorder: EPG scanner, film tips and recording control.

Architecture:
- EPG data from Save.TV: TvProgrammFilm.cfm (3 days) and
  TvProgrammFilmHighlights.cfm (4 weeks), plus the prime-time pages listed
  in EPG_PAGES.
- Only TVCATEGORYID 1 (feature films) is kept; spam genres are filtered out.
- Seen cache: only new films are reported (not again on every scan).
- Recordings are created via tcJWriteRecord.cfm.
"""
import json
import logging
import re
from datetime import datetime, timedelta
from pathlib import Path

import requests

log = logging.getLogger("savetv")

SAVETV_URL = "https://www.save.tv"
SAVETV_USER = ""
SAVETV_PASS = ""

# Cached login session and the wall-clock time it was created.
_session = None
_session_ts = None
SESSION_MAX_AGE = 1800  # seconds a cached session is reused before re-login

EPG_PAGES = [
    "/STV/M/obj/TVProgCtr/TvProgrammFilm.cfm",
    "/STV/M/obj/TVProgCtr/TvProgrammFilmHighlights.cfm",
    "/STV/M/obj/TVProgCtr/TvProgramm2015.cfm",
    "/STV/M/obj/TVProgCtr/TvProgramm2215.cfm",
]

SEEN_CACHE = Path("/tmp/savetv_seen_ids.json")
SEEN_MAX_AGE_DAYS = 30

AUTO_RECORD_SCORE = 80
SUGGEST_SCORE = 60

SPAM_SUBCATEGORIES = {
    "teleshop", "shopping", "dauerwerbesendung", "volksmusik",
    "casting", "reality", "quiz/spiel", "comic", "zeichentrick",
    "erotik", "kindersendung", "sonstige",
}

GOOD_SUBCATEGORIES = {
    "action", "thriller", "krimi", "drama", "komödie", "komodie",
    "comedy", "science fiction", "sci-fi", "fantasy", "abenteuer",
    "horror", "western", "historienfilm", "animation", "mystery",
    "romanze",
}

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_savetv_status",
            "description": "Save.TV Status: Aufnahmen im Archiv, geplante Aufnahmen anzeigen.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_savetv_tipps",
            "description": "TV-Filmtipps: Sehenswerte Spielfilme der naechsten Tage/Wochen. "
            "Nutze bei 'was laeuft', 'gute Filme', 'TV Tipps', 'Fernsehen', 'Save.TV'.",
            "parameters": {"type": "object", "properties": {}, "required": []},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "savetv_record",
            "description": "Save.TV Aufnahme anlegen fuer eine bestimmte TelecastId. "
            "Nutze wenn User sagt 'nimm auf', 'aufnehmen', 'record'.",
            "parameters": {
                "type": "object",
                "properties": {
                    "telecast_id": {"type": "number", "description": "TelecastId der Sendung"}
                },
                "required": ["telecast_id"],
            },
        },
    },
]

SYSTEM_PROMPT_EXTRA = """TV / Save.TV Tools:
- get_savetv_tipps: Zeigt sehenswerte Spielfilme der naechsten Tage/Wochen
- savetv_record: Nimmt einen Film per TelecastId auf
- get_savetv_status: Zeigt Archiv und geplante Aufnahmen
Wenn der User einen Film aufnehmen will, nutze savetv_record mit der TelecastId.
"""

# Locates the page model embedded in the programme pages' HTML/JS.
_MODEL_JSON_RE = re.compile(
    r'model\s*=\s*(\{"TvCategoryId".*?"SortedTelecasts":\[.*?\]\})',
    re.DOTALL,
)

# Format of the DSTARTDATE field as parsed throughout this module.
_START_FORMAT = "%Y-%m-%d %H:%M:%S"


def _to_int(value, default=0):
    """Convert *value* to int, returning *default* when it is not convertible."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _telecast_id(tc):
    """Return the ITELECASTID of *tc* as int, or 0 when missing or malformed."""
    return _to_int(tc.get("ITELECASTID"), 0)


def _text(entry, key, default=""):
    """Return ``entry[key]`` as a string, or *default* when missing or None."""
    value = entry.get(key)
    return default if value is None else str(value)


def _parse_start(tc):
    """Parse the DSTARTDATE of *tc*; return None when missing or malformed."""
    try:
        return datetime.strptime(tc.get("DSTARTDATE") or "", _START_FORMAT)
    except (ValueError, TypeError):
        return None


def _init_creds():
    """Load SAVETV_USER / SAVETV_PASS from the project config once.

    Does nothing when a user name is already set. A failure to load the
    config is logged and leaves the credentials unchanged.
    """
    global SAVETV_USER, SAVETV_PASS
    if SAVETV_USER:
        return
    try:
        from core import config

        cfg = config.parse_config()
        SAVETV_USER = cfg.raw.get("SAVETV_USER", "")
        SAVETV_PASS = cfg.raw.get("SAVETV_PASS", "")
    except Exception as e:
        # Best effort: the caller reports the missing credentials.
        log.warning("Save.TV Zugangsdaten konnten nicht geladen werden: %s", e)


def _session_is_fresh(now):
    """Return True when a cached session exists and is younger than SESSION_MAX_AGE.

    Uses ``total_seconds()``: ``timedelta.seconds`` only holds the
    sub-day remainder and would make a session older than a day look fresh.
    """
    if _session is None or _session_ts is None:
        return False
    age = (now - _session_ts).total_seconds()
    return 0 <= age < SESSION_MAX_AGE


def _get_session():
    """Log in to Save.TV and cache the session.

    Returns:
        A logged-in ``requests.Session``, or None when credentials are
        missing or the login failed.
    """
    global _session, _session_ts
    _init_creds()
    now = datetime.now()
    if _session_is_fresh(now):
        return _session

    if not (SAVETV_USER and SAVETV_PASS):
        # Do not post an empty login form to the server.
        log.warning("Save.TV Zugangsdaten fehlen")
        return None

    s = requests.Session()
    s.headers.update({"User-Agent": "Mozilla/5.0 Hausmeister-Bot/1.0"})
    try:
        s.post(
            SAVETV_URL + "/STV/M/Index.cfm?sk=PREMIUM",
            data={"sUsername": SAVETV_USER, "sPassword": SAVETV_PASS, "value": "Login"},
            allow_redirects=True,
            timeout=15,
        )
    except requests.RequestException as e:
        log.error("Save.TV Login Error: %s", e)
        s.close()
        return None

    # The login is considered successful when this cookie is set.
    if not s.cookies.get_dict().get("savetv_active_login"):
        log.warning("Save.TV Login fehlgeschlagen")
        s.close()
        return None

    _session = s
    _session_ts = now
    log.info("Save.TV Login erfolgreich")
    return s


def _load_seen():
    """Load the seen telecast ids from the cache file.

    Returns:
        dict mapping id string to 'YYYY-MM-DD'; entries older than
        SEEN_MAX_AGE_DAYS and entries with non-string dates are dropped.
        Empty when the file is missing or unreadable.
    """
    if not SEEN_CACHE.exists():
        return {}
    try:
        data = json.loads(SEEN_CACHE.read_text())
    except (OSError, ValueError) as e:
        log.warning("Seen-Cache lesen: %s", e)
        return {}
    if not isinstance(data, dict):
        return {}
    cutoff = (datetime.now() - timedelta(days=SEEN_MAX_AGE_DAYS)).strftime("%Y-%m-%d")
    # ISO dates compare correctly as strings.
    return {k: v for k, v in data.items() if isinstance(v, str) and v >= cutoff}


def _save_seen(seen):
    """Write the seen-id mapping to the cache file; errors are logged."""
    try:
        SEEN_CACHE.write_text(json.dumps(seen))
    except (OSError, TypeError, ValueError) as e:
        log.error("Seen-Cache schreiben: %s", e)


def _get_archive(state=0, count=20):
    """Fetch archive entries. state: 0=planned, 1=finished.

    Returns:
        The decoded JSON response (dict), or ``{"error": message}`` on
        login failure, request failure or an unexpected response.
    """
    s = _get_session()
    if not s:
        return {"error": "Login fehlgeschlagen"}
    try:
        r = s.get(
            SAVETV_URL + "/STV/M/obj/archive/JSON/VideoArchiveApi.cfm",
            params={
                "bAggregateEntries": "false",
                "iEntriesPerPage": str(count),
                "iRecordingState": str(state),
            },
            headers={"X-Requested-With": "XMLHttpRequest"},
            timeout=15,
        )
        data = r.json()
    except (requests.RequestException, ValueError) as e:
        return {"error": str(e)}
    if not isinstance(data, dict):
        # Callers use dict access on the result.
        return {"error": "Unerwartete Antwort"}
    return data


def _extract_model(html):
    """Extract the embedded ``model = {...}`` JSON object from a programme page.

    Returns:
        The decoded object, or None when the page contains no model.

    Raises:
        ValueError: when a model is found but cannot be decoded.
    """
    m = _MODEL_JSON_RE.search(html)
    if not m:
        return None
    try:
        # Decode the complete object from its opening brace. The regex capture
        # ends at the first "]}" and is cut short by a nested array.
        data, _end = json.JSONDecoder().raw_decode(html[m.start(1):])
    except ValueError:
        data = json.loads(m.group(1))
    return data


def _scrape_epg():
    """Fetch telecasts from the Save.TV programme pages (JSON inside HTML).

    Sources (see EPG_PAGES):
    - TvProgrammFilm.cfm: all films of the next 3 days (~35)
    - TvProgrammFilmHighlights.cfm: curated highlights, 4 weeks (~22)
    - TvProgramm2015/2215.cfm: prime time, all genres

    Returns:
        List of telecast dicts, de-duplicated by ITELECASTID across pages.
        Empty when the login fails. A page that fails is logged and skipped.
    """
    s = _get_session()
    if not s:
        return []

    all_telecasts = []
    seen_ids = set()
    for page_path in EPG_PAGES:
        try:
            r = s.get(SAVETV_URL + page_path, timeout=15)
            data = _extract_model(r.text)
            if data is None:
                log.warning("Kein model-JSON in %s", page_path)
                continue
            entries = data.get("SortedTelecasts") or []
            for tc in entries:
                if not isinstance(tc, dict):
                    continue
                tid = _telecast_id(tc)
                if tid and tid not in seen_ids:
                    seen_ids.add(tid)
                    all_telecasts.append(tc)
            log.debug("EPG %s: %d Sendungen", page_path.split("/")[-1], len(entries))
        except (requests.RequestException, ValueError, AttributeError) as e:
            log.error("EPG Scrape %s: %s", page_path, e)

    log.info("EPG gesamt: %d Sendungen aus %d Quellen", len(all_telecasts), len(EPG_PAGES))
    return all_telecasts


def _filter_films(telecasts, only_new=False):
    """Filter telecasts down to feature films worth watching and score them.

    Keeps entries with TVCATEGORYID 1, a title of at least two characters,
    a non-spam subcategory and a parseable start date in the future.

    Score: base 50, +20 for a good subcategory, +10 for a description
    longer than 50 characters, +10 for a tip of the day, -30 when a
    recording already exists.

    Args:
        telecasts: telecast dicts; kept entries get ``_score`` and
            ``_start_dt`` added in place.
        only_new: when True, skip films already in the seen cache (cron job).

    Returns:
        Kept films, sorted by descending score, then by start time.
    """
    films = []
    now = datetime.now()
    seen = _load_seen() if only_new else {}

    for tc in telecasts:
        if tc.get("TVCATEGORYID", 0) != 1.0:
            continue

        title = tc.get("STITLE", "")
        if not title or len(title) < 2:
            continue

        subcat = (tc.get("SSUBCATEGORYNAME") or "").lower()
        if subcat in SPAM_SUBCATEGORIES:
            continue

        start_dt = _parse_start(tc)
        if start_dt is None or start_dt < now:
            continue

        if only_new and str(_telecast_id(tc)) in seen:
            continue

        score = 50
        if subcat in GOOD_SUBCATEGORIES:
            score += 20
        desc = tc.get("STHEMA") or tc.get("SFULLSUBTITLE") or ""
        if len(desc) > 50:
            score += 10
        if tc.get("BEXISTRECORD", False):
            score -= 30
        if tc.get("BISTIPOFDAY", False):
            score += 10

        tc["_score"] = score
        tc["_start_dt"] = start_dt
        films.append(tc)

    films.sort(key=lambda x: (-x["_score"], x["_start_dt"]))
    return films


def _mark_seen(films):
    """Record the given films in the seen cache with today's date."""
    seen = _load_seen()
    today = datetime.now().strftime("%Y-%m-%d")
    for f in films:
        tid = str(_telecast_id(f))
        if tid != "0":
            seen[tid] = today
    _save_seen(seen)


def _record_telecast(telecast_id):
    """Create a recording for *telecast_id*.

    Returns:
        The server's SMESSAGE text, or an error text when the login,
        the request or the response decoding fails.
    """
    s = _get_session()
    if not s:
        return "Login fehlgeschlagen"
    try:
        r = s.post(
            SAVETV_URL + "/STV/M/obj/TC/tcJWriteRecord.cfm",
            data={"TelecastId": telecast_id, "iRecordingBuffer": 0},
            headers={"X-Requested-With": "XMLHttpRequest"},
            timeout=15,
        )
        data = r.json()
    except (requests.RequestException, ValueError) as e:
        return "Fehler: " + str(e)
    if not isinstance(data, dict):
        return "Unbekannte Antwort"
    # Callers concatenate the result, so always return a string.
    return str(data.get("SMESSAGE") or "Unbekannte Antwort")


def _format_film(f, with_tid=True):
    """Format one film dict as a multi-line text block.

    Args:
        f: telecast dict.
        with_tid: when True, append a line with the TelecastId.

    Returns:
        Title line (with "[geplant]" marker and day hint), a line with
        station, start and subcategory, an optional description limited to
        120 characters, and optionally the TelecastId.
    """
    title = _text(f, "STITLE", "?")
    station = _text(f, "STVSTATIONNAME", "?")
    start = _text(f, "DSTARTDATE", "?")[:16]
    subcat = _text(f, "SSUBCATEGORYNAME", "")
    full_desc = str(f.get("STHEMA") or f.get("SFULLSUBTITLE") or "")
    recorded = " [geplant]" if f.get("BEXISTRECORD") else ""

    days_until = ""
    start_dt = _parse_start(f)
    if start_dt is not None:
        delta = (start_dt.date() - datetime.now().date()).days
        if delta == 0:
            days_until = " (heute)"
        elif delta == 1:
            days_until = " (morgen)"
        elif delta > 1:
            days_until = " (in " + str(delta) + " Tagen)"
        # Past dates get no day hint.

    lines = [" " + title + recorded + days_until]
    lines.append(" " + station + " | " + start + " | " + subcat)
    if full_desc:
        # Mark with an ellipsis only when text was actually cut off.
        suffix = "..." if len(full_desc) > 120 else ""
        lines.append(" " + full_desc[:120] + suffix)
    if with_tid:
        lines.append(" TelecastId " + str(_telecast_id(f)))
    return "\n".join(lines)


def _format_archive_entry(entry, date_len):
    """Format one archive entry as 'title | start | station'.

    Args:
        entry: archive entry dict holding the telecast under STRTELECASTENTRY.
        date_len: number of leading DSTARTDATE characters to show.
    """
    tc = entry.get("STRTELECASTENTRY") or {}
    return (
        " " + _text(tc, "STITLE", "?")[:40]
        + " | " + _text(tc, "DSTARTDATE", "?")[:date_len]
        + " | " + _text(tc, "STVSTATIONNAME", "?")
    )


def handle_get_savetv_status(**kw):
    """Tool handler: summarize the archive and the planned recordings.

    Returns:
        A text report, or an error text when the archive cannot be fetched.
    """
    archive = _get_archive(state=1, count=5)
    if "error" in archive:
        return "Save.TV Fehler: " + str(archive["error"])
    planned = _get_archive(state=0, count=10)

    lines = ["Save.TV Status\n"]
    total = _to_int(archive.get("ITOTALENTRIESINARCHIVE"), 0)
    lines.append("Archiv: " + str(total) + " Aufnahmen gesamt")

    fertig = archive.get("ARRVIDEOARCHIVEENTRIES") or []
    if fertig:
        lines.append("\nLetzte fertige Aufnahmen:")
        lines.extend(_format_archive_entry(e, 10) for e in fertig[:5])

    # A failed planned-list request has no entries and is shown as empty.
    geplant = planned.get("ARRVIDEOARCHIVEENTRIES") or []
    plan_total = _to_int(planned.get("ITOTALENTRIES"), 0)
    if geplant:
        lines.append("\nGeplante Aufnahmen (" + str(plan_total) + "):")
        lines.extend(_format_archive_entry(e, 16) for e in geplant[:10])

    return "\n".join(lines)


def handle_get_savetv_tipps(**kw):
    """Tool handler: list up to ten upcoming films worth watching."""
    telecasts = _scrape_epg()
    if not telecasts:
        return "Konnte keine Programmdaten von Save.TV laden."

    films = _filter_films(telecasts, only_new=False)
    if not films:
        return "Keine sehenswerten Spielfilme in den naechsten Tagen gefunden."

    lines = ["TV-Filmtipps\n"]
    for f in films[:10]:
        lines.append(_format_film(f))
        lines.append("")
    lines.append("Sage 'Nimm [Filmname] auf' oder nenne die TelecastId")
    return "\n".join(lines)


def get_new_films():
    """For the cron job: scan for new films and auto-record the top ones.

    All newly found films are marked as seen before recording starts.

    Returns:
        (auto_recorded, suggestions)
        auto_recorded: films with score >= AUTO_RECORD_SCORE for which a
            recording was requested; the result text is in ``_record_result``.
        suggestions: films with SUGGEST_SCORE <= score < AUTO_RECORD_SCORE.
        Films that already have a recording are in neither list.
    """
    telecasts = _scrape_epg()
    if not telecasts:
        return [], []

    films = _filter_films(telecasts, only_new=True)
    _mark_seen(films)

    auto_recorded = []
    suggestions = []
    for f in films:
        score = f["_score"]
        if score < SUGGEST_SCORE:
            continue
        if f.get("BEXISTRECORD"):
            continue
        if score >= AUTO_RECORD_SCORE:
            result = _record_telecast(_telecast_id(f))
            f["_record_result"] = result
            auto_recorded.append(f)
            log.info("Auto-Aufnahme: %s (Score %d) -> %s", f.get("STITLE"), score, result)
        else:
            suggestions.append(f)

    return auto_recorded, suggestions


def handle_savetv_record(telecast_id=0, **kw):
    """Tool handler: create a recording for *telecast_id*.

    The EPG is scanned to resolve the title; when the id is not found
    there, "ID <n>" is shown instead.

    Returns:
        The recording result and the title, or an error text for a
        missing or non-numeric id.
    """
    if not telecast_id:
        return "Keine TelecastId angegeben."
    tid = _to_int(telecast_id, None)
    if tid is None:
        return "Ungueltige TelecastId: " + str(telecast_id)

    title = "ID " + str(tid)
    for tc in _scrape_epg():
        if _telecast_id(tc) == tid:
            title = _text(tc, "STITLE", title)
            break

    result = _record_telecast(tid)
    return "Save.TV: " + result + "\nSendung: " + title


HANDLERS = {
    "get_savetv_status": handle_get_savetv_status,
    "get_savetv_tipps": handle_get_savetv_tipps,
    "savetv_record": handle_savetv_record,
}