homelab-brain/redax-wp/src/rss_fetcher.py
root 064ae085b5 redax-wp: Sprint 1+2: complete stack
Infrastructure:
- CT 113 created on pve-hetzner (Docker, Tailscale)
- Forgejo repo redax-wp created

Code (Sprint 2):
- docker-compose.yml: wordpress + db + redax-web
- .env.example with all variables
- database.py: articles, feeds, feed_items, prompts, settings
- wordpress.py: WP REST API client (create/update post, media upload, Yoast SEO); a sketch of the underlying call follows below
- rss_fetcher.py: feed import, blacklist, teaser mode, AI rewrite
- app.py: Flask dashboard, scheduler (publish/rss/briefing), all API routes
- templates: base, login, index (two-column editor), feeds, history, prompts, settings, hilfe
- README.md + .gitignore

Made-with: Cursor
2026-02-27 07:52:31 +07:00
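
wordpress.py itself is not shown on this page. For context, a minimal sketch of the
kind of call such a WP REST API client typically wraps, using the core
/wp-json/wp/v2/posts endpoint with an application password; the environment
variable names here are illustrative, not confirmed keys from .env.example:

import os
import requests

def create_post(title: str, content: str, status: str = 'draft') -> int:
    """Create a WordPress post via the core REST API; returns the new post ID."""
    base = os.environ['WP_URL']  # e.g. https://example.org (illustrative name)
    auth = (os.environ['WP_USER'], os.environ['WP_APP_PASSWORD'])
    r = requests.post(
        f"{base}/wp-json/wp/v2/posts",
        auth=auth,  # Basic auth with a WP application password (WP 5.6+)
        json={'title': title, 'content': content, 'status': status},
        timeout=30,
    )
    r.raise_for_status()
    return r.json()['id']  # WordPress returns the created post object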

157 lines
5.7 KiB
Python

import os
import feedparser
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import database as db
import logger as flog
import openrouter
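# database, logger, and openrouter are sibling modules in this repo; feedparser,
# requests, bs4, and lxml (the BeautifulSoup parser used below) come from PyPI.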

BLACKLIST_DEFAULT = ['Anzeige:', 'Sponsored', 'Werbung', 'Advertisement', '[Anzeige]']


def _is_blacklisted(title: str, blacklist_str: str) -> bool:
    terms = [t.strip() for t in blacklist_str.split(',') if t.strip()] + BLACKLIST_DEFAULT
    return any(term.lower() in title.lower() for term in terms)
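
# Matching is a case-insensitive substring test against both the feed's own
# comma-separated blacklist and BLACKLIST_DEFAULT, e.g.:
#   _is_blacklisted('Anzeige: Neues Produkt', '')             -> True  (default term)
#   _is_blacklisted('Quartalszahlen', 'Podcast, Gewinnspiel') -> False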

def _extract_og_image(url: str) -> str | None:
    try:
        r = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
        soup = BeautifulSoup(r.text, 'lxml')
        tag = soup.find('meta', property='og:image')
        return tag['content'] if tag and tag.get('content') else None
    except Exception:
        return None
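
# Looks for Open Graph markup of the form
#   <meta property="og:image" content="https://example.org/bild.jpg">
# and returns the content URL, or None if the page has no usable tag.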

def fetch_feed(feed: dict) -> int:
    """Fetch a single feed, save new items. Returns count of new items."""
    new_count = 0
    try:
        parsed = feedparser.parse(feed['url'])
        for entry in parsed.entries:
            guid = getattr(entry, 'id', entry.get('link', ''))
            title = entry.get('title', '').strip()
            url = entry.get('link', '')
            summary = entry.get('summary', '')
            published = entry.get('published', datetime.utcnow().isoformat())
            if not guid or not title or not url:
                continue
            if _is_blacklisted(title, feed.get('blacklist', '')):
                flog.info('rss_blacklisted', feed=feed['name'], title=title)
                continue
            if db.guid_exists(feed['id'], guid):
                continue
            is_new = db.save_feed_item(feed['id'], guid, title, url, summary, published)
            if is_new:
                new_count += 1
        db.update_feed(feed['id'], {
            'last_fetched_at': datetime.utcnow().isoformat(),
            'last_error': ''
        })
        flog.info('rss_fetched', feed=feed['name'], new_items=new_count)
    except Exception as e:
        db.update_feed(feed['id'], {'last_error': str(e)})
        flog.error('rss_fetch_failed', feed=feed['name'], error=str(e))
    return new_count
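
# Example call (hypothetical feed row, shaped like what db.get_feeds() returns):
#   fetch_feed({'id': 1, 'name': 'Beispiel', 'url': 'https://example.org/feed.xml',
#               'blacklist': 'Gewinnspiel, Podcast'})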

def process_auto_publish(feed: dict, item: dict):
    """Process a feed item for auto-publish (teaser or AI rewrite)."""
    try:
        title = item['title']
        source_url = item['url']
        summary = item.get('summary', '')
        og_image = _extract_og_image(source_url)
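
        # Mode precedence: AI rewrite wins if enabled; otherwise teaser mode
        # (the default, via the fallback value 1); the raw summary is used
        # only if both flags are off.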
        if feed.get('ki_rewrite'):
            content, seo_title, seo_desc, keyword = _ki_rewrite(title, source_url, summary)
        elif feed.get('teaser_only', 1):
            content = _build_teaser(title, summary, source_url)
            seo_title = title[:60]
            seo_desc = summary[:155] if summary else ''
            keyword = ''
        else:
            content = summary
            seo_title = title[:60]
            seo_desc = ''
            keyword = ''
        article_id = db.create_article({
            'title': title,
            'content': content,
            'source_url': source_url,
            'article_type': 'rss',
            'source_feed_id': feed['id'],
            'status': 'scheduled',
            'tone': 'informativ',
            'category_id': feed.get('category_id'),
            'featured_image_url': og_image,
            'seo_title': seo_title,
            'seo_description': seo_desc,
            'focus_keyword': keyword,
            'send_to_telegram': 0,  # RSS articles are never sent to Telegram
        })
        db.update_feed_item_status(item['id'], 'queued', article_id)
        flog.info('rss_article_queued', feed=feed['name'], title=title, article_id=article_id)
        return article_id
    except Exception as e:
        flog.error('rss_process_failed', feed=feed['name'], error=str(e))
        return None

def _ki_rewrite(title: str, url: str, summary: str) -> tuple:
    """Have the AI rewrite an RSS article. Returns (content, seo_title, seo_desc, keyword)."""
    prompt = db.get_default_prompt()
    system = prompt['system_prompt'] if prompt else 'Schreibe einen Artikel.'
    source_text = f"Titel: {title}\nURL: {url}\nZusammenfassung: {summary}"
    system = system.replace('{tone}', 'informativ').replace('{date}', datetime.now().strftime('%d.%m.%Y'))
    raw = openrouter.generate(system, source_text)
    return _parse_ki_output(raw)

def _parse_ki_output(raw: str) -> tuple:
    """Parse AI output into (content, seo_title, seo_desc, keyword)."""
    lines = raw.strip().split('\n')
    seo_title, seo_desc, keyword = '', '', ''
    content_lines = []
    for line in lines:
        if line.startswith('SEO_TITLE:'):
            seo_title = line.replace('SEO_TITLE:', '').strip()
        elif line.startswith('SEO_DESC:'):
            seo_desc = line.replace('SEO_DESC:', '').strip()
        elif line.startswith('KEYWORD:'):
            keyword = line.replace('KEYWORD:', '').strip()
        else:
            content_lines.append(line)
    content = '\n'.join(content_lines).strip()
    return content, seo_title, seo_desc, keyword
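
# The model is expected to emit marker lines anywhere in its output, e.g.:
#   SEO_TITLE: Kurzer SEO-Titel
#   SEO_DESC: Eine Meta-Description unter 155 Zeichen.
#   KEYWORD: fokus-keyword
# All remaining lines become the article body.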

def _build_teaser(title: str, summary: str, url: str) -> str:
    """Build a teaser post that links back to the original source."""
    clean_summary = BeautifulSoup(summary, 'lxml').get_text()[:400] if summary else ''
    return f"""<p>{clean_summary}</p>
<p><a href="{url}" target="_blank" rel="noopener">➜ Weiterlesen beim Original</a></p>"""
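
# Example output for url='https://example.org/a' and a short summary:
#   <p>Gekürzte Zusammenfassung ...</p>
#   <p><a href="https://example.org/a" target="_blank" rel="noopener">➜ Weiterlesen beim Original</a></p>
# Note that title is currently unused here; the article title is set by the caller.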

def run_all_feeds():
    """Fetch all active feeds and process auto-publish items."""
    feeds = db.get_feeds(active_only=True)
    for feed in feeds:
        new_items = fetch_feed(feed)
        if feed.get('auto_publish') and new_items > 0:
            items = db.get_feed_queue(status='new')
            feed_items = [i for i in items if i['feed_id'] == feed['id']]
            for item in feed_items:
                process_auto_publish(feed, item)
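

# Minimal manual entry point for testing. Assumption: in production the
# scheduler in app.py calls run_all_feeds() on its rss interval instead.
if __name__ == '__main__':
    run_all_feeds()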