homelab-brain/homelab-ai-bot/core/mail_client.py

382 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""IMAP Mail Client — Liest E-Mails vom All-Inkl Spiegel-Postfach (Read-Only).
Stufe 1: Keyword-Filter (IMPORTANT_SENDERS)
Stufe 2: LLM-Klassifizierung (classify_mails) — trennt Spam/Newsletter von Wichtigem.
"""
import imaplib
import email
import json
import requests as _req
from email.header import decode_header
from email.utils import parsedate_to_datetime
from datetime import datetime, timedelta, timezone
from typing import Optional
IMAP_SERVER = ""
IMAP_PORT = 993
MAIL_USER = ""
MAIL_PASS = ""
IMPORTANT_SENDERS = [
"paypal", "bank", "sparkasse", "postbank", "dkb",
"hetzner", "all-inkl", "kasserver", "cloudflare",
"proxmox", "synology", "tailscale",
"finanzamt", "elster", "bundesnetzagentur",
]
def init(cfg):
global IMAP_SERVER, IMAP_PORT, MAIL_USER, MAIL_PASS
IMAP_SERVER = cfg.raw.get("MAIL_IMAP_SERVER", "")
IMAP_PORT = int(cfg.raw.get("MAIL_IMAP_PORT", "993"))
MAIL_USER = cfg.raw.get("MAIL_USER", "")
MAIL_PASS = cfg.raw.get("MAIL_PASS", "")
def _connect() -> Optional[imaplib.IMAP4_SSL]:
if not IMAP_SERVER or not MAIL_USER or not MAIL_PASS:
return None
try:
m = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
m.login(MAIL_USER, MAIL_PASS)
return m
except Exception:
return None
def _decode_header_value(raw: str) -> str:
if not raw:
return ""
parts = decode_header(raw)
decoded = ""
for part, enc in parts:
if isinstance(part, bytes):
decoded += part.decode(enc or "utf-8", errors="replace")
else:
decoded += part
return decoded.strip()
def _parse_mail(msg_data) -> Optional[dict]:
try:
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subj = _decode_header_value(msg.get("Subject", ""))
frm = _decode_header_value(msg.get("From", ""))
date_str = msg.get("Date", "")
try:
dt = parsedate_to_datetime(date_str)
except Exception:
dt = None
return {
"subject": subj[:120],
"from": frm[:80],
"date": dt,
"date_str": dt.strftime("%d.%m.%Y %H:%M") if dt else date_str[:20],
}
except Exception:
return None
def get_mail_count() -> dict:
"""Anzahl Mails total und ungelesen."""
m = _connect()
if not m:
return {"error": "IMAP-Verbindung fehlgeschlagen"}
try:
m.select("INBOX", readonly=True)
_, data = m.search(None, "ALL")
total = len(data[0].split()) if data[0] else 0
_, data = m.search(None, "UNSEEN")
unread = len(data[0].split()) if data[0] else 0
return {"total": total, "unread": unread, "account": MAIL_USER}
except Exception as e:
return {"error": str(e)}
finally:
m.logout()
def get_recent_mails(count: int = 10) -> list[dict]:
"""Letzte N Mails (neueste zuerst)."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
_, data = m.search(None, "ALL")
ids = data[0].split() if data[0] else []
if not ids:
return []
recent_ids = ids[-count:]
recent_ids.reverse()
results = []
for mid in recent_ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
results.append(parsed)
return results
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
def get_todays_mails() -> list[dict]:
"""Alle Mails von heute."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
today = datetime.now().strftime("%d-%b-%Y")
_, data = m.search(None, f'(SINCE "{today}")')
ids = data[0].split() if data[0] else []
results = []
for mid in ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
results.append(parsed)
results.reverse()
return results
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
def search_mail(query: str, days: int = 30, limit: int = 15) -> list[dict]:
"""Suche nach Mails per Absender oder Betreff."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y")
all_results = []
for criteria in [f'FROM "{query}"', f'SUBJECT "{query}"']:
try:
_, data = m.search(None, f'(SINCE "{since}" {criteria})')
ids = data[0].split() if data[0] else []
for mid in ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
all_results.append(parsed)
except Exception:
continue
seen = set()
unique = []
for r in all_results:
key = f"{r['date_str']}|{r['subject'][:40]}"
if key not in seen:
seen.add(key)
unique.append(r)
unique.sort(key=lambda x: x.get("date") or datetime.min.replace(tzinfo=timezone.utc), reverse=True)
return unique[:limit]
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
def get_important_mails(hours: int = 24) -> list[dict]:
"""Mails von wichtigen Absendern (Bank, Hoster, etc.)."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
since = (datetime.now() - timedelta(hours=hours)).strftime("%d-%b-%Y")
_, data = m.search(None, f'(SINCE "{since}")')
ids = data[0].split() if data[0] else []
results = []
for mid in ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
frm_lower = parsed["from"].lower()
if any(s in frm_lower for s in IMPORTANT_SENDERS):
results.append(parsed)
results.reverse()
return results
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
CLASSIFY_PROMPT = """Du bekommst eine Liste von E-Mails (Absender + Betreff).
Klassifiziere JEDE Mail in genau eine Kategorie:
- "wichtig": Rechnungen, Sicherheitswarnungen, Server-Alerts, Bank, Behörden, persönliche Nachrichten
- "aktion": Erfordert eine Handlung (Passwort ändern, Zahlung fällig, Termin bestätigen)
- "info": Nützliche Info aber keine Handlung nötig (Versandbestätigung, Status-Update)
- "newsletter": Newsletter, Marketing, Angebote, Werbung
- "spam": Offensichtlicher Spam, Phishing, unseriös
Antworte NUR mit einem JSON-Array. Pro Mail ein Objekt mit "idx" (0-basiert) und "cat" (Kategorie).
Beispiel: [{"idx":0,"cat":"newsletter"},{"idx":1,"cat":"wichtig"}]"""
def classify_mails(mails: list[dict], api_key: str) -> list[dict]:
"""LLM-gestützte Klassifizierung von Mails nach Wichtigkeit."""
if not mails or not api_key:
return mails
mail_text = "\n".join(
f"{i}. Von: {m['from'][:50]} | Betreff: {m['subject'][:80]}"
for i, m in enumerate(mails)
)
try:
r = _req.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}"},
json={
"model": "openai/gpt-4o-mini",
"messages": [
{"role": "system", "content": CLASSIFY_PROMPT},
{"role": "user", "content": mail_text},
],
"max_tokens": 400,
"temperature": 0,
},
timeout=30,
)
r.raise_for_status()
content = r.json()["choices"][0]["message"]["content"]
content = content.strip()
if content.startswith("```"):
content = content.split("\n", 1)[-1].rsplit("```", 1)[0]
classifications = json.loads(content)
cat_map = {c["idx"]: c["cat"] for c in classifications}
for i, m in enumerate(mails):
m["category"] = cat_map.get(i, "unknown")
return mails
except Exception:
for m in mails:
m["category"] = "unknown"
return mails
def get_smart_digest(hours: int = 24, api_key: str = "") -> dict:
"""Intelligente Mail-Zusammenfassung: holt Mails, klassifiziert per LLM, gruppiert."""
m = _connect()
if not m:
return {"error": "IMAP-Verbindung fehlgeschlagen"}
try:
m.select("INBOX", readonly=True)
since = (datetime.now() - timedelta(hours=hours)).strftime("%d-%b-%Y")
_, data = m.search(None, f'(SINCE "{since}")')
ids = data[0].split() if data[0] else []
mails = []
for mid in ids[-50:]:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
mails.append(parsed)
mails.reverse()
except Exception as e:
return {"error": str(e)}
finally:
m.logout()
if not mails:
return {"total": 0, "mails": [], "summary": {}}
if api_key:
mails = classify_mails(mails, api_key)
summary = {}
for m_item in mails:
cat = m_item.get("category", "unknown")
summary.setdefault(cat, []).append(m_item)
return {"total": len(mails), "mails": mails, "summary": summary}
def format_smart_digest(digest: dict) -> str:
"""Formatiert den intelligenten Digest für Telegram."""
if "error" in digest:
return f"Mail-Fehler: {digest['error']}"
if digest.get("total", 0) == 0:
return "Keine neuen Mails im gewählten Zeitraum."
lines = [f"📧 Mail-Digest ({digest['total']} Mails)\n"]
summary = digest.get("summary", {})
cat_labels = {
"wichtig": "🔴 Wichtig",
"aktion": "⚡ Aktion nötig",
"info": " Info",
"newsletter": "📰 Newsletter",
"spam": "🗑️ Spam",
"unknown": "❓ Unkategorisiert",
}
cat_order = ["aktion", "wichtig", "info", "newsletter", "spam", "unknown"]
for cat in cat_order:
cat_mails = summary.get(cat, [])
if not cat_mails:
continue
label = cat_labels.get(cat, cat)
lines.append(f"{label} ({len(cat_mails)}):")
show = cat_mails if cat in ("wichtig", "aktion", "info") else cat_mails[:3]
for m_item in show:
lines.append(f" {m_item['date_str']} | {m_item['from'][:30]}")
lines.append(f"{m_item['subject'][:65]}")
if len(cat_mails) > len(show):
lines.append(f" ... und {len(cat_mails) - len(show)} weitere")
lines.append("")
wichtig = len(summary.get("wichtig", [])) + len(summary.get("aktion", []))
noise = len(summary.get("newsletter", [])) + len(summary.get("spam", []))
lines.append(f"Fazit: {wichtig} relevante, {noise} ignorierbare Mails")
return "\n".join(lines)
def format_summary() -> str:
"""Komplett-Übersicht: Counts + letzte Mails + wichtige."""
counts = get_mail_count()
if "error" in counts:
return f"Mail-Fehler: {counts['error']}"
lines = [f"E-Mail ({counts['account']})"]
lines.append(f" Gesamt: {counts['total']}, Ungelesen: {counts['unread']}\n")
recent = get_recent_mails(5)
if recent and "error" not in recent[0]:
lines.append("Letzte 5 Mails:")
for r in recent:
lines.append(f" {r['date_str']} | {r['from'][:30]}")
lines.append(f"{r['subject'][:70]}")
important = get_important_mails(48)
if important and "error" not in important[0]:
lines.append(f"\nWichtige Mails (48h): {len(important)}")
for r in important:
lines.append(f" {r['date_str']} | {r['from'][:30]}")
lines.append(f"{r['subject'][:70]}")
else:
lines.append("\nKeine wichtigen Mails in den letzten 48h.")
return "\n".join(lines)
def format_search_results(results: list[dict]) -> str:
if not results:
return "Keine Mails gefunden."
if "error" in results[0]:
return f"Suche fehlgeschlagen: {results[0]['error']}"
lines = [f"{len(results)} Mail(s) gefunden:\n"]
for r in results:
lines.append(f" {r['date_str']} | {r['from'][:35]}")
lines.append(f"{r['subject'][:70]}")
return "\n".join(lines)