import sqlite3
import os
from datetime import datetime, date
# Filesystem path of the SQLite database; taken from the DB_PATH environment
# variable when set, otherwise falls back to '/data/redax.db'.
DB_PATH = os.environ.get('DB_PATH', '/data/redax.db')
def get_conn():
    """Open and return a new SQLite connection to ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` objects. WAL journaling and
    foreign-key enforcement are enabled on the connection before it is
    handed back; the connection is returned open.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
def init_db():
    """Create the database schema if missing and seed the default prompt.

    Every table is created with ``CREATE TABLE IF NOT EXISTS``, so existing
    tables are left untouched. A prompt named 'Standard' with
    ``is_default=1`` is inserted only when the ``prompts`` table is empty.

    The connection is closed even when schema creation or seeding raises.
    """
    conn = get_conn()
    try:
        c = conn.cursor()
        c.executescript("""
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
content TEXT,
content_raw TEXT,
source_url TEXT,
article_type TEXT NOT NULL DEFAULT 'ki', -- 'ki' or 'rss'
source_feed_id INTEGER REFERENCES feeds(id),
status TEXT NOT NULL DEFAULT 'draft',
tone TEXT DEFAULT 'informativ',
post_date TEXT,
post_time TEXT DEFAULT '19:00',
wp_post_id INTEGER,
wp_url TEXT,
category_id INTEGER,
featured_image_url TEXT,
seo_title TEXT,
seo_description TEXT,
focus_keyword TEXT,
send_to_telegram INTEGER DEFAULT 0,
version INTEGER DEFAULT 1,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
published_at TEXT,
UNIQUE(post_date, post_time)
);
CREATE TABLE IF NOT EXISTS feeds (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
url TEXT NOT NULL UNIQUE,
schedule TEXT DEFAULT '*/2 * * * *',
active INTEGER DEFAULT 1,
auto_publish INTEGER DEFAULT 0,
ki_rewrite INTEGER DEFAULT 0,
teaser_only INTEGER DEFAULT 1,
category_id INTEGER,
blacklist TEXT DEFAULT '',
last_fetched_at TEXT,
last_error TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS feed_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
feed_id INTEGER NOT NULL REFERENCES feeds(id),
guid TEXT NOT NULL,
title TEXT,
url TEXT,
summary TEXT,
published_at TEXT,
status TEXT DEFAULT 'new',
article_id INTEGER REFERENCES articles(id),
fetched_at TEXT DEFAULT (datetime('now')),
UNIQUE(feed_id, guid)
);
CREATE TABLE IF NOT EXISTS prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
system_prompt TEXT NOT NULL,
is_default INTEGER DEFAULT 0,
last_tested_at TEXT,
test_result TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS post_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id INTEGER REFERENCES articles(id),
wp_post_id INTEGER,
wp_url TEXT,
tg_message_id INTEGER,
posted_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS mirror_posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
article_id INTEGER NOT NULL REFERENCES articles(id),
mirror_name TEXT NOT NULL,
mirror_label TEXT,
mirror_wp_id INTEGER,
mirror_url TEXT,
status TEXT DEFAULT 'pending',
error TEXT,
posted_at TEXT DEFAULT (datetime('now')),
UNIQUE(article_id, mirror_name)
);
""")
        # Seed default prompt
        c.execute("SELECT COUNT(*) FROM prompts")
        if c.fetchone()[0] == 0:
            # NOTE(review): the HTML element list in the prompt text below
            # ("H2, H3, , , ") looks truncated, as if tag names were lost --
            # confirm the intended wording before relying on it.
            c.execute("""
INSERT INTO prompts (name, system_prompt, is_default) VALUES (
'Standard',
'Du bist ein erfahrener Redakteur. Schreibe einen vollständigen, gut strukturierten Artikel auf Basis der folgenden Quelle.
Ton: {tone}
Datum: {date}
Formatierung:
- Titel als erste Zeile (ohne #)
- Dann den Artikeltext in HTML (H2, H3,
,
, )
- Am Ende: SEO_TITLE: [max 60 Zeichen]
- SEO_DESC: [max 155 Zeichen]
- KEYWORD: [1 Fokus-Keyword]
Quelle:
{source}',
1
)
""")
        conn.commit()
    finally:
        # Release the connection even if schema creation or seeding failed.
        conn.close()
# ── Articles ──────────────────────────────────────────────────────────────────
def get_articles(limit=50, status=None, article_type=None):
    """Return up to ``limit`` articles as dicts, latest post slot first.

    ``status`` and ``article_type`` narrow the result only when they are
    truthy; otherwise no filter is applied for that column.
    """
    db = get_conn()
    query_parts = ["SELECT * FROM articles WHERE 1=1"]
    args = []
    optional_filters = (
        (" AND status=?", status),
        (" AND article_type=?", article_type),
    )
    for clause, wanted in optional_filters:
        if wanted:
            query_parts.append(clause)
            args.append(wanted)
    query_parts.append(" ORDER BY post_date DESC, post_time DESC LIMIT ?")
    args.append(limit)
    records = db.execute("".join(query_parts), args).fetchall()
    db.close()
    return [dict(record) for record in records]
def get_article(article_id):
    """Return the article with the given id as a dict, or ``None`` if absent."""
    db = get_conn()
    cursor = db.execute("SELECT * FROM articles WHERE id=?", (article_id,))
    record = cursor.fetchone()
    db.close()
    if not record:
        return None
    return dict(record)
def get_articles_for_date(date_str):
    """Return all articles whose ``post_date`` equals ``date_str``.

    Results are dicts ordered by ``post_time`` ascending.
    """
    query = "SELECT * FROM articles WHERE post_date=? ORDER BY post_time ASC"
    db = get_conn()
    records = db.execute(query, (date_str,)).fetchall()
    db.close()
    result = []
    for record in records:
        result.append(dict(record))
    return result
def get_week_articles(from_date, to_date):
    """Return articles with ``post_date`` between the two dates (SQL BETWEEN).

    Results are dicts ordered by ``post_date`` then ``post_time``, ascending.
    """
    query = "SELECT * FROM articles WHERE post_date BETWEEN ? AND ? ORDER BY post_date ASC, post_time ASC"
    bounds = (from_date, to_date)
    db = get_conn()
    records = db.execute(query, bounds).fetchall()
    db.close()
    return list(map(dict, records))
def create_article(data: dict) -> int:
    """Insert a new article and return its row id.

    Only keys of ``data`` that appear in the column whitelist below are
    written; unknown keys are ignored. When ``data`` contains none of the
    whitelisted keys, a row consisting solely of column defaults is created
    (previously this produced invalid SQL).

    Raises:
        sqlite3.IntegrityError: e.g. when ``(post_date, post_time)`` collides
            with an existing article. The connection is closed in that case
            as well.
    """
    fields = ['title', 'content', 'content_raw', 'source_url', 'article_type',
              'source_feed_id', 'status', 'tone', 'post_date', 'post_time',
              'category_id', 'featured_image_url', 'seo_title', 'seo_description',
              'focus_keyword', 'send_to_telegram']
    cols = [f for f in fields if f in data]
    vals = [data[f] for f in cols]
    if cols:
        # Column names come from the whitelist above, never from the caller,
        # so interpolating them into the statement is safe.
        placeholders = ','.join(['?'] * len(cols))
        sql = f"INSERT INTO articles ({','.join(cols)}) VALUES ({placeholders})"
    else:
        # "INSERT INTO articles () VALUES ()" is a syntax error in SQLite.
        sql = "INSERT INTO articles DEFAULT VALUES"
    conn = get_conn()
    try:
        cur = conn.execute(sql, vals)
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()
def update_article(article_id, data: dict):
    """Update whitelisted columns of an article.

    Only keys of ``data`` present in the whitelist below are written; other
    keys are ignored. ``updated_at`` is always set to the current UTC time
    (naive ISO-8601 string), even when ``data`` carries no known keys.

    Raises:
        sqlite3.IntegrityError: e.g. when the new ``(post_date, post_time)``
            is already used by another article. The connection is closed in
            that case as well.
    """
    fields = ['title', 'content', 'content_raw', 'source_url', 'status', 'tone',
              'post_date', 'post_time', 'wp_post_id', 'wp_url', 'category_id',
              'featured_image_url', 'seo_title', 'seo_description', 'focus_keyword',
              'send_to_telegram', 'published_at']
    updates = {f: data[f] for f in fields if f in data}
    updates['updated_at'] = datetime.utcnow().isoformat()
    # Column names originate from the whitelist, so the f-string is safe.
    set_clause = ', '.join(f"{k}=?" for k in updates)
    sql = f"UPDATE articles SET {set_clause} WHERE id=?"
    conn = get_conn()
    try:
        conn.execute(sql, list(updates.values()) + [article_id])
        conn.commit()
    finally:
        conn.close()
def delete_article(article_id):
    """Delete the article with the given id.

    Raises:
        sqlite3.IntegrityError: when foreign-key enforcement is active and
            other rows still reference the article. The connection is closed
            in that case as well.
    """
    conn = get_conn()
    try:
        conn.execute("DELETE FROM articles WHERE id=?", (article_id,))
        conn.commit()
    finally:
        conn.close()
def reschedule_article(article_id, new_date, new_time):
    """Move an article to a new date/time slot.

    Returns:
        ``(True, "OK")`` on success, or ``(False, "Slot bereits belegt")``
        when another article already occupies the requested slot.

    The slot is checked up front; if a concurrent writer takes it between
    the check and the update, the resulting constraint violation is reported
    as the same "slot taken" result instead of escaping as an exception.
    """
    conn = get_conn()
    try:
        existing = conn.execute(
            "SELECT id FROM articles WHERE post_date=? AND post_time=? AND id!=?",
            (new_date, new_time, article_id)
        ).fetchone()
        if existing:
            return False, "Slot bereits belegt"
        try:
            conn.execute(
                "UPDATE articles SET post_date=?, post_time=?, updated_at=? WHERE id=?",
                (new_date, new_time, datetime.utcnow().isoformat(), article_id)
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # The update only touches post_date/post_time/updated_at, so the
            # slot uniqueness constraint is what rejected it.
            conn.rollback()
            return False, "Slot bereits belegt"
        return True, "OK"
    finally:
        conn.close()
def get_taken_slots(date_str):
    """Return the ``post_time`` values of all articles posted on ``date_str``."""
    db = get_conn()
    cursor = db.execute(
        "SELECT post_time FROM articles WHERE post_date=?", (date_str,)
    )
    records = cursor.fetchall()
    db.close()
    taken = []
    for record in records:
        taken.append(record['post_time'])
    return taken
def get_due_articles():
    """Return scheduled articles that are due today, as dicts.

    An article qualifies when its status is 'scheduled', its ``post_date`` is
    today's local date and its ``post_time`` is not later than the current
    local time (HH:MM). Articles scheduled for earlier dates are not matched
    by this query.
    """
    moment = datetime.now()
    today = f"{moment:%Y-%m-%d}"
    current_time = f"{moment:%H:%M}"
    query = (
        "\n"
        "SELECT * FROM articles\n"
        "WHERE status='scheduled'\n"
        "AND post_date=?\n"
        "AND post_time<=?\n"
    )
    db = get_conn()
    records = db.execute(query, (today, current_time)).fetchall()
    db.close()
    return [dict(record) for record in records]
def get_last_published():
    """Return the published article with the greatest ``published_at``.

    Returns a dict, or ``None`` when no article has status 'published'.
    """
    query = "SELECT * FROM articles WHERE status='published' ORDER BY published_at DESC LIMIT 1"
    db = get_conn()
    record = db.execute(query).fetchone()
    db.close()
    if not record:
        return None
    return dict(record)
def save_post_history(article_id, wp_post_id, wp_url, tg_message_id=None):
    """Append a row to ``post_history`` for a posted article.

    ``tg_message_id`` is optional and stored as NULL when omitted.

    Raises:
        sqlite3.IntegrityError: when foreign-key enforcement is active and
            ``article_id`` does not reference an existing article. The
            connection is closed in that case as well.
    """
    conn = get_conn()
    try:
        conn.execute(
            "INSERT INTO post_history (article_id, wp_post_id, wp_url, tg_message_id) VALUES (?,?,?,?)",
            (article_id, wp_post_id, wp_url, tg_message_id)
        )
        conn.commit()
    finally:
        conn.close()
# ── Feeds ─────────────────────────────────────────────────────────────────────
def get_feeds(active_only=False):
    """Return all feeds as dicts ordered by name.

    With ``active_only`` truthy, only feeds with ``active=1`` are returned.
    """
    db = get_conn()
    where = " WHERE active=1" if active_only else ""
    query = "SELECT * FROM feeds" + where + " ORDER BY name ASC"
    records = db.execute(query).fetchall()
    db.close()
    return list(map(dict, records))
def get_feed(feed_id):
    """Return the feed with the given id as a dict, or ``None`` if absent."""
    db = get_conn()
    cursor = db.execute("SELECT * FROM feeds WHERE id=?", (feed_id,))
    record = cursor.fetchone()
    db.close()
    if not record:
        return None
    return dict(record)
def create_feed(data: dict) -> int:
    """Insert a new feed and return its row id.

    Only keys of ``data`` that appear in the column whitelist below are
    written; unknown keys are ignored.

    Raises:
        sqlite3.IntegrityError: e.g. when the feed URL already exists or a
            NOT NULL column (``name``, ``url``) is missing. The connection is
            closed in that case as well.
    """
    fields = ['name', 'url', 'schedule', 'active', 'auto_publish', 'ki_rewrite',
              'teaser_only', 'category_id', 'blacklist']
    cols = [f for f in fields if f in data]
    vals = [data[f] for f in cols]
    conn = get_conn()
    try:
        # Column names come from the whitelist above, so the f-string is safe.
        cur = conn.execute(
            f"INSERT INTO feeds ({','.join(cols)}) VALUES ({','.join(['?']*len(cols))})",
            vals
        )
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()
def update_feed(feed_id, data: dict):
    """Update whitelisted columns of a feed.

    Only keys of ``data`` present in the whitelist below are written; other
    keys are ignored. When ``data`` contains no whitelisted key the call is a
    no-op (an empty SET clause would be invalid SQL).

    Raises:
        sqlite3.IntegrityError: e.g. when the new URL duplicates another
            feed's URL. The connection is closed in that case as well.
    """
    fields = ['name', 'url', 'schedule', 'active', 'auto_publish', 'ki_rewrite',
              'teaser_only', 'category_id', 'blacklist', 'last_fetched_at', 'last_error']
    updates = {f: data[f] for f in fields if f in data}
    if not updates:
        # Nothing to write; avoid "UPDATE feeds SET  WHERE id=?".
        return
    # Column names originate from the whitelist, so the f-string is safe.
    set_clause = ', '.join(f"{k}=?" for k in updates)
    conn = get_conn()
    try:
        conn.execute(f"UPDATE feeds SET {set_clause} WHERE id=?", list(updates.values()) + [feed_id])
        conn.commit()
    finally:
        conn.close()
def delete_feed(feed_id):
    """Delete the feed with the given id.

    Raises:
        sqlite3.IntegrityError: when foreign-key enforcement is active and
            other rows still reference the feed. The connection is closed in
            that case as well.
    """
    conn = get_conn()
    try:
        conn.execute("DELETE FROM feeds WHERE id=?", (feed_id,))
        conn.commit()
    finally:
        conn.close()
def get_feed_queue(status='new', limit=50):
    """Return up to ``limit`` feed items with the given status, as dicts.

    Each item additionally carries ``feed_name`` from the joined feed row.
    Items are ordered by ``fetched_at`` descending.
    """
    query = (
        "\n"
        "SELECT fi.*, f.name as feed_name FROM feed_items fi\n"
        "JOIN feeds f ON fi.feed_id = f.id\n"
        "WHERE fi.status=?\n"
        "ORDER BY fi.fetched_at DESC LIMIT ?\n"
    )
    db = get_conn()
    records = db.execute(query, (status, limit)).fetchall()
    db.close()
    return list(map(dict, records))
def save_feed_item(feed_id, guid, title, url, summary, published_at):
    """Store a feed item unless ``(feed_id, guid)`` is already present.

    Returns:
        ``True`` when a new row was inserted, ``False`` when the insert was
        ignored or any error occurred while writing.
    """
    conn = get_conn()
    try:
        cur = conn.execute(
            "\n"
            "INSERT OR IGNORE INTO feed_items (feed_id, guid, title, url, summary, published_at)\n"
            "VALUES (?,?,?,?,?,?)\n",
            (feed_id, guid, title, url, summary, published_at),
        )
        conn.commit()
        # rowcount is 0 when the insert was ignored, so no follow-up
        # "SELECT changes()" query is needed.
        return cur.rowcount > 0
    except Exception:
        # Deliberate best-effort: a failed write is reported as "not saved"
        # rather than raised to the caller.
        return False
    finally:
        conn.close()
def update_feed_item_status(item_id, status, article_id=None):
    """Set the status and linked article of a feed item.

    ``article_id`` is always written, so omitting it resets the link to NULL.

    Raises:
        sqlite3.IntegrityError: when foreign-key enforcement is active and
            ``article_id`` does not reference an existing article. The
            connection is closed in that case as well.
    """
    conn = get_conn()
    try:
        conn.execute(
            "UPDATE feed_items SET status=?, article_id=? WHERE id=?",
            (status, article_id, item_id)
        )
        conn.commit()
    finally:
        conn.close()
def guid_exists(feed_id, guid):
    """Return ``True`` when a feed item with this feed id and guid is stored."""
    query = "SELECT id FROM feed_items WHERE feed_id=? AND guid=?"
    db = get_conn()
    match = db.execute(query, (feed_id, guid)).fetchone()
    db.close()
    if match is None:
        return False
    return True
# ── Prompts ───────────────────────────────────────────────────────────────────
def get_prompts():
    """Return all prompts as dicts, default prompts first, then by name."""
    query = "SELECT * FROM prompts ORDER BY is_default DESC, name ASC"
    db = get_conn()
    records = db.execute(query).fetchall()
    db.close()
    return list(map(dict, records))
def get_default_prompt():
    """Return one prompt flagged ``is_default=1`` as a dict, or ``None``."""
    db = get_conn()
    cursor = db.execute("SELECT * FROM prompts WHERE is_default=1 LIMIT 1")
    record = cursor.fetchone()
    db.close()
    if not record:
        return None
    return dict(record)
def get_prompt_by_id(pid: int):
    """Return the prompt with id ``pid`` as a dict, or ``None`` if absent."""
    db = get_conn()
    cursor = db.execute("SELECT * FROM prompts WHERE id=?", (pid,))
    record = cursor.fetchone()
    db.close()
    if not record:
        return None
    return dict(record)
# ── Settings ──────────────────────────────────────────────────────────────────
def get_setting(key, default=None):
    """Return the stored value for ``key``, or ``default`` when no row exists."""
    db = get_conn()
    cursor = db.execute("SELECT value FROM settings WHERE key=?", (key,))
    record = cursor.fetchone()
    db.close()
    if not record:
        return default
    return record['value']
def set_setting(key, value):
    """Store ``value`` under ``key``, replacing any existing row for that key."""
    upsert = "INSERT OR REPLACE INTO settings (key,value) VALUES (?,?)"
    db = get_conn()
    db.execute(upsert, (key, value))
    db.commit()
    db.close()
# ── Mirror Posts ───────────────────────────────────────────────────────────────
def save_mirror_post(article_id: int, mirror_name: str, mirror_label: str,
                     mirror_wp_id: int = None, mirror_url: str = None,
                     status: str = 'ok', error: str = None):
    """Insert or refresh the mirror record for an article/mirror pair.

    On a conflict for ``(article_id, mirror_name)`` the existing row's
    ``mirror_wp_id``, ``mirror_url``, ``status`` and ``error`` are overwritten
    and ``posted_at`` is reset to the current time; ``mirror_label`` is left
    as stored.

    Raises:
        sqlite3.IntegrityError: when foreign-key enforcement is active and
            ``article_id`` does not reference an existing article. The
            connection is closed in that case as well.
    """
    conn = get_conn()
    try:
        conn.execute(
            "\n"
            "INSERT INTO mirror_posts (article_id, mirror_name, mirror_label, mirror_wp_id, mirror_url, status, error)\n"
            "VALUES (?,?,?,?,?,?,?)\n"
            "ON CONFLICT(article_id, mirror_name) DO UPDATE SET\n"
            "mirror_wp_id=excluded.mirror_wp_id,\n"
            "mirror_url=excluded.mirror_url,\n"
            "status=excluded.status,\n"
            "error=excluded.error,\n"
            "posted_at=datetime('now')\n",
            (article_id, mirror_name, mirror_label, mirror_wp_id, mirror_url, status, error),
        )
        conn.commit()
    finally:
        conn.close()
def get_mirror_posts(article_id: int) -> list:
    """Return all mirror records of an article as dicts, ordered by mirror name."""
    query = "SELECT * FROM mirror_posts WHERE article_id=? ORDER BY mirror_name"
    db = get_conn()
    records = db.execute(query, (article_id,)).fetchall()
    db.close()
    result = []
    for record in records:
        result.append(dict(record))
    return result