homelab-brain/fuenfvoracht/src/database.py

704 lines
24 KiB
Python

import sqlite3
import os
import logging
from datetime import datetime
import logger as flog
_logger = logging.getLogger(__name__)
DB_PATH = os.environ.get('DB_PATH', '/data/fuenfvoracht.db')
DEFAULT_PROMPT = '''Du erstellst einen strukturierten Beitrag für den Telegram-Kanal "Fünf vor Acht".
Der Beitrag präsentiert einen Inhalt (Video, Artikel, Vortrag) neutral und informativ.
Leser sollen sich selbst ein Bild machen können.
EINGABE: {source}
DATUM: {date}
THEMA: {tag}
AUFGABE:
Analysiere die Quelle und erstelle einen Telegram-Beitrag nach exakt diesem FORMAT.
Wähle passende Emojis für die Sektions-Überschriften je nach Thema.
Schreibe sachlich und ohne eigene Wertung.
FORMAT — exakt so ausgeben (Telegram-kompatibel, kein HTML):
[Kategorie-Emoji] [Typ]: [Vollständiger Titel]
🔗 [Quelle ansehen / Artikel lesen / Video ansehen]:
[URL aus der Eingabe]
[Themen-Emoji] Inhaltlicher Schwerpunkt
[2-3 Sätze: Wer spricht/schreibt worüber und in welchem Kontext]
Themen im Überblick:
• [Kernthema 1]
• [Kernthema 2]
• [Kernthema 3]
• [Kernthema 4]
• [Kernthema 5]
[2. Themen-Emoji] [Zweiter Schwerpunkt falls vorhanden]
[2-3 Sätze zum zweiten Teil]
• [Unterpunkt 1]
• [Unterpunkt 2]
• [Unterpunkt 3]
📌 Einordnung
[2-3 Sätze: Neutrale Beschreibung des Formats/der Methode. Kein Urteil. Leser können Quellen selbst prüfen.]'''
def get_conn():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db():
conn = get_conn()
c = conn.cursor()
# Basis-Tabellen
c.executescript('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
post_time TEXT NOT NULL DEFAULT '19:55',
source_input TEXT,
content_raw TEXT,
content_final TEXT,
status TEXT DEFAULT 'draft',
version INTEGER DEFAULT 1,
review_message_id INTEGER,
review_chat_id INTEGER,
prompt_id INTEGER,
tag TEXT,
notify_at TEXT,
scheduled_at TEXT,
created_at TEXT DEFAULT (datetime('now')),
sent_to_bot_at TEXT,
approved_at TEXT,
posted_at TEXT,
UNIQUE(date, post_time)
);
CREATE TABLE IF NOT EXISTS article_versions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id INTEGER NOT NULL,
version_nr INTEGER NOT NULL,
content TEXT,
created_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (article_id) REFERENCES articles(id)
);
CREATE TABLE IF NOT EXISTS post_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id INTEGER NOT NULL,
channel_message_id INTEGER,
posted_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (article_id) REFERENCES articles(id)
);
CREATE TABLE IF NOT EXISTS prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
system_prompt TEXT NOT NULL,
is_default INTEGER DEFAULT 0,
last_tested_at TEXT,
test_result TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS sources_favorites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
label TEXT NOT NULL,
url TEXT,
used_count INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS article_tags (
article_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (article_id, tag_id),
FOREIGN KEY (article_id) REFERENCES articles(id),
FOREIGN KEY (tag_id) REFERENCES tags(id)
);
CREATE TABLE IF NOT EXISTS channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
telegram_id TEXT,
post_time TEXT DEFAULT '19:55',
timezone TEXT DEFAULT 'Europe/Berlin',
active INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS locations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
flag TEXT NOT NULL,
timezone TEXT NOT NULL,
reminder_morning TEXT DEFAULT '10:00',
reminder_afternoon TEXT DEFAULT '18:00'
);
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS reviewers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL UNIQUE,
name TEXT NOT NULL,
active INTEGER DEFAULT 1,
added_at TEXT DEFAULT (datetime('now'))
);
''')
# Migration: post_time Spalte zu bestehender articles-Tabelle hinzufügen falls fehlt
cols = [r[1] for r in c.execute("PRAGMA table_info(articles)").fetchall()]
if 'post_time' not in cols:
c.execute("ALTER TABLE articles ADD COLUMN post_time TEXT NOT NULL DEFAULT '19:55'")
_logger.info("Migration: post_time Spalte hinzugefügt")
if 'notify_at' not in cols:
c.execute("ALTER TABLE articles ADD COLUMN notify_at TEXT")
if 'scheduled_at' not in cols:
c.execute("ALTER TABLE articles ADD COLUMN scheduled_at TEXT")
# reviewers-Tabelle Migration
reviewer_cols = [r[1] for r in c.execute("PRAGMA table_info(reviewers)").fetchall()]
if not reviewer_cols:
_logger.info("Migration: reviewers Tabelle erstellt")
# Standard-Daten
c.execute("SELECT COUNT(*) FROM prompts")
if c.fetchone()[0] == 0:
c.execute(
"INSERT INTO prompts (name, system_prompt, is_default) VALUES (?, ?, 1)",
("Standard", DEFAULT_PROMPT)
)
c.execute("SELECT COUNT(*) FROM channels")
if c.fetchone()[0] == 0:
c.execute(
"INSERT INTO channels (name, telegram_id, post_time, timezone) VALUES (?, ?, ?, ?)",
("Fünf vor Acht", "", "19:55", "Europe/Berlin")
)
for tag in ["Politik", "Wirtschaft", "Tech", "Gesellschaft", "Umwelt", "Kultur", "Sport"]:
c.execute("INSERT OR IGNORE INTO tags (name) VALUES (?)", (tag,))
locations = [
("Deutschland", "🇩🇪", "Europe/Berlin", "10:00", "18:00"),
("Kambodscha", "🇰🇭", "Asia/Phnom_Penh", "10:00", "18:00"),
("Thailand", "🇹🇭", "Asia/Bangkok", "10:00", "18:00"),
("USA Ostküste", "🇺🇸", "America/New_York", "10:00", "18:00"),
("USA Westküste","🇺🇸", "America/Los_Angeles", "10:00", "18:00"),
("Spanien", "🇪🇸", "Europe/Madrid", "10:00", "18:00"),
]
c.execute("SELECT COUNT(*) FROM locations")
if c.fetchone()[0] == 0:
for loc in locations:
c.execute(
"INSERT INTO locations (name, flag, timezone, reminder_morning, reminder_afternoon) VALUES (?,?,?,?,?)",
loc
)
c.execute("INSERT OR IGNORE INTO settings (key, value) VALUES ('user_location_id', '1')")
conn.commit()
conn.close()
_logger.info("DB initialisiert: %s", DB_PATH)
# ── Article CRUD ──────────────────────────────────────────────────────────────
def get_article_by_date(date_str, post_time=None):
"""Gibt ersten Artikel des Tages zurück, oder den mit spezifischer post_time."""
conn = get_conn()
if post_time:
row = conn.execute(
"SELECT * FROM articles WHERE date=? AND post_time=?", (date_str, post_time)
).fetchone()
else:
row = conn.execute(
"SELECT * FROM articles WHERE date=? ORDER BY post_time ASC LIMIT 1", (date_str,)
).fetchone()
conn.close()
return dict(row) if row else None
def get_articles_by_date(date_str):
"""Gibt alle Artikel eines Tages zurück (mehrere Slots)."""
conn = get_conn()
rows = conn.execute(
"SELECT * FROM articles WHERE date=? ORDER BY post_time ASC", (date_str,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_article_by_id(article_id):
conn = get_conn()
row = conn.execute("SELECT * FROM articles WHERE id=?", (article_id,)).fetchone()
conn.close()
return dict(row) if row else None
def slot_is_taken(date_str, post_time, exclude_id=None):
"""Prüft ob ein Zeitslot bereits belegt ist."""
conn = get_conn()
if exclude_id:
row = conn.execute(
"SELECT id FROM articles WHERE date=? AND post_time=? AND id!=? AND status NOT IN ('skipped','rejected')",
(date_str, post_time, exclude_id)
).fetchone()
else:
row = conn.execute(
"SELECT id FROM articles WHERE date=? AND post_time=? AND status NOT IN ('skipped','rejected')",
(date_str, post_time)
).fetchone()
conn.close()
return row is not None
def get_taken_slots(date_str):
"""Gibt alle belegten Zeitslots eines Tages zurück."""
conn = get_conn()
rows = conn.execute(
"SELECT post_time FROM articles WHERE date=? AND status NOT IN ('skipped','rejected')",
(date_str,)
).fetchall()
conn.close()
return [r[0] for r in rows]
def create_article(date_str, source_input, content, prompt_id, tag="allgemein", post_time="19:55"):
conn = get_conn()
try:
conn.execute(
"INSERT INTO articles (date, post_time, source_input, content_raw, content_final, prompt_id, tag) VALUES (?,?,?,?,?,?,?)",
(date_str, post_time, source_input, content, content, prompt_id, tag)
)
conn.execute(
"INSERT INTO article_versions (article_id, version_nr, content) "
"VALUES ((SELECT id FROM articles WHERE date=? AND post_time=?), 1, ?)",
(date_str, post_time, content)
)
conn.commit()
flog.article_generated(date_str, source_input or '', 1, tag)
finally:
conn.close()
def reschedule_article(article_id, new_date, new_post_time):
"""Verschiebt einen Artikel auf einen neuen Datum/Zeit-Slot."""
conn = get_conn()
try:
conn.execute(
"UPDATE articles SET date=?, post_time=? WHERE id=?",
(new_date, new_post_time, article_id)
)
conn.commit()
flog.info('article_rescheduled', article_id=article_id,
new_date=new_date, new_post_time=new_post_time)
return True
except sqlite3.IntegrityError:
flog.slot_conflict(new_date, new_post_time)
return False
finally:
conn.close()
def delete_article(article_id):
conn = get_conn()
conn.execute("DELETE FROM article_versions WHERE article_id=?", (article_id,))
conn.execute("DELETE FROM articles WHERE id=?", (article_id,))
conn.commit()
conn.close()
flog.info('article_deleted', article_id=article_id)
def update_article_status(date_str, status, message_id=None, chat_id=None, post_time=None):
conn = get_conn()
ts = datetime.utcnow().isoformat()
where = "date=? AND post_time=?" if post_time else "date=?"
params_base = (date_str, post_time) if post_time else (date_str,)
if status == 'approved':
conn.execute(
f"UPDATE articles SET status=?, approved_at=?, review_message_id=?, review_chat_id=? WHERE {where}",
(status, ts, message_id, chat_id) + params_base
)
elif status == 'posted':
conn.execute(
f"UPDATE articles SET status=?, posted_at=? WHERE {where}",
(status, ts) + params_base
)
elif status == 'sent_to_bot':
conn.execute(
f"UPDATE articles SET status=?, sent_to_bot_at=?, review_message_id=?, review_chat_id=? WHERE {where}",
(status, ts, message_id, chat_id) + params_base
)
elif status == 'scheduled':
conn.execute(
f"UPDATE articles SET status=?, scheduled_at=? WHERE {where}",
(status, ts) + params_base
)
else:
conn.execute(f"UPDATE articles SET status=? WHERE {where}", (status,) + params_base)
conn.commit()
conn.close()
def schedule_article(date_str, post_time, notify_at):
"""Artikel einplanen mit Bot-Benachrichtigungs-Zeitpunkt."""
conn = get_conn()
ts = datetime.utcnow().isoformat()
conn.execute(
"UPDATE articles SET status='scheduled', scheduled_at=?, post_time=?, notify_at=? WHERE date=? AND post_time=?",
(ts, post_time, notify_at, date_str, post_time)
)
conn.commit()
conn.close()
flog.article_scheduled(date_str, post_time, notify_at)
def get_due_notifications():
"""Gibt alle scheduled-Artikel zurück deren notify_at <= jetzt."""
conn = get_conn()
now = datetime.utcnow().isoformat()
rows = conn.execute(
"SELECT * FROM articles WHERE status='scheduled' AND notify_at IS NOT NULL AND notify_at <= ?",
(now,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def update_article_content(date_str, content, new_version=False, post_time=None):
conn = get_conn()
where = "date=? AND post_time=?" if post_time else "date=?"
params_base = (date_str, post_time) if post_time else (date_str,)
if new_version:
version = conn.execute(
f"SELECT version FROM articles WHERE {where}", params_base
).fetchone()
new_v = (version[0] or 1) + 1
conn.execute(
f"UPDATE articles SET content_raw=?, content_final=?, version=?, status='pending_review' WHERE {where}",
(content, content, new_v) + params_base
)
article_id = conn.execute(
f"SELECT id FROM articles WHERE {where}", params_base
).fetchone()[0]
conn.execute(
"INSERT INTO article_versions (article_id, version_nr, content) VALUES (?,?,?)",
(article_id, new_v, content)
)
else:
conn.execute(
f"UPDATE articles SET content_final=? WHERE {where}", (content,) + params_base
)
conn.commit()
conn.close()
def save_post_history(date_str, channel_message_id, post_time=None):
conn = get_conn()
where = "date=? AND post_time=?" if post_time else "date=?"
params = (date_str, post_time) if post_time else (date_str,)
article_id = conn.execute(
f"SELECT id FROM articles WHERE {where}", params
).fetchone()
if article_id:
conn.execute(
"INSERT INTO post_history (article_id, channel_message_id) VALUES (?,?)",
(article_id[0], channel_message_id)
)
conn.commit()
conn.close()
def get_recent_articles(limit=30):
conn = get_conn()
rows = conn.execute(
"SELECT * FROM articles ORDER BY date DESC, post_time ASC LIMIT ?", (limit,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_recent_posted_articles(limit=30):
conn = get_conn()
rows = conn.execute(
"SELECT * FROM articles WHERE status='posted' ORDER BY posted_at DESC LIMIT ?",
(limit,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_week_articles(from_date, to_date):
conn = get_conn()
rows = conn.execute(
"SELECT * FROM articles WHERE date BETWEEN ? AND ? ORDER BY date ASC, post_time ASC",
(from_date, to_date)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_last_posted():
conn = get_conn()
row = conn.execute(
"SELECT date, post_time, posted_at FROM articles WHERE status='posted' ORDER BY posted_at DESC LIMIT 1"
).fetchone()
conn.close()
return dict(row) if row else None
# ── Prompts ───────────────────────────────────────────────────────────────────
def get_prompts():
conn = get_conn()
rows = conn.execute("SELECT * FROM prompts ORDER BY is_default DESC, id ASC").fetchall()
conn.close()
return [dict(r) for r in rows]
def get_default_prompt():
conn = get_conn()
row = conn.execute("SELECT * FROM prompts WHERE is_default=1 LIMIT 1").fetchone()
if not row:
row = conn.execute("SELECT * FROM prompts LIMIT 1").fetchone()
conn.close()
return dict(row) if row else None
def save_prompt(prompt_id, name, system_prompt):
conn = get_conn()
conn.execute("UPDATE prompts SET name=?, system_prompt=? WHERE id=?", (name, system_prompt, prompt_id))
conn.commit()
conn.close()
def create_prompt(name, system_prompt):
conn = get_conn()
conn.execute("INSERT INTO prompts (name, system_prompt) VALUES (?,?)", (name, system_prompt))
conn.commit()
conn.close()
def set_default_prompt(prompt_id):
conn = get_conn()
conn.execute("UPDATE prompts SET is_default=0")
conn.execute("UPDATE prompts SET is_default=1 WHERE id=?", (prompt_id,))
conn.commit()
conn.close()
def delete_prompt(prompt_id):
conn = get_conn()
conn.execute("DELETE FROM prompts WHERE id=? AND is_default=0", (prompt_id,))
conn.commit()
conn.close()
def save_prompt_test_result(prompt_id, result):
conn = get_conn()
conn.execute(
"UPDATE prompts SET test_result=?, last_tested_at=? WHERE id=?",
(result, datetime.utcnow().isoformat(), prompt_id)
)
conn.commit()
conn.close()
# ── Channel / Settings ────────────────────────────────────────────────────────
def get_channel():
conn = get_conn()
row = conn.execute("SELECT * FROM channels WHERE active=1 LIMIT 1").fetchone()
conn.close()
return dict(row) if row else {}
def update_channel(telegram_id, post_time="19:55"):
conn = get_conn()
conn.execute(
"UPDATE channels SET telegram_id=?, post_time=? WHERE active=1",
(telegram_id, post_time)
)
conn.commit()
conn.close()
# ── Reviewers ─────────────────────────────────────────────────────────────────
def get_reviewers(active_only=True):
conn = get_conn()
if active_only:
rows = conn.execute(
"SELECT * FROM reviewers WHERE active=1 ORDER BY added_at ASC"
).fetchall()
else:
rows = conn.execute("SELECT * FROM reviewers ORDER BY added_at ASC").fetchall()
conn.close()
return [dict(r) for r in rows]
def add_reviewer(chat_id: int, name: str):
conn = get_conn()
try:
conn.execute(
"INSERT INTO reviewers (chat_id, name, active) VALUES (?,?,1)",
(chat_id, name)
)
conn.commit()
flog.reviewer_added(chat_id, name)
return True
except sqlite3.IntegrityError:
return False
finally:
conn.close()
def remove_reviewer(chat_id: int):
conn = get_conn()
conn.execute("UPDATE reviewers SET active=0 WHERE chat_id=?", (chat_id,))
conn.commit()
conn.close()
flog.reviewer_removed(chat_id)
def get_reviewer_chat_ids():
"""Gibt aktive Reviewer-Chat-IDs aus DB zurück, Fallback auf ENV."""
reviewers = get_reviewers(active_only=True)
if reviewers:
return [r['chat_id'] for r in reviewers]
# Fallback: ENV
import os
ids = []
raw = os.environ.get('REVIEW_CHAT_IDS', '')
if raw.strip():
for part in raw.split(','):
try:
ids.append(int(part.strip()))
except ValueError:
pass
admin = os.environ.get('ADMIN_CHAT_ID', '')
if admin:
try:
ids.append(int(admin))
except ValueError:
pass
unique = []
for cid in ids:
if cid not in unique:
unique.append(cid)
return unique
# ── Favorites ─────────────────────────────────────────────────────────────────
def get_favorites():
conn = get_conn()
rows = conn.execute("SELECT * FROM sources_favorites ORDER BY used_count DESC").fetchall()
conn.close()
return [dict(r) for r in rows]
def add_favorite(label, url):
conn = get_conn()
conn.execute("INSERT INTO sources_favorites (label, url) VALUES (?,?)", (label, url))
conn.commit()
conn.close()
def increment_favorite(fav_id):
conn = get_conn()
conn.execute("UPDATE sources_favorites SET used_count=used_count+1 WHERE id=?", (fav_id,))
conn.commit()
conn.close()
# ── Tags ──────────────────────────────────────────────────────────────────────
def get_tags():
conn = get_conn()
rows = conn.execute("SELECT * FROM tags ORDER BY name").fetchall()
conn.close()
return [dict(r) for r in rows]
# ── Locations ─────────────────────────────────────────────────────────────────
def get_locations():
conn = get_conn()
rows = conn.execute("SELECT * FROM locations ORDER BY id").fetchall()
conn.close()
return [dict(r) for r in rows]
def get_current_location():
conn = get_conn()
loc_id = conn.execute(
"SELECT value FROM settings WHERE key='user_location_id'"
).fetchone()
if not loc_id:
conn.close()
return None
row = conn.execute("SELECT * FROM locations WHERE id=?", (loc_id[0],)).fetchone()
conn.close()
return dict(row) if row else None
def set_location(location_id):
conn = get_conn()
conn.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES ('user_location_id', ?)",
(str(location_id),)
)
conn.commit()
conn.close()
def get_reminder_times_in_berlin(location: dict) -> tuple:
import pytz
from datetime import datetime, date
user_tz = pytz.timezone(location['timezone'])
berlin_tz = pytz.timezone('Europe/Berlin')
today = date.today()
def convert(local_time_str):
h, m = map(int, local_time_str.split(':'))
local_dt = user_tz.localize(datetime(today.year, today.month, today.day, h, m))
berlin_dt = local_dt.astimezone(berlin_tz)
return berlin_dt.hour, berlin_dt.minute
return convert(location['reminder_morning']), convert(location['reminder_afternoon'])
# ── Stats ─────────────────────────────────────────────────────────────────────
def get_monthly_stats():
conn = get_conn()
from datetime import date
month = date.today().strftime('%Y-%m')
total = conn.execute(
"SELECT COUNT(*) FROM articles WHERE date LIKE ?", (f"{month}%",)
).fetchone()[0]
posted = conn.execute(
"SELECT COUNT(*) FROM articles WHERE date LIKE ? AND status='posted'", (f"{month}%",)
).fetchone()[0]
skipped = conn.execute(
"SELECT COUNT(*) FROM articles WHERE date LIKE ? AND status='skipped'", (f"{month}%",)
).fetchone()[0]
avg_version = conn.execute(
"SELECT AVG(version) FROM articles WHERE date LIKE ?", (f"{month}%",)
).fetchone()[0] or 0
conn.close()
return {"total": total, "posted": posted, "skipped": skipped, "avg_version": round(avg_version, 1)}