homelab-brain/homelab-ai-bot/core/mail_client.py

423 lines
14 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""IMAP Mail Client — Liest E-Mails vom All-Inkl Spiegel-Postfach (Read-Only).
Stufe 1: Keyword-Filter (IMPORTANT_SENDERS)
Stufe 2: LLM-Klassifizierung (classify_mails) — trennt Spam/Newsletter von Wichtigem.
"""
import imaplib
import email
import json
import requests as _req
from email.header import decode_header
from email.utils import parsedate_to_datetime
from datetime import datetime, timedelta, timezone
from typing import Optional
# IMAP connection settings. These are placeholders: init() overwrites them
# from the bot configuration, and _connect() returns None while the server,
# user or password is still empty.
IMAP_SERVER = ""
IMAP_PORT = 993
MAIL_USER = ""
MAIL_PASS = ""
# Lowercase substrings that get_important_mails() looks for in the lowercased
# "From" header; a hit marks the mail as important.
IMPORTANT_SENDERS = [
"paypal", "bank", "sparkasse", "postbank", "dkb", "ing-diba", "comdirect",
"hetzner", "all-inkl", "kasserver", "cloudflare",
"proxmox", "synology", "tailscale",
"finanzamt", "elster", "bundesnetzagentur", "polizei", "gericht",
"enbw", "vattenfall", "enso", "stadtwerke",
"sat-reisen", "lufthansa", "vietnam airlines", "booking.com",
"versicherung", "allianz", "huk", "debeka",
]
# Lowercase substrings that _is_spam_sender() looks for in the lowercased
# "From" header. get_important_mails() checks this list first, so a spam hit
# excludes a mail even if it also matches IMPORTANT_SENDERS.
SPAM_SENDERS = [
"lebensfreunde", "amazon.de/promotion", "promotion@amazon",
"alibaba", "aliexpress", "temu",
"save.tv", "newsletter@bit", "magix", "video deluxe",
"platforms.ae",
]
def init(cfg):
    """Load the IMAP connection settings from the bot configuration.

    Reads ``MAIL_IMAP_SERVER``, ``MAIL_IMAP_PORT``, ``MAIL_USER`` and
    ``MAIL_PASS`` from ``cfg.raw`` and stores them in the module-level
    connection globals. Missing keys fall back to an empty string, the port
    to ``"993"``.

    Args:
        cfg: Configuration object exposing a dict-like ``raw`` attribute.

    Raises:
        ValueError: If the configured port cannot be converted by ``int()``.
    """
    global IMAP_SERVER, IMAP_PORT, MAIL_USER, MAIL_PASS

    def setting(key, fallback=""):
        return cfg.raw.get(key, fallback)

    IMAP_SERVER = setting("MAIL_IMAP_SERVER")
    IMAP_PORT = int(setting("MAIL_IMAP_PORT", "993"))
    MAIL_USER = setting("MAIL_USER")
    MAIL_PASS = setting("MAIL_PASS")
def _connect() -> Optional[imaplib.IMAP4_SSL]:
if not IMAP_SERVER or not MAIL_USER or not MAIL_PASS:
return None
try:
m = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
m.login(MAIL_USER, MAIL_PASS)
return m
except Exception:
return None
def _decode_header_value(raw: str) -> str:
if not raw:
return ""
parts = decode_header(raw)
decoded = ""
for part, enc in parts:
if isinstance(part, bytes):
decoded += part.decode(enc or "utf-8", errors="replace")
else:
decoded += part
return decoded.strip()
def _parse_mail(msg_data) -> Optional[dict]:
try:
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subj = _decode_header_value(msg.get("Subject", ""))
frm = _decode_header_value(msg.get("From", ""))
date_str = msg.get("Date", "")
try:
dt = parsedate_to_datetime(date_str)
except Exception:
dt = None
return {
"subject": subj[:120],
"from": frm[:80],
"date": dt,
"date_str": dt.strftime("%d.%m.%Y %H:%M") if dt else date_str[:20],
}
except Exception:
return None
def get_mail_count() -> dict:
    """Count all and unread messages in the INBOX.

    Returns:
        ``{"total": int, "unread": int, "account": MAIL_USER}`` on success,
        or ``{"error": message}`` when connecting or searching fails.
    """
    conn = _connect()
    if not conn:
        return {"error": "IMAP-Verbindung fehlgeschlagen"}
    try:
        # Read-only select so the mailbox state is never modified.
        conn.select("INBOX", readonly=True)

        def count(criterion):
            _status, payload = conn.search(None, criterion)
            return len(payload[0].split()) if payload[0] else 0

        return {
            "total": count("ALL"),
            "unread": count("UNSEEN"),
            "account": MAIL_USER,
        }
    except Exception as exc:
        return {"error": str(exc)}
    finally:
        conn.logout()
def get_recent_mails(count: int = 10) -> list[dict]:
    """Return header summaries of the newest ``count`` INBOX messages.

    Args:
        count: Maximum number of mails to return. Zero or negative values
            yield an empty list without contacting the server.

    Returns:
        Parsed mails, newest first; ``[{"error": message}]`` when connecting
        or fetching fails.
    """
    # Guard first: ``ids[-0:]`` would select *every* message, not none.
    if count <= 0:
        return []
    m = _connect()
    if not m:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        m.select("INBOX", readonly=True)
        _, data = m.search(None, "ALL")
        ids = data[0].split() if data[0] else []
        results = []
        # The search result is ascending; walk the tail backwards so the
        # newest mail comes first.
        for mid in reversed(ids[-count:]):
            _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
            parsed = _parse_mail(msg_data)
            if parsed:
                results.append(parsed)
        return results
    except Exception as e:
        return [{"error": str(e)}]
    finally:
        m.logout()
def get_todays_mails() -> list[dict]:
    """Return header summaries of all INBOX mails matching SINCE <today>.

    Returns:
        Parsed mails in reverse search order (newest first), or
        ``[{"error": message}]`` when connecting or fetching fails.
    """
    conn = _connect()
    if not conn:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        conn.select("INBOX", readonly=True)
        day = datetime.now().strftime("%d-%b-%Y")
        _status, payload = conn.search(None, f'(SINCE "{day}")')
        message_ids = payload[0].split() if payload[0] else []

        def summary_of(mid):
            _typ, msg_data = conn.fetch(mid, "(BODY.PEEK[HEADER])")
            return _parse_mail(msg_data)

        # Fetch in server order, drop unparseable mails, then flip the list.
        found = [mail for mail in map(summary_of, message_ids) if mail]
        found.reverse()
        return found
    except Exception as exc:
        return [{"error": str(exc)}]
    finally:
        conn.logout()
def search_mail(query: str, days: int = 30, limit: int = 15) -> list[dict]:
    """Search the INBOX for mails whose sender or subject contains ``query``.

    Args:
        query: Text to look for in the From and Subject headers.
        days: How many days back the search reaches (IMAP SINCE, day
            granularity).
        limit: Maximum number of results returned.

    Returns:
        Parsed mails, newest first, at most ``limit`` entries;
        ``[{"error": message}]`` when connecting or the overall search fails.
    """
    m = _connect()
    if not m:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        m.select("INBOX", readonly=True)
        # IMAP requires English month abbreviations; strftime("%b") follows
        # the process locale, so the date is formatted by hand.
        months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                  "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
        start = datetime.now() - timedelta(days=days)
        since = f"{start.day:02d}-{months[start.month - 1]}-{start.year:04d}"
        # Escape for an IMAP quoted string so quotes/backslashes in the query
        # cannot terminate the string or inject further search keys; line
        # breaks are not allowed inside a quoted string at all.
        needle = (
            query.replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\r", " ")
            .replace("\n", " ")
        )
        # NOTE(review): non-ASCII queries presumably still yield no matches,
        # because imaplib encodes commands as ASCII — confirm and handle
        # separately if umlaut searches are needed.
        matched = {}  # insertion-ordered set of message ids
        for field in ("FROM", "SUBJECT"):
            try:
                status, data = m.search(
                    None, f'(SINCE "{since}" {field} "{needle}")'
                )
            except Exception:
                # Best effort: a rejected criterion must not abort the other.
                continue
            if status != "OK" or not data or not data[0]:
                continue
            for mid in data[0].split():
                matched.setdefault(mid, None)
        found = []
        # Each message is fetched once, even if it matched both criteria.
        for mid in matched:
            try:
                _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
            except Exception:
                continue
            parsed = _parse_mail(msg_data)
            if parsed:
                found.append(parsed)

        floor = datetime.min.replace(tzinfo=timezone.utc)

        def sort_key(mail):
            sent = mail.get("date")
            if sent is None:
                return floor
            # Naive datetimes cannot be compared with aware ones; treat them
            # as UTC so one odd Date header cannot fail the whole search.
            if sent.utcoffset() is None:
                return sent.replace(tzinfo=timezone.utc)
            return sent

        found.sort(key=sort_key, reverse=True)
        return found[:limit]
    except Exception as e:
        return [{"error": str(e)}]
    finally:
        m.logout()
def _is_spam_sender(frm_lower: str) -> bool:
"""Prueft ob Absender in der Spam-Liste steht."""
return any(s in frm_lower for s in SPAM_SENDERS)
def get_important_mails(hours: int = 24) -> list[dict]:
    """Return recent mails from important senders, with spam senders removed.

    A mail qualifies when its lowercased From header contains an
    IMPORTANT_SENDERS entry and is not flagged by ``_is_spam_sender``.

    Args:
        hours: Size of the look-back window. Mails whose Date header is older
            than ``now - hours`` are dropped; mails without a parseable date
            are kept because their age cannot be determined.

    Returns:
        Matching mails, newest first; ``[{"error": message}]`` when
        connecting or fetching fails.
    """
    m = _connect()
    if not m:
        return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
    try:
        m.select("INBOX", readonly=True)
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        # IMAP SINCE only compares calendar days and ignores time zones, so
        # it cannot express an hour window. Ask for one extra day to be sure
        # nothing inside the window is missed, then filter precisely below.
        first_day = cutoff - timedelta(days=1)
        # English month names are mandatory in IMAP dates; strftime("%b")
        # would follow the process locale.
        months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                  "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
        since = (
            f"{first_day.day:02d}-{months[first_day.month - 1]}"
            f"-{first_day.year:04d}"
        )
        _, data = m.search(None, f'(SINCE "{since}")')
        ids = data[0].split() if data[0] else []
        results = []
        for mid in ids:
            _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
            parsed = _parse_mail(msg_data)
            if not parsed:
                continue
            sent = parsed.get("date")
            if sent is not None:
                # A naive datetime cannot be compared with the aware cutoff;
                # treat it as UTC.
                if sent.utcoffset() is None:
                    sent = sent.replace(tzinfo=timezone.utc)
                if sent < cutoff:
                    continue
            frm_lower = parsed["from"].lower()
            # Spam check wins over the important-sender match.
            if _is_spam_sender(frm_lower):
                continue
            if any(s in frm_lower for s in IMPORTANT_SENDERS):
                results.append(parsed)
        results.reverse()
        return results
    except Exception as e:
        return [{"error": str(e)}]
    finally:
        m.logout()
# System prompt (German) that classify_mails() sends to the LLM. It defines
# the five categories (wichtig, aktion, info, newsletter, spam) and asks for
# a bare JSON array of {"idx", "cat"} objects, which classify_mails() parses
# to label each mail. This text is runtime input to the model, not a comment.
CLASSIFY_PROMPT = """Du bekommst eine Liste von E-Mails (Absender + Betreff).
Klassifiziere JEDE Mail in genau eine Kategorie. Sei STRENG — im Zweifel ist es Newsletter oder Spam.
Kategorien:
- "wichtig": NUR echte persoenliche Nachrichten, Rechnungen, Sicherheitswarnungen (Synology, Server), Bank-Transaktionen, Behoerden, Flugaenderungen, Vertragsrelevantes
- "aktion": Erfordert DRINGENDE Handlung (Passwort aendern, Zahlung faellig, Sicherheitsluecke, Termin bestaetigen)
- "info": Nuetzliche Info OHNE Handlungsbedarf (Versandbestaetigung, Status-Update, Ladebeleg)
- "newsletter": Regelmaessige Newsletter, Markt-Updates, Nachrichten-Digest, Fachzeitschriften, Angebote
- "spam": Werbung, Marketing, Dating-Benachrichtigungen, Phishing, unbekannte Absender mit verdaechtigem Betreff, "Du hast neue Besucher", Rabattaktionen
WICHTIG:
- "Neue Profilbesucher", "Du hast neue Nachrichten" von Dating/Social = SPAM
- Absender mit .ae/.ru/.cn Domain ohne Bezug = SPAM
- "Sale", "Angebot", "nur noch heute", Emojis im Betreff = NEWSLETTER oder SPAM
- Taegliche Markt-/Aktien-Updates = NEWSLETTER
- Synology Sicherheitswarnungen = AKTION
- Rechnungen (EnBW, Telekom etc.) = WICHTIG
Antworte NUR mit einem JSON-Array. Pro Mail ein Objekt mit "idx" (0-basiert) und "cat".
Beispiel: [{"idx":0,"cat":"spam"},{"idx":1,"cat":"wichtig"}]"""
def classify_mails(
    mails: list[dict],
    api_key: str,
    *,
    url: str = "http://100.84.255.83:11434/v1/chat/completions",
    model: str = "qwen2.5:14b",
) -> list[dict]:
    """Label each mail with a ``category`` using an LLM chat endpoint.

    Sender and subject of every mail are sent together with CLASSIFY_PROMPT;
    the answer is expected to contain a JSON array of
    ``{"idx": <position>, "cat": <label>}`` objects.

    Args:
        mails: Mail dicts (``from`` / ``subject``); modified in place.
        api_key: Only acts as an on/off switch here — it is not sent with
            the request. An empty value skips classification.
        url: Chat-completions endpoint to call.
        model: Model name passed to the endpoint.

    Returns:
        The same ``mails`` list. When classification ran, every mail has a
        ``category`` of ``wichtig``, ``aktion``, ``info``, ``newsletter``,
        ``spam`` or ``unknown`` (request failed, answer unusable, or no valid
        label for that mail).
    """
    if not mails or not api_key:
        return mails
    known = {"wichtig", "aktion", "info", "newsletter", "spam"}
    mail_text = "\n".join(
        f"{i}. Von: {m.get('from', '')[:50]} | Betreff: {m.get('subject', '')[:80]}"
        for i, m in enumerate(mails)
    )
    categories = {}
    try:
        r = _req.post(
            url,
            json={
                "model": model,
                "messages": [
                    {"role": "system", "content": CLASSIFY_PROMPT},
                    {"role": "user", "content": mail_text},
                ],
                "max_tokens": 2000,
                "temperature": 0,
            },
            timeout=120,
        )
        r.raise_for_status()
        content = r.json()["choices"][0]["message"]["content"]
        # Models often wrap the array in code fences or add a sentence
        # around it; take everything from the first "[" to the last "]".
        start, end = content.find("["), content.rfind("]")
        entries = json.loads(content[start:end + 1]) if 0 <= start < end else []
        if not isinstance(entries, list):
            entries = []
        for entry in entries:
            # Skip malformed entries individually instead of discarding the
            # whole answer.
            if not isinstance(entry, dict):
                continue
            try:
                idx = int(entry.get("idx"))
            except (TypeError, ValueError):
                continue
            cat = str(entry.get("cat", "")).strip().lower()
            # Labels outside the known set would not be shown by the digest
            # formatter, so they fall through to "unknown".
            if cat in known:
                categories[idx] = cat
    except Exception:
        # Best effort: network, HTTP or JSON failures leave all mails
        # unclassified rather than breaking the digest.
        categories = {}
    for i, m in enumerate(mails):
        m["category"] = categories.get(i, "unknown")
    return mails
# Number of mails sent to the classifier per request in get_smart_digest().
BATCH_SIZE = 40


def get_smart_digest(hours: int = 24, api_key: str = "") -> dict:
    """Collect recent mails, classify them in batches and group by category.

    Args:
        hours: Size of the look-back window. Mails whose Date header is older
            than ``now - hours`` are dropped; mails without a parseable date
            are kept because their age cannot be determined.
        api_key: Passed to ``classify_mails``; when empty, classification is
            skipped and every mail is grouped under ``"unknown"``.

    Returns:
        ``{"total": int, "mails": [...], "summary": {category: [...]}}`` with
        mails newest first, or ``{"error": message}`` when connecting or
        fetching fails.
    """
    m = _connect()
    if not m:
        return {"error": "IMAP-Verbindung fehlgeschlagen"}
    try:
        m.select("INBOX", readonly=True)
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        # IMAP SINCE only compares calendar days and ignores time zones, so
        # it cannot express an hour window. Ask for one extra day to be sure
        # nothing inside the window is missed, then filter precisely below.
        first_day = cutoff - timedelta(days=1)
        # English month names are mandatory in IMAP dates; strftime("%b")
        # would follow the process locale.
        months = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                  "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
        since = (
            f"{first_day.day:02d}-{months[first_day.month - 1]}"
            f"-{first_day.year:04d}"
        )
        _, data = m.search(None, f'(SINCE "{since}")')
        ids = data[0].split() if data[0] else []
        mails = []
        for mid in ids:
            _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
            parsed = _parse_mail(msg_data)
            if not parsed:
                continue
            sent = parsed.get("date")
            if sent is not None:
                # A naive datetime cannot be compared with the aware cutoff;
                # treat it as UTC.
                if sent.utcoffset() is None:
                    sent = sent.replace(tzinfo=timezone.utc)
                if sent < cutoff:
                    continue
            mails.append(parsed)
        mails.reverse()
    except Exception as e:
        return {"error": str(e)}
    finally:
        m.logout()
    if not mails:
        return {"total": 0, "mails": [], "summary": {}}
    if api_key:
        # Classify in slices so a single prompt never grows unbounded.
        classified = []
        for start in range(0, len(mails), BATCH_SIZE):
            classified.extend(
                classify_mails(mails[start:start + BATCH_SIZE], api_key)
            )
        mails = classified
    summary = {}
    for m_item in mails:
        summary.setdefault(m_item.get("category", "unknown"), []).append(m_item)
    return {"total": len(mails), "mails": mails, "summary": summary}
def format_smart_digest(digest: dict) -> str:
    """Render a smart-digest dict as a plain-text message.

    Categories appear in fixed priority order. ``aktion``, ``wichtig`` and
    ``info`` list every mail; the other categories list the first three and
    then a "... und N weitere" line. A closing line counts relevant versus
    ignorable mails.

    Args:
        digest: Result of ``get_smart_digest`` (``total``, ``summary``) or an
            ``{"error": ...}`` dict.

    Returns:
        The formatted text, an error line, or a "no mails" notice.
    """
    if "error" in digest:
        return f"Mail-Fehler: {digest['error']}"
    if digest.get("total", 0) == 0:
        return "Keine neuen Mails im gewählten Zeitraum."

    out = [f"📧 Mail-Digest ({digest['total']} Mails)\n"]
    groups = digest.get("summary", {})
    # (category key, heading, list every mail?) in display order.
    sections = (
        ("aktion", "⚡ Aktion nötig", True),
        ("wichtig", "🔴 Wichtig", True),
        ("info", " Info", True),
        ("newsletter", "📰 Newsletter", False),
        ("spam", "🗑️ Spam", False),
        ("unknown", "❓ Unkategorisiert", False),
    )
    for key, heading, list_all in sections:
        members = groups.get(key, [])
        if not members:
            continue
        out.append(f"{heading} ({len(members)}):")
        visible = members if list_all else members[:3]
        for mail in visible:
            out.append(f" {mail['date_str']} | {mail['from'][:30]}")
            out.append(f"{mail['subject'][:65]}")
        hidden = len(members) - len(visible)
        if hidden > 0:
            out.append(f" ... und {hidden} weitere")
        out.append("")

    relevant = len(groups.get("wichtig", [])) + len(groups.get("aktion", []))
    ignorable = len(groups.get("newsletter", [])) + len(groups.get("spam", []))
    out.append(f"Fazit: {relevant} relevante, {ignorable} ignorierbare Mails")
    return "\n".join(out)
def format_summary() -> str:
    """Build the full text overview: counts, last five mails, important mails.

    Calls ``get_mail_count``, ``get_recent_mails(5)`` and
    ``get_important_mails(48)`` in that order. A failing count aborts with an
    error line; a failing or empty "recent" lookup just omits that section; a
    failing or empty "important" lookup prints the "none" notice.

    Returns:
        The formatted multi-line text.
    """
    counts = get_mail_count()
    if "error" in counts:
        return f"Mail-Fehler: {counts['error']}"

    def mail_lines(mails):
        # Two lines per mail: date and sender, then the subject.
        rendered = []
        for mail in mails:
            rendered.append(f" {mail['date_str']} | {mail['from'][:30]}")
            rendered.append(f"{mail['subject'][:70]}")
        return rendered

    report = [
        f"E-Mail ({counts['account']})",
        f" Gesamt: {counts['total']}, Ungelesen: {counts['unread']}\n",
    ]

    latest = get_recent_mails(5)
    if latest and "error" not in latest[0]:
        report.append("Letzte 5 Mails:")
        report.extend(mail_lines(latest))

    flagged = get_important_mails(48)
    if not flagged or "error" in flagged[0]:
        report.append("\nKeine wichtigen Mails in den letzten 48h.")
    else:
        report.append(f"\nWichtige Mails (48h): {len(flagged)}")
        report.extend(mail_lines(flagged))
    return "\n".join(report)
def format_search_results(results: list[dict]) -> str:
    """Render the output of ``search_mail`` as plain text.

    Args:
        results: Parsed mails, or a single-element list holding an
            ``{"error": ...}`` dict.

    Returns:
        A "nothing found" notice, an error line, or a header followed by two
        lines per mail (date and sender, then subject).
    """
    if not results:
        return "Keine Mails gefunden."
    first = results[0]
    if "error" in first:
        return f"Suche fehlgeschlagen: {first['error']}"
    body = [f"{len(results)} Mail(s) gefunden:\n"]
    for hit in results:
        body += [
            f" {hit['date_str']} | {hit['from'][:35]}",
            f"{hit['subject'][:70]}",
        ]
    return "\n".join(body)