423 lines
14 KiB
Python
423 lines
14 KiB
Python
"""IMAP Mail Client — Liest E-Mails vom All-Inkl Spiegel-Postfach (Read-Only).
|
||
|
||
Stufe 1: Keyword-Filter (IMPORTANT_SENDERS)
|
||
Stufe 2: LLM-Klassifizierung (classify_mails) — trennt Spam/Newsletter von Wichtigem.
|
||
"""
|
||
|
||
import imaplib
|
||
import email
|
||
import json
|
||
import requests as _req
|
||
from email.header import decode_header
|
||
from email.utils import parsedate_to_datetime
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Optional
|
||
|
||
# IMAP connection settings. These are placeholders; init() overwrites them
# with the values found in the application config.
IMAP_SERVER: str = ""
IMAP_PORT: int = 993
MAIL_USER: str = ""
MAIL_PASS: str = ""
|
||
|
||
# Lower-case substrings; a mail counts as "important" in get_important_mails()
# when its lower-cased From header contains at least one of them.
IMPORTANT_SENDERS = [
    "paypal",
    "bank",
    "sparkasse",
    "postbank",
    "dkb",
    "ing-diba",
    "comdirect",
    "hetzner",
    "all-inkl",
    "kasserver",
    "cloudflare",
    "proxmox",
    "synology",
    "tailscale",
    "finanzamt",
    "elster",
    "bundesnetzagentur",
    "polizei",
    "gericht",
    "enbw",
    "vattenfall",
    "enso",
    "stadtwerke",
    "sat-reisen",
    "lufthansa",
    "vietnam airlines",
    "booking.com",
    "versicherung",
    "allianz",
    "huk",
    "debeka",
]
|
||
|
||
# Lower-case substrings; _is_spam_sender() reports a sender as spam when its
# lower-cased From header contains at least one of them.
SPAM_SENDERS = [
    "lebensfreunde",
    "amazon.de/promotion",
    "promotion@amazon",
    "alibaba",
    "aliexpress",
    "temu",
    "save.tv",
    "newsletter@bit",
    "magix",
    "video deluxe",
    "platforms.ae",
]
|
||
|
||
|
||
def init(cfg):
    """Load the IMAP connection settings from the application config.

    Reads ``MAIL_IMAP_SERVER``, ``MAIL_IMAP_PORT``, ``MAIL_USER`` and
    ``MAIL_PASS`` from ``cfg.raw`` (any mapping with a dict-style ``get``) and
    stores them in the module-level globals used by ``_connect()``.

    A missing, ``None`` or blank port falls back to 993.

    Raises:
        ValueError: if the configured port is present but not an integer.
    """
    global IMAP_SERVER, IMAP_PORT, MAIL_USER, MAIL_PASS
    settings = cfg.raw
    IMAP_SERVER = settings.get("MAIL_IMAP_SERVER", "")

    # A key that exists with an empty value would make a plain int() call
    # raise, so treat "no usable value" the same as "key missing".
    raw_port = settings.get("MAIL_IMAP_PORT", "993")
    port_text = "" if raw_port is None else str(raw_port).strip()
    IMAP_PORT = int(port_text) if port_text else 993

    MAIL_USER = settings.get("MAIL_USER", "")
    MAIL_PASS = settings.get("MAIL_PASS", "")
|
||
|
||
|
||
def _connect() -> Optional[imaplib.IMAP4_SSL]:
    """Open an authenticated IMAP-over-SSL connection.

    Uses the module-level IMAP_SERVER, IMAP_PORT, MAIL_USER and MAIL_PASS.

    Returns:
        The logged-in connection, or ``None`` when the settings are
        incomplete or connecting / logging in fails. The caller is
        responsible for calling ``logout()`` on a returned connection.
    """
    if not (IMAP_SERVER and MAIL_USER and MAIL_PASS):
        return None

    try:
        conn = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
    except Exception:
        return None

    try:
        conn.login(MAIL_USER, MAIL_PASS)
    except Exception:
        # The TLS socket is already open at this point; close it so a
        # rejected login does not leak the connection.
        try:
            conn.shutdown()
        except Exception:
            pass  # nothing more can be done with a half-dead socket
        return None
    return conn
|
||
|
||
|
||
def _decode_header_value(raw: str) -> str:
    """Decode an RFC 2047 encoded mail header into plain text.

    Args:
        raw: Header value as returned by ``Message.get`` (may be empty).

    Returns:
        The decoded, stripped text; ``""`` for an empty header. Bytes that
        cannot be decoded are replaced rather than raising. If the header
        cannot be parsed at all, the raw text is returned unchanged apart
        from stripping.
    """
    if not raw:
        return ""

    from email.errors import HeaderParseError

    try:
        parts = decode_header(raw)
    except HeaderParseError:
        # Malformed encoded word: show the raw text instead of losing the
        # whole mail.
        return str(raw).strip()

    chunks = []
    for part, charset in parts:
        if not isinstance(part, bytes):
            chunks.append(part)
            continue
        try:
            chunks.append(part.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # Charset label Python does not know (e.g. "unknown-8bit").
            chunks.append(part.decode("utf-8", errors="replace"))
    return "".join(chunks).strip()
|
||
|
||
|
||
def _parse_mail(msg_data) -> Optional[dict]:
    """Turn the result of an IMAP header FETCH into a small summary dict.

    Args:
        msg_data: Data part of ``IMAP4.fetch``; the raw header bytes are
            read from ``msg_data[0][1]``.

    Returns:
        A dict with ``subject`` (max. 120 chars), ``from`` (max. 80 chars),
        ``date`` (parsed datetime or ``None``) and ``date_str`` (formatted
        date, or the first 20 chars of the raw Date header when it cannot be
        parsed). ``None`` if anything goes wrong while parsing.
    """
    try:
        message = email.message_from_bytes(msg_data[0][1])
        subject = _decode_header_value(message.get("Subject", ""))
        sender = _decode_header_value(message.get("From", ""))

        raw_date = message.get("Date", "")
        try:
            sent_at = parsedate_to_datetime(raw_date)
        except Exception:
            sent_at = None

        if sent_at:
            shown_date = sent_at.strftime("%d.%m.%Y %H:%M")
        else:
            shown_date = raw_date[:20]

        summary = {}
        summary["subject"] = subject[:120]
        summary["from"] = sender[:80]
        summary["date"] = sent_at
        summary["date_str"] = shown_date
        return summary
    except Exception:
        return None
|
||
|
||
|
||
def get_mail_count() -> dict:
    """Return the number of mails and unread mails in the INBOX.

    The mailbox is selected read-only.

    Returns:
        ``{"total": int, "unread": int, "account": MAIL_USER}`` on success,
        ``{"error": message}`` when connecting or querying fails.
    """
    conn = _connect()
    if not conn:
        return {"error": "IMAP-Verbindung fehlgeschlagen"}
    try:
        conn.select("INBOX", readonly=True)
        _, all_data = conn.search(None, "ALL")
        total = len(all_data[0].split()) if all_data[0] else 0
        _, unseen_data = conn.search(None, "UNSEEN")
        unread = len(unseen_data[0].split()) if unseen_data[0] else 0
        return {"total": total, "unread": unread, "account": MAIL_USER}
    except Exception as exc:
        return {"error": str(exc)}
    finally:
        # logout() raises on a dead connection (Python 3.8+); an exception
        # here would replace the dict being returned above.
        try:
            conn.logout()
        except Exception:
            pass
|
||
|
||
|
||
def get_recent_mails(count: int = 10) -> list[dict]:
    """Return the last *count* mails of the INBOX, newest first.

    Only headers are fetched (``BODY.PEEK``) and the mailbox is selected
    read-only.

    Args:
        count: Maximum number of mails to return. Zero or a negative value
            yields an empty list.

    Returns:
        A list of dicts as produced by ``_parse_mail``; mails that cannot be
        parsed are skipped. ``[{"error": message}]`` when connecting or
        querying fails.
    """
    # ids[-0:] would be the whole mailbox and a negative count would slice
    # from the wrong end, so reject both before touching the server.
    if count <= 0:
        return []

    conn = _connect()
    if not conn:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        conn.select("INBOX", readonly=True)
        _, data = conn.search(None, "ALL")
        ids = data[0].split() if data[0] else []

        results = []
        for mid in reversed(ids[-count:]):
            _, msg_data = conn.fetch(mid, "(BODY.PEEK[HEADER])")
            parsed = _parse_mail(msg_data)
            if parsed:
                results.append(parsed)
        return results
    except Exception as exc:
        return [{"error": str(exc)}]
    finally:
        # logout() raises on a dead connection (Python 3.8+); an exception
        # here would replace the list being returned above.
        try:
            conn.logout()
        except Exception:
            pass
|
||
|
||
|
||
def get_todays_mails() -> list[dict]:
    """Return all INBOX mails matching an IMAP ``SINCE <today>`` search.

    "Today" is taken from the local clock. Only headers are fetched
    (``BODY.PEEK``) and the mailbox is selected read-only.

    Returns:
        A list of dicts as produced by ``_parse_mail``, in reverse search
        order (last hit first); mails that cannot be parsed are skipped.
        ``[{"error": message}]`` when connecting or querying fails.
    """
    conn = _connect()
    if not conn:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        conn.select("INBOX", readonly=True)
        today = datetime.now().strftime("%d-%b-%Y")
        _, data = conn.search(None, f'(SINCE "{today}")')
        ids = data[0].split() if data[0] else []

        results = []
        for mid in ids:
            _, msg_data = conn.fetch(mid, "(BODY.PEEK[HEADER])")
            parsed = _parse_mail(msg_data)
            if parsed:
                results.append(parsed)
        results.reverse()
        return results
    except Exception as exc:
        return [{"error": str(exc)}]
    finally:
        # logout() raises on a dead connection (Python 3.8+); an exception
        # here would replace the list being returned above.
        try:
            conn.logout()
        except Exception:
            pass
|
||
|
||
|
||
def search_mail(query: str, days: int = 30, limit: int = 15) -> list[dict]:
    """Search the INBOX for mails whose sender or subject contains *query*.

    Runs one IMAP ``FROM`` and one ``SUBJECT`` search, both restricted with
    ``SINCE`` to the last *days* days, merges the hits and fetches each
    matching message's headers once (``BODY.PEEK``, read-only mailbox).

    Args:
        query: Text to look for. Quotes and backslashes are escaped and line
            breaks replaced, so the text cannot alter the IMAP command.
        days: How many days back to search.
        limit: Maximum number of results.

    Returns:
        Up to *limit* dicts as produced by ``_parse_mail``, newest first by
        Date header (mails without a parsable date last).
        ``[{"error": message}]`` when connecting fails or when both searches
        fail.
    """
    conn = _connect()
    if not conn:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        conn.select("INBOX", readonly=True)
        since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y")

        # The query ends up inside an IMAP quoted string: line breaks would
        # end the command, '"' and '\' must be backslash-escaped.
        needle = query.replace("\r", " ").replace("\n", " ")
        needle = needle.replace("\\", "\\\\").replace('"', '\\"')

        fields = ("FROM", "SUBJECT")
        hit_ids = {}  # dict used as an ordered set of message ids
        failures = []
        for field in fields:
            try:
                typ, data = conn.search(None, f'(SINCE "{since}" {field} "{needle}")')
            except Exception as exc:
                failures.append(str(exc))
                continue
            if typ != "OK":
                # On "NO" the data holds the server's error text, not ids.
                failures.append(f"{field}: {typ}")
                continue
            for mid in (data[0].split() if data and data[0] else []):
                hit_ids.setdefault(mid, None)

        # Report a search that could not run at all (for example a query
        # imaplib cannot encode) instead of pretending nothing matched.
        if len(failures) == len(fields):
            return [{"error": failures[0]}]

        found = []
        for mid in hit_ids:
            try:
                _, msg_data = conn.fetch(mid, "(BODY.PEEK[HEADER])")
            except Exception:
                break  # best effort: keep what was fetched so far
            parsed = _parse_mail(msg_data)
            if parsed:
                found.append(parsed)

        # Collapse mails that look identical (same date and subject start).
        seen = set()
        unique = []
        for mail in found:
            key = f"{mail['date_str']}|{mail['subject'][:40]}"
            if key not in seen:
                seen.add(key)
                unique.append(mail)

        def _sort_key(mail):
            """Return an always-aware datetime so mixed values stay comparable."""
            when = mail.get("date")
            if when is None:
                return datetime.min.replace(tzinfo=timezone.utc)
            if when.tzinfo is None:
                # parsedate_to_datetime yields naive values for "-0000".
                return when.replace(tzinfo=timezone.utc)
            return when

        unique.sort(key=_sort_key, reverse=True)
        return unique[:limit]
    except Exception as exc:
        return [{"error": str(exc)}]
    finally:
        # logout() raises on a dead connection (Python 3.8+); an exception
        # here would replace the list being returned above.
        try:
            conn.logout()
        except Exception:
            pass
|
||
|
||
|
||
def _is_spam_sender(frm_lower: str) -> bool:
    """Tell whether the sender contains any entry of SPAM_SENDERS.

    The comparison is a plain substring test, so the caller has to pass the
    sender already lower-cased.
    """
    for marker in SPAM_SENDERS:
        if marker in frm_lower:
            return True
    return False
|
||
|
||
|
||
def get_important_mails(hours: int = 24) -> list[dict]:
    """Return recent mails from important senders, with spam senders removed.

    A mail is kept when its lower-cased From header matches no entry of
    SPAM_SENDERS and at least one entry of IMPORTANT_SENDERS. Only headers
    are fetched (``BODY.PEEK``) and the mailbox is selected read-only.

    Args:
        hours: Look-back window. Only the calendar date of "now minus
            *hours*" is sent as the IMAP ``SINCE`` criterion, so the window
            actually searched may be longer than *hours*.

    Returns:
        A list of dicts as produced by ``_parse_mail``, in reverse search
        order (last hit first). ``[{"error": message}]`` when connecting or
        querying fails.
    """
    conn = _connect()
    if not conn:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        conn.select("INBOX", readonly=True)
        since = (datetime.now() - timedelta(hours=hours)).strftime("%d-%b-%Y")
        _, data = conn.search(None, f'(SINCE "{since}")')
        ids = data[0].split() if data[0] else []

        results = []
        for mid in ids:
            _, msg_data = conn.fetch(mid, "(BODY.PEEK[HEADER])")
            parsed = _parse_mail(msg_data)
            if not parsed:
                continue
            frm_lower = parsed["from"].lower()
            if _is_spam_sender(frm_lower):
                continue
            if any(s in frm_lower for s in IMPORTANT_SENDERS):
                results.append(parsed)
        results.reverse()
        return results
    except Exception as exc:
        return [{"error": str(exc)}]
    finally:
        # logout() raises on a dead connection (Python 3.8+); an exception
        # here would replace the list being returned above.
        try:
            conn.logout()
        except Exception:
            pass
|
||
|
||
|
||
# System prompt sent to the LLM by classify_mails(). The text is runtime
# data: it names the five categories and asks for a JSON array of
# {"idx", "cat"} objects.
CLASSIFY_PROMPT: str = """Du bekommst eine Liste von E-Mails (Absender + Betreff).
Klassifiziere JEDE Mail in genau eine Kategorie. Sei STRENG — im Zweifel ist es Newsletter oder Spam.

Kategorien:
- "wichtig": NUR echte persoenliche Nachrichten, Rechnungen, Sicherheitswarnungen (Synology, Server), Bank-Transaktionen, Behoerden, Flugaenderungen, Vertragsrelevantes
- "aktion": Erfordert DRINGENDE Handlung (Passwort aendern, Zahlung faellig, Sicherheitsluecke, Termin bestaetigen)
- "info": Nuetzliche Info OHNE Handlungsbedarf (Versandbestaetigung, Status-Update, Ladebeleg)
- "newsletter": Regelmaessige Newsletter, Markt-Updates, Nachrichten-Digest, Fachzeitschriften, Angebote
- "spam": Werbung, Marketing, Dating-Benachrichtigungen, Phishing, unbekannte Absender mit verdaechtigem Betreff, "Du hast neue Besucher", Rabattaktionen

WICHTIG:
- "Neue Profilbesucher", "Du hast neue Nachrichten" von Dating/Social = SPAM
- Absender mit .ae/.ru/.cn Domain ohne Bezug = SPAM
- "Sale", "Angebot", "nur noch heute", Emojis im Betreff = NEWSLETTER oder SPAM
- Taegliche Markt-/Aktien-Updates = NEWSLETTER
- Synology Sicherheitswarnungen = AKTION
- Rechnungen (EnBW, Telekom etc.) = WICHTIG

Antworte NUR mit einem JSON-Array. Pro Mail ein Objekt mit "idx" (0-basiert) und "cat".
Beispiel: [{"idx":0,"cat":"spam"},{"idx":1,"cat":"wichtig"}]"""
|
||
|
||
|
||
# Categories the classification prompt defines; any other answer from the
# model is treated as "unknown".
_KNOWN_CATEGORIES = ("wichtig", "aktion", "info", "newsletter", "spam")


def _parse_classification(reply: str, count: int) -> dict:
    """Extract ``{mail index: category}`` from the model's reply text.

    Tolerates code fences and prose around the JSON array, indices given as
    strings and categories in any letter case. Entries with an index outside
    ``range(count)`` or a category not in ``_KNOWN_CATEGORIES`` are dropped.

    Raises:
        ValueError: if the reply contains no parsable JSON array.
    """
    start = reply.find("[")
    if start == -1:
        raise ValueError("no JSON array in model reply")
    # raw_decode stops at the end of the array and ignores what follows
    # (closing fence, trailing remarks).
    entries, _ = json.JSONDecoder().raw_decode(reply[start:])

    categories = {}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        try:
            idx = int(entry.get("idx"))
        except (TypeError, ValueError):
            continue
        cat = str(entry.get("cat", "")).strip().lower()
        if 0 <= idx < count and cat in _KNOWN_CATEGORIES:
            categories[idx] = cat
    return categories


def classify_mails(
    mails: list[dict],
    api_key: str,
    *,
    url: str = "http://100.84.255.83:11434/v1/chat/completions",
    model: str = "qwen2.5:14b",
    timeout: float = 120,
    http=None,
) -> list[dict]:
    """Classify mails by importance with an LLM (chat-completions endpoint).

    Each mail dict gets a ``"category"`` key set in place: one of
    ``wichtig``, ``aktion``, ``info``, ``newsletter``, ``spam``, or
    ``"unknown"`` when the model gave no usable answer for that mail or the
    request failed.

    Args:
        mails: Dicts with at least ``"from"`` and ``"subject"``.
        api_key: Only acts as an on/off switch; when empty the mails are
            returned untouched. It is not sent to the endpoint.
        url: Chat-completions endpoint to call.
        model: Model name sent in the request.
        timeout: Request timeout in seconds.
        http: Object with a requests-compatible ``post`` method (for example
            a ``requests.Session``); defaults to the ``requests`` module.

    Returns:
        The same list object that was passed in.
    """
    if not mails or not api_key:
        return mails

    listing = "\n".join(
        f"{i}. Von: {mail['from'][:50]} | Betreff: {mail['subject'][:80]}"
        for i, mail in enumerate(mails)
    )

    try:
        client = _req if http is None else http
        response = client.post(
            url,
            json={
                "model": model,
                "messages": [
                    {"role": "system", "content": CLASSIFY_PROMPT},
                    {"role": "user", "content": listing},
                ],
                "max_tokens": 2000,
                "temperature": 0,
            },
            timeout=timeout,
        )
        response.raise_for_status()
        reply = response.json()["choices"][0]["message"]["content"]
        categories = _parse_classification(reply, len(mails))
    except Exception:
        # Classification is best effort: any failure (network, HTTP status,
        # malformed reply) leaves every mail as "unknown".
        categories = {}

    for i, mail in enumerate(mails):
        mail["category"] = categories.get(i, "unknown")
    return mails
|
||
|
||
|
||
# Number of mails get_smart_digest() hands to classify_mails() per call.
BATCH_SIZE: int = 40
|
||
|
||
|
||
def get_smart_digest(hours: int = 24, api_key: str = "") -> dict:
    """Build a categorised mail digest: fetch, classify in batches, group.

    Args:
        hours: Look-back window. Only the calendar date of "now minus
            *hours*" is sent as the IMAP ``SINCE`` criterion, so the window
            actually searched may be longer than *hours*.
        api_key: Passed to ``classify_mails``; when empty no classification
            takes place and every mail is grouped under ``"unknown"``.

    Returns:
        ``{"total": int, "mails": [...], "summary": {category: [...]}}`` with
        the mails in reverse search order (last hit first), or
        ``{"error": message}`` when connecting or querying fails.
    """
    conn = _connect()
    if not conn:
        return {"error": "IMAP-Verbindung fehlgeschlagen"}
    try:
        conn.select("INBOX", readonly=True)
        since = (datetime.now() - timedelta(hours=hours)).strftime("%d-%b-%Y")
        _, data = conn.search(None, f'(SINCE "{since}")')
        ids = data[0].split() if data[0] else []

        mails = []
        for mid in ids:
            _, msg_data = conn.fetch(mid, "(BODY.PEEK[HEADER])")
            parsed = _parse_mail(msg_data)
            if parsed:
                mails.append(parsed)
        mails.reverse()
    except Exception as exc:
        return {"error": str(exc)}
    finally:
        # logout() raises on a dead connection (Python 3.8+); an exception
        # here would replace the result being returned.
        try:
            conn.logout()
        except Exception:
            pass

    if not mails:
        return {"total": 0, "mails": [], "summary": {}}

    if api_key:
        # Classify in batches of BATCH_SIZE mails per request.
        classified = []
        for start in range(0, len(mails), BATCH_SIZE):
            classified.extend(classify_mails(mails[start:start + BATCH_SIZE], api_key))
        mails = classified

    summary = {}
    for mail in mails:
        summary.setdefault(mail.get("category", "unknown"), []).append(mail)

    return {"total": len(mails), "mails": mails, "summary": summary}
|
||
|
||
|
||
def format_smart_digest(digest: dict) -> str:
    """Render a digest from ``get_smart_digest`` as Telegram message text.

    Categories are listed in a fixed priority order. ``wichtig``, ``aktion``
    and ``info`` are listed in full; every other category shows at most three
    mails plus a "... und N weitere" line. Categories that are not part of
    the fixed order are appended afterwards so no mail counted in the total
    disappears from the output.

    Args:
        digest: Dict with ``total`` and ``summary`` (category -> mail list),
            or with ``error``.

    Returns:
        The formatted text, an error line, or a "no new mails" line.
    """
    if "error" in digest:
        return f"Mail-Fehler: {digest['error']}"
    if digest.get("total", 0) == 0:
        return "Keine neuen Mails im gewählten Zeitraum."

    lines = [f"📧 Mail-Digest ({digest['total']} Mails)\n"]
    summary = digest.get("summary", {})

    cat_labels = {
        "wichtig": "🔴 Wichtig",
        "aktion": "⚡ Aktion nötig",
        "info": "ℹ️ Info",
        "newsletter": "📰 Newsletter",
        "spam": "🗑️ Spam",
        "unknown": "❓ Unkategorisiert",
    }
    cat_order = ["aktion", "wichtig", "info", "newsletter", "spam", "unknown"]
    # Unexpected categories would otherwise be counted but never shown.
    extra_cats = sorted((c for c in summary if c not in cat_labels), key=str)

    for cat in cat_order + extra_cats:
        cat_mails = summary.get(cat, [])
        if not cat_mails:
            continue
        label = cat_labels.get(cat, cat)
        lines.append(f"{label} ({len(cat_mails)}):")
        show = cat_mails if cat in ("wichtig", "aktion", "info") else cat_mails[:3]
        for m_item in show:
            lines.append(f" {m_item['date_str']} | {m_item['from'][:30]}")
            lines.append(f" → {m_item['subject'][:65]}")
        if len(cat_mails) > len(show):
            lines.append(f" ... und {len(cat_mails) - len(show)} weitere")
        lines.append("")

    wichtig = len(summary.get("wichtig", [])) + len(summary.get("aktion", []))
    noise = len(summary.get("newsletter", [])) + len(summary.get("spam", []))
    lines.append(f"Fazit: {wichtig} relevante, {noise} ignorierbare Mails")

    return "\n".join(lines)
|
||
|
||
|
||
def format_summary() -> str:
    """Build a complete overview: counts, the last five mails, important mails.

    Calls ``get_mail_count``, ``get_recent_mails(5)`` and
    ``get_important_mails(48)`` in turn.

    Returns:
        The overview as text. If the counts cannot be read, only an error
        line is returned. A failure of one of the later lookups is reported
        in its section instead of being shown as "nothing found".
    """
    counts = get_mail_count()
    if "error" in counts:
        return f"Mail-Fehler: {counts['error']}"

    lines = [f"E-Mail ({counts['account']})"]
    lines.append(f" Gesamt: {counts['total']}, Ungelesen: {counts['unread']}\n")

    recent = get_recent_mails(5)
    if recent and "error" in recent[0]:
        lines.append(f"Letzte Mails konnten nicht geladen werden: {recent[0]['error']}")
    elif recent:
        lines.append("Letzte 5 Mails:")
        for r in recent:
            lines.append(f" {r['date_str']} | {r['from'][:30]}")
            lines.append(f" → {r['subject'][:70]}")

    important = get_important_mails(48)
    if important and "error" in important[0]:
        # A failed lookup must not read as "no important mails".
        lines.append(f"\nWichtige Mails konnten nicht geladen werden: {important[0]['error']}")
    elif important:
        lines.append(f"\nWichtige Mails (48h): {len(important)}")
        for r in important:
            lines.append(f" {r['date_str']} | {r['from'][:30]}")
            lines.append(f" → {r['subject'][:70]}")
    else:
        lines.append("\nKeine wichtigen Mails in den letzten 48h.")

    return "\n".join(lines)
|
||
|
||
|
||
def format_search_results(results: list[dict]) -> str:
    """Render the result list of ``search_mail`` as plain text.

    Returns a "nothing found" line for an empty list, an error line when the
    first entry carries an ``error`` key, otherwise a header followed by two
    lines per mail (date and sender, then subject).
    """
    if not results:
        return "Keine Mails gefunden."

    first = results[0]
    if "error" in first:
        return f"Suche fehlgeschlagen: {first['error']}"

    out = [f"{len(results)} Mail(s) gefunden:\n"]
    for hit in results:
        out.extend(
            (
                f" {hit['date_str']} | {hit['from'][:35]}",
                f" → {hit['subject'][:70]}",
            )
        )
    return "\n".join(out)
|