homelab-brain/fuenfvoacht/src/app.py

560 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, jsonify, redirect, url_for, Response
from functools import wraps
from datetime import datetime, date, timedelta
import os
import asyncio
import logging
import requests as req_lib
import database as db
import openrouter
import logger as flog
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
TZ_NAME = os.environ.get('TIMEZONE', 'Europe/Berlin')
BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', '')
POST_TIME = os.environ.get('POST_TIME', '19:55')
AUTH_USER = os.environ.get('AUTH_USER', 'Holgerhh')
AUTH_PASS = os.environ.get('AUTH_PASS', 'ddlhh')
BRAND_MARKER = "Pax et Lux Terranaut01 https://t.me/DieneDemLeben"
BRAND_SIGNATURE = (
"Wir schützen die Zukunft unserer Kinder und das Leben❤\n\n"
"Pax et Lux Terranaut01 https://t.me/DieneDemLeben\n\n"
"Unterstützt die Menschen, die für Uns einstehen❗"
)
def check_auth(username, password):
return username == AUTH_USER and password == AUTH_PASS
def authenticate():
return Response(
'Zugang verweigert.', 401,
{'WWW-Authenticate': 'Basic realm="FünfVorAcht"'})
@app.before_request
def before_request_auth():
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
@app.after_request
def add_no_cache(response):
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
return response
def today_str():
import pytz
return datetime.now(pytz.timezone(TZ_NAME)).strftime('%Y-%m-%d')
def today_display():
import pytz
return datetime.now(pytz.timezone(TZ_NAME)).strftime('%d. %B %Y')
def week_range():
today = date.today()
start = today - timedelta(days=today.weekday())
return [(start + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(7)]
def planning_days(count=7):
import pytz
tz = pytz.timezone(TZ_NAME)
t = datetime.now(tz).date()
return [(t + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(count)]
def with_branding(content: str) -> str:
text = (content or "").rstrip()
if BRAND_MARKER in text:
return text
return f"{text}\n\n{BRAND_SIGNATURE}" if text else BRAND_SIGNATURE
def send_telegram_message(chat_id, text, reply_markup=None):
url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
payload = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
if reply_markup:
import json
payload["reply_markup"] = json.dumps(reply_markup)
try:
r = req_lib.post(url, json=payload, timeout=10)
return r.json()
except Exception as e:
logger.error("Telegram send fehlgeschlagen: %s", e)
return None
def notify_all_reviewers(text, reply_markup=None):
results = []
for chat_id in db.get_reviewer_chat_ids():
result = send_telegram_message(chat_id, text, reply_markup)
results.append({'chat_id': chat_id, 'ok': bool(result and result.get('ok'))})
return results
# ── Main Dashboard ────────────────────────────────────────────────────────────
@app.route('/')
def index():
today = today_str()
articles_today = db.get_articles_by_date(today)
article_today = articles_today[0] if articles_today else None
week_days = week_range()
week_articles_raw = db.get_week_articles(week_days[0], week_days[-1])
# Mehrere Artikel pro Tag: dict date → list
week_articles = {}
for a in week_articles_raw:
week_articles.setdefault(a['date'], []).append(a)
recent = db.get_recent_articles(10)
stats = db.get_monthly_stats()
channel = db.get_channel()
prompts = db.get_prompts()
tags = db.get_tags()
favorites = db.get_favorites()
locations = db.get_locations()
current_location = db.get_current_location()
reviewers = db.get_reviewers()
last_posted = db.get_last_posted()
plan_days = planning_days(7)
plan_raw = db.get_week_articles(plan_days[0], plan_days[-1])
plan_articles = {}
for a in plan_raw:
plan_articles.setdefault(a['date'], []).append(a)
month_start = date.today().replace(day=1).strftime('%Y-%m-%d')
month_end = (date.today().replace(day=28) + timedelta(days=4)).replace(day=1) - timedelta(days=1)
month_articles = {}
for a in db.get_week_articles(month_start, month_end.strftime('%Y-%m-%d')):
month_articles.setdefault(a['date'], []).append(a['status'])
return render_template('index.html',
today=today,
article_today=article_today,
articles_today=articles_today,
week_days=week_days,
week_articles=week_articles,
plan_days=plan_days,
plan_articles=plan_articles,
month_articles=month_articles,
recent=recent,
stats=stats,
channel=channel,
post_time=POST_TIME,
prompts=prompts,
tags=tags,
favorites=favorites,
locations=locations,
current_location=current_location,
reviewers=reviewers,
last_posted=last_posted)
# ── History ───────────────────────────────────────────────────────────────────
@app.route('/history')
def history():
articles = db.get_recent_articles(30)
return render_template('history.html', articles=articles)
# ── Prompts ───────────────────────────────────────────────────────────────────
@app.route('/prompts')
def prompts():
all_prompts = db.get_prompts()
return render_template('prompts.html', prompts=all_prompts)
@app.route('/prompts/save', methods=['POST'])
def save_prompt():
pid = request.form.get('id')
name = request.form.get('name', '').strip()
system_prompt = request.form.get('system_prompt', '').strip()
if pid:
db.save_prompt(int(pid), name, system_prompt)
else:
db.create_prompt(name, system_prompt)
return redirect(url_for('prompts'))
@app.route('/prompts/default/<int:pid>', methods=['POST'])
def set_default_prompt(pid):
db.set_default_prompt(pid)
return redirect(url_for('prompts'))
@app.route('/prompts/delete/<int:pid>', methods=['POST'])
def delete_prompt(pid):
db.delete_prompt(pid)
return redirect(url_for('prompts'))
@app.route('/prompts/test', methods=['POST'])
def test_prompt():
data = request.get_json()
system_prompt = data.get('system_prompt', '')
source = data.get('source', 'https://tagesschau.de')
tag = data.get('tag', 'Politik')
prompt_id = data.get('prompt_id')
import pytz
date_display = datetime.now(pytz.timezone(TZ_NAME)).strftime('%d. %B %Y')
try:
result = asyncio.run(openrouter.generate_article(source, system_prompt, date_display, tag))
if prompt_id:
db.save_prompt_test_result(int(prompt_id), result)
return jsonify({'success': True, 'result': result})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
# ── Hilfe ─────────────────────────────────────────────────────────────────────
@app.route('/hilfe')
def hilfe():
return render_template('hilfe.html')
# ── Settings ──────────────────────────────────────────────────────────────────
@app.route('/settings')
def settings():
channel = db.get_channel()
favorites = db.get_favorites()
tags = db.get_tags()
reviewers = db.get_reviewers(active_only=False)
return render_template('settings.html', channel=channel,
favorites=favorites, tags=tags,
reviewers=reviewers)
@app.route('/settings/channel', methods=['POST'])
def save_channel():
telegram_id = request.form.get('telegram_id', '').strip()
post_time = request.form.get('post_time', '19:55').strip()
db.update_channel(telegram_id, post_time)
return redirect(url_for('settings'))
@app.route('/settings/favorite/add', methods=['POST'])
def add_favorite():
label = request.form.get('label', '').strip()
url = request.form.get('url', '').strip()
if label and url:
db.add_favorite(label, url)
return redirect(url_for('settings'))
# ── Reviewer API ──────────────────────────────────────────────────────────────
@app.route('/api/reviewers', methods=['GET'])
def api_reviewers():
return jsonify(db.get_reviewers(active_only=False))
@app.route('/api/reviewers/add', methods=['POST'])
def api_add_reviewer():
data = request.get_json()
chat_id = data.get('chat_id')
name = data.get('name', '').strip()
if not chat_id or not name:
return jsonify({'success': False, 'error': 'chat_id und name erforderlich'})
try:
chat_id = int(chat_id)
except ValueError:
return jsonify({'success': False, 'error': 'Ungültige Chat-ID'})
added = db.add_reviewer(chat_id, name)
if not added:
return jsonify({'success': False, 'error': 'Chat-ID bereits vorhanden'})
# Willkommensnachricht
welcome = (
f"👋 <b>Willkommen bei FünfVorAcht!</b>\n\n"
f"Du wurdest als Redakteur hinzugefügt.\n"
f"Ab jetzt erhältst du Reviews, Reminder und Status-Meldungen.\n\n"
f"/start für eine Übersicht aller Befehle."
)
send_telegram_message(chat_id, welcome)
return jsonify({'success': True})
@app.route('/api/reviewers/remove', methods=['POST'])
def api_remove_reviewer():
data = request.get_json()
chat_id = data.get('chat_id')
if not chat_id:
return jsonify({'success': False, 'error': 'chat_id erforderlich'})
db.remove_reviewer(int(chat_id))
return jsonify({'success': True})
# ── API Endpoints ─────────────────────────────────────────────────────────────
@app.route('/api/generate', methods=['POST'])
def api_generate():
data = request.get_json()
source = data.get('source', '').strip()
tag = data.get('tag', 'allgemein')
prompt_id = data.get('prompt_id')
date_str = data.get('date', today_str())
post_time = data.get('post_time', POST_TIME)
if not source:
return jsonify({'success': False, 'error': 'Keine Quelle angegeben'})
prompt = None
if prompt_id:
all_prompts = db.get_prompts()
prompt = next((p for p in all_prompts if str(p['id']) == str(prompt_id)), None)
if not prompt:
prompt = db.get_default_prompt()
if not prompt:
return jsonify({'success': False, 'error': 'Kein Prompt konfiguriert'})
try:
content = asyncio.run(
openrouter.generate_article(source, prompt['system_prompt'], today_display(), tag)
)
existing = db.get_article_by_date(date_str, post_time)
if existing:
db.update_article_content(date_str, content, new_version=True, post_time=post_time)
conn = db.get_conn()
conn.execute(
"UPDATE articles SET source_input=?, tag=?, status='draft' WHERE date=? AND post_time=?",
(source, tag, date_str, post_time)
)
conn.commit()
conn.close()
else:
db.create_article(date_str, source, content, prompt['id'], tag, post_time)
flog.article_generated(date_str, source, 1, tag)
return jsonify({'success': True, 'content': content})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/article/<date_str>/save', methods=['POST'])
def api_save(date_str):
data = request.get_json()
content = data.get('content', '').strip()
post_time = data.get('post_time', POST_TIME)
if not content:
return jsonify({'success': False, 'error': 'Kein Inhalt'})
existing = db.get_article_by_date(date_str, post_time)
if existing:
db.update_article_content(date_str, content, post_time=post_time)
flog.article_saved(date_str, post_time)
return jsonify({'success': True, 'article_id': existing['id']})
all_today = db.get_articles_by_date(date_str)
draft = next((a for a in all_today if a['status'] == 'draft'), None)
candidate = draft or next((a for a in all_today if a['status'] in ('pending_review', 'scheduled', 'sent_to_bot', 'approved')), None)
if candidate:
old_time = candidate['post_time']
conn = db.get_conn()
conn.execute("UPDATE articles SET post_time=? WHERE date=? AND post_time=?", (post_time, date_str, old_time))
conn.commit()
conn.close()
db.update_article_content(date_str, content, post_time=post_time)
flog.article_saved(date_str, post_time)
return jsonify({'success': True, 'article_id': candidate['id']})
# Kein Artikel vorhanden → neu erstellen (z.B. manueller Eintrag für zukünftigen Tag)
db.create_article(date_str, '', content, None, 'allgemein', post_time)
new_article = db.get_article_by_date(date_str, post_time)
flog.article_saved(date_str, post_time)
return jsonify({'success': True, 'article_id': new_article['id'] if new_article else None})
@app.route('/api/article/<date_str>/schedule', methods=['POST'])
def api_schedule(date_str):
"""Artikel einplanen: post_time + notify_at setzen."""
data = request.get_json()
post_time = data.get('post_time', POST_TIME)
notify_mode = data.get('notify_mode', 'auto') # sofort | auto | custom
notify_at_custom = data.get('notify_at')
existing = db.get_article_by_date(date_str, post_time)
if not existing:
all_today = db.get_articles_by_date(date_str)
candidate = next((a for a in all_today if a['status'] in ('draft', 'scheduled', 'sent_to_bot', 'pending_review', 'approved')), None)
if not candidate:
return jsonify({'success': False, 'error': 'Kein Artikel für diesen Tag gefunden'})
old_time = candidate['post_time']
conn = db.get_conn()
conn.execute(
"UPDATE articles SET post_time=? WHERE date=? AND post_time=?",
(post_time, date_str, old_time)
)
conn.commit()
conn.close()
conn = db.get_conn()
ts = datetime.utcnow().isoformat()
conn.execute(
"UPDATE articles SET status='approved', scheduled_at=?, notify_at=? WHERE date=? AND post_time=?",
(ts, ts, date_str, post_time)
)
conn.commit()
conn.close()
notify_all_reviewers(
f"📅 <b>Artikel eingeplant</b>\n\n"
f"📆 {date_str} um <b>{post_time} Uhr</b>\n"
f"Wird automatisch gepostet."
)
flog.article_scheduled(date_str, post_time, ts)
return jsonify({'success': True})
@app.route('/api/article/<int:article_id>/reschedule', methods=['POST'])
def api_reschedule(article_id):
data = request.get_json()
new_date = data.get('date')
new_time = data.get('post_time')
if not new_date or not new_time:
return jsonify({'success': False, 'error': 'date und post_time erforderlich'})
# Slot-Konflikt prüfen
if db.slot_is_taken(new_date, new_time, exclude_id=article_id):
taken = db.get_taken_slots(new_date)
flog.slot_conflict(new_date, new_time)
return jsonify({
'success': False,
'error': f'Slot {new_date} {new_time} ist bereits belegt.',
'taken_slots': taken
})
ok = db.reschedule_article(article_id, new_date, new_time)
if ok:
return jsonify({'success': True})
return jsonify({'success': False, 'error': 'Fehler beim Umplanen'})
@app.route('/api/article/<int:article_id>/delete', methods=['POST'])
def api_delete(article_id):
db.delete_article(article_id)
return jsonify({'success': True})
@app.route('/api/slots/<date_str>')
def api_slots(date_str):
"""Gibt belegte Slots für einen Tag zurück."""
taken = db.get_taken_slots(date_str)
return jsonify({'date': date_str, 'taken': taken})
@app.route('/api/article/<date_str>/send-to-bot', methods=['POST'])
def api_send_to_bot(date_str):
data = request.get_json() or {}
post_time = data.get('post_time', POST_TIME)
article = db.get_article_by_date(date_str, post_time)
if not article or not article.get('content_final'):
return jsonify({'success': False, 'error': 'Kein Artikel vorhanden'})
channel = db.get_channel()
pt = channel.get('post_time', post_time)
branded = with_branding(article['content_final'])
text = (
f"📋 <b>Review: {date_str} · {post_time} Uhr</b>\n"
f"Version {article['version']}\n"
f"──────────────────────\n\n"
f"{branded}\n\n"
f"──────────────────────\n"
f"Freigeben oder bearbeiten?"
)
keyboard = {
"inline_keyboard": [[
{"text": "✅ Freigeben", "callback_data": f"approve:{date_str}:{post_time}"},
{"text": "✏️ Bearbeiten", "callback_data": f"edit:{date_str}:{post_time}"}
]]
}
results = notify_all_reviewers(text, keyboard)
ok_count = sum(1 for r in results if r['ok'])
if ok_count > 0:
db.update_article_status(date_str, 'sent_to_bot', post_time=post_time)
flog.article_sent_to_bot(date_str, post_time, [r['chat_id'] for r in results if r['ok']])
return jsonify({'success': True})
return jsonify({'success': False, 'error': 'Kein Reviewer erreichbar'})
@app.route('/api/settings/post-time', methods=['POST'])
def api_save_post_time():
data = request.get_json()
post_time = data.get('post_time', '19:55')
channel = db.get_channel()
db.update_channel(channel.get('telegram_id', ''), post_time)
return jsonify({'success': True})
@app.route('/api/settings/location', methods=['POST'])
def api_set_location():
data = request.get_json()
location_id = data.get('location_id')
if not location_id:
return jsonify({'success': False, 'error': 'Keine Location-ID'})
db.set_location(location_id)
loc = db.get_current_location()
morning, afternoon = db.get_reminder_times_in_berlin(loc)
return jsonify({
'success': True,
'location': loc,
'reminders_berlin': {
'morning': f"{morning[0]:02d}:{morning[1]:02d}",
'afternoon': f"{afternoon[0]:02d}:{afternoon[1]:02d}"
}
})
@app.route('/api/balance')
def api_balance():
try:
balance = asyncio.run(openrouter.get_balance())
return jsonify(balance)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/article/<date_str>')
def api_article(date_str):
post_time = request.args.get('post_time')
article = db.get_article_by_date(date_str, post_time)
if not article:
return jsonify({'error': 'Nicht gefunden'}), 404
return jsonify(article)
@app.route('/api/articles/<date_str>')
def api_articles_for_date(date_str):
"""Alle Artikel eines Tages (alle Slots)."""
articles = db.get_articles_by_date(date_str)
return jsonify(articles)
@app.route('/api/article/<date_str>/approve', methods=['POST'])
def api_approve(date_str):
data = request.get_json() or {}
post_time = data.get('post_time', POST_TIME)
db.update_article_status(date_str, 'approved', post_time=post_time)
flog.article_approved(date_str, post_time, 0)
return jsonify({'success': True})
@app.route('/api/article/<date_str>/skip', methods=['POST'])
def api_skip(date_str):
data = request.get_json() or {}
post_time = data.get('post_time', POST_TIME)
db.update_article_status(date_str, 'skipped', post_time=post_time)
flog.article_skipped(date_str, post_time)
return jsonify({'success': True})
if __name__ == '__main__':
db.init_db()
app.run(host='0.0.0.0', port=8080, debug=False)