"""IMAP Mail Client — Liest E-Mails vom All-Inkl Spiegel-Postfach (Read-Only). Stufe 1: Keyword-Filter (IMPORTANT_SENDERS) Stufe 2: LLM-Klassifizierung (classify_mails) — trennt Spam/Newsletter von Wichtigem. """ import imaplib import email import json import requests as _req from email.header import decode_header from email.utils import parsedate_to_datetime from datetime import datetime, timedelta, timezone from typing import Optional IMAP_SERVER = "" IMAP_PORT = 993 MAIL_USER = "" MAIL_PASS = "" IMPORTANT_SENDERS = [ "paypal", "bank", "sparkasse", "postbank", "dkb", "ing-diba", "comdirect", "hetzner", "all-inkl", "kasserver", "cloudflare", "proxmox", "synology", "tailscale", "finanzamt", "elster", "bundesnetzagentur", "polizei", "gericht", "enbw", "vattenfall", "enso", "stadtwerke", "sat-reisen", "lufthansa", "vietnam airlines", "booking.com", "versicherung", "allianz", "huk", "debeka", ] SPAM_SENDERS = [ "lebensfreunde", "amazon.de/promotion", "promotion@amazon", "alibaba", "aliexpress", "temu", "save.tv", "newsletter@bit", "magix", "video deluxe", "platforms.ae", ] def init(cfg): global IMAP_SERVER, IMAP_PORT, MAIL_USER, MAIL_PASS IMAP_SERVER = cfg.raw.get("MAIL_IMAP_SERVER", "") IMAP_PORT = int(cfg.raw.get("MAIL_IMAP_PORT", "993")) MAIL_USER = cfg.raw.get("MAIL_USER", "") MAIL_PASS = cfg.raw.get("MAIL_PASS", "") def _connect() -> Optional[imaplib.IMAP4_SSL]: if not IMAP_SERVER or not MAIL_USER or not MAIL_PASS: return None try: m = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) m.login(MAIL_USER, MAIL_PASS) return m except Exception: return None def _decode_header_value(raw: str) -> str: if not raw: return "" parts = decode_header(raw) decoded = "" for part, enc in parts: if isinstance(part, bytes): decoded += part.decode(enc or "utf-8", errors="replace") else: decoded += part return decoded.strip() def _parse_mail(msg_data) -> Optional[dict]: try: raw = msg_data[0][1] msg = email.message_from_bytes(raw) subj = _decode_header_value(msg.get("Subject", "")) frm = _decode_header_value(msg.get("From", "")) date_str = msg.get("Date", "") try: dt = parsedate_to_datetime(date_str) except Exception: dt = None return { "subject": subj[:120], "from": frm[:80], "date": dt, "date_str": dt.strftime("%d.%m.%Y %H:%M") if dt else date_str[:20], } except Exception: return None def get_mail_count() -> dict: """Anzahl Mails total und ungelesen.""" m = _connect() if not m: return {"error": "IMAP-Verbindung fehlgeschlagen"} try: m.select("INBOX", readonly=True) _, data = m.search(None, "ALL") total = len(data[0].split()) if data[0] else 0 _, data = m.search(None, "UNSEEN") unread = len(data[0].split()) if data[0] else 0 return {"total": total, "unread": unread, "account": MAIL_USER} except Exception as e: return {"error": str(e)} finally: m.logout() def get_recent_mails(count: int = 10) -> list[dict]: """Letzte N Mails (neueste zuerst).""" m = _connect() if not m: return [{"error": "IMAP-Verbindung fehlgeschlagen"}] try: m.select("INBOX", readonly=True) _, data = m.search(None, "ALL") ids = data[0].split() if data[0] else [] if not ids: return [] recent_ids = ids[-count:] recent_ids.reverse() results = [] for mid in recent_ids: _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])") parsed = _parse_mail(msg_data) if parsed: results.append(parsed) return results except Exception as e: return [{"error": str(e)}] finally: m.logout() def get_todays_mails() -> list[dict]: """Alle Mails von heute.""" m = _connect() if not m: return [{"error": "IMAP-Verbindung fehlgeschlagen"}] try: m.select("INBOX", readonly=True) today = datetime.now().strftime("%d-%b-%Y") _, data = m.search(None, f'(SINCE "{today}")') ids = data[0].split() if data[0] else [] results = [] for mid in ids: _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])") parsed = _parse_mail(msg_data) if parsed: results.append(parsed) results.reverse() return results except Exception as e: return [{"error": str(e)}] finally: m.logout() def search_mail(query: str, days: int = 30, limit: int = 15) -> list[dict]: """Suche nach Mails per Absender oder Betreff.""" m = _connect() if not m: return [{"error": "IMAP-Verbindung fehlgeschlagen"}] try: m.select("INBOX", readonly=True) since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y") all_results = [] for criteria in [f'FROM "{query}"', f'SUBJECT "{query}"']: try: _, data = m.search(None, f'(SINCE "{since}" {criteria})') ids = data[0].split() if data[0] else [] for mid in ids: _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])") parsed = _parse_mail(msg_data) if parsed: all_results.append(parsed) except Exception: continue seen = set() unique = [] for r in all_results: key = f"{r['date_str']}|{r['subject'][:40]}" if key not in seen: seen.add(key) unique.append(r) unique.sort(key=lambda x: x.get("date") or datetime.min.replace(tzinfo=timezone.utc), reverse=True) return unique[:limit] except Exception as e: return [{"error": str(e)}] finally: m.logout() def _is_spam_sender(frm_lower: str) -> bool: """Prueft ob Absender in der Spam-Liste steht.""" return any(s in frm_lower for s in SPAM_SENDERS) def get_important_mails(hours: int = 24) -> list[dict]: """Mails von wichtigen Absendern (Bank, Hoster, etc.), Spam-Sender ausgefiltert.""" m = _connect() if not m: return [{"error": "IMAP-Verbindung fehlgeschlagen"}] try: m.select("INBOX", readonly=True) since = (datetime.now() - timedelta(hours=hours)).strftime("%d-%b-%Y") _, data = m.search(None, f'(SINCE "{since}")') ids = data[0].split() if data[0] else [] results = [] for mid in ids: _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])") parsed = _parse_mail(msg_data) if parsed: frm_lower = parsed["from"].lower() if _is_spam_sender(frm_lower): continue if any(s in frm_lower for s in IMPORTANT_SENDERS): results.append(parsed) results.reverse() return results except Exception as e: return [{"error": str(e)}] finally: m.logout() CLASSIFY_PROMPT = """Du bekommst eine Liste von E-Mails (Absender + Betreff). Klassifiziere JEDE Mail in genau eine Kategorie. Sei STRENG — im Zweifel ist es Newsletter oder Spam. Kategorien: - "wichtig": NUR echte persoenliche Nachrichten, Rechnungen, Sicherheitswarnungen (Synology, Server), Bank-Transaktionen, Behoerden, Flugaenderungen, Vertragsrelevantes - "aktion": Erfordert DRINGENDE Handlung (Passwort aendern, Zahlung faellig, Sicherheitsluecke, Termin bestaetigen) - "info": Nuetzliche Info OHNE Handlungsbedarf (Versandbestaetigung, Status-Update, Ladebeleg) - "newsletter": Regelmaessige Newsletter, Markt-Updates, Nachrichten-Digest, Fachzeitschriften, Angebote - "spam": Werbung, Marketing, Dating-Benachrichtigungen, Phishing, unbekannte Absender mit verdaechtigem Betreff, "Du hast neue Besucher", Rabattaktionen WICHTIG: - "Neue Profilbesucher", "Du hast neue Nachrichten" von Dating/Social = SPAM - Absender mit .ae/.ru/.cn Domain ohne Bezug = SPAM - "Sale", "Angebot", "nur noch heute", Emojis im Betreff = NEWSLETTER oder SPAM - Taegliche Markt-/Aktien-Updates = NEWSLETTER - Synology Sicherheitswarnungen = AKTION - Rechnungen (EnBW, Telekom etc.) = WICHTIG Antworte NUR mit einem JSON-Array. Pro Mail ein Objekt mit "idx" (0-basiert) und "cat". Beispiel: [{"idx":0,"cat":"spam"},{"idx":1,"cat":"wichtig"}]""" def classify_mails(mails: list[dict], api_key: str) -> list[dict]: """LLM-gestützte Klassifizierung von Mails nach Wichtigkeit.""" if not mails or not api_key: return mails mail_text = "\n".join( f"{i}. Von: {m['from'][:50]} | Betreff: {m['subject'][:80]}" for i, m in enumerate(mails) ) try: r = _req.post( "https://openrouter.ai/api/v1/chat/completions", headers={"Authorization": f"Bearer {api_key}"}, json={ "model": "openai/gpt-4o-mini", "messages": [ {"role": "system", "content": CLASSIFY_PROMPT}, {"role": "user", "content": mail_text}, ], "max_tokens": 2000, "temperature": 0, }, timeout=60, ) r.raise_for_status() content = r.json()["choices"][0]["message"]["content"] content = content.strip() if "```" in content: content = content.split("```", 1)[-1] if content.startswith("json"): content = content[4:] content = content.rsplit("```", 1)[0] content = content.strip() classifications = json.loads(content) cat_map = {c["idx"]: c["cat"] for c in classifications} for i, m in enumerate(mails): m["category"] = cat_map.get(i, "unknown") return mails except Exception: for m in mails: m["category"] = "unknown" return mails BATCH_SIZE = 40 def get_smart_digest(hours: int = 24, api_key: str = "") -> dict: """Intelligente Mail-Zusammenfassung: holt Mails, klassifiziert per LLM in Batches, gruppiert.""" m = _connect() if not m: return {"error": "IMAP-Verbindung fehlgeschlagen"} try: m.select("INBOX", readonly=True) since = (datetime.now() - timedelta(hours=hours)).strftime("%d-%b-%Y") _, data = m.search(None, f'(SINCE "{since}")') ids = data[0].split() if data[0] else [] mails = [] for mid in ids: _, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])") parsed = _parse_mail(msg_data) if parsed: mails.append(parsed) mails.reverse() except Exception as e: return {"error": str(e)} finally: m.logout() if not mails: return {"total": 0, "mails": [], "summary": {}} if api_key: classified = [] for i in range(0, len(mails), BATCH_SIZE): batch = mails[i:i + BATCH_SIZE] batch = classify_mails(batch, api_key) classified.extend(batch) mails = classified summary = {} for m_item in mails: cat = m_item.get("category", "unknown") summary.setdefault(cat, []).append(m_item) return {"total": len(mails), "mails": mails, "summary": summary} def format_smart_digest(digest: dict) -> str: """Formatiert den intelligenten Digest für Telegram.""" if "error" in digest: return f"Mail-Fehler: {digest['error']}" if digest.get("total", 0) == 0: return "Keine neuen Mails im gewählten Zeitraum." lines = [f"📧 Mail-Digest ({digest['total']} Mails)\n"] summary = digest.get("summary", {}) cat_labels = { "wichtig": "🔴 Wichtig", "aktion": "⚡ Aktion nötig", "info": "ℹ️ Info", "newsletter": "📰 Newsletter", "spam": "🗑️ Spam", "unknown": "❓ Unkategorisiert", } cat_order = ["aktion", "wichtig", "info", "newsletter", "spam", "unknown"] for cat in cat_order: cat_mails = summary.get(cat, []) if not cat_mails: continue label = cat_labels.get(cat, cat) lines.append(f"{label} ({len(cat_mails)}):") show = cat_mails if cat in ("wichtig", "aktion", "info") else cat_mails[:3] for m_item in show: lines.append(f" {m_item['date_str']} | {m_item['from'][:30]}") lines.append(f" → {m_item['subject'][:65]}") if len(cat_mails) > len(show): lines.append(f" ... und {len(cat_mails) - len(show)} weitere") lines.append("") wichtig = len(summary.get("wichtig", [])) + len(summary.get("aktion", [])) noise = len(summary.get("newsletter", [])) + len(summary.get("spam", [])) lines.append(f"Fazit: {wichtig} relevante, {noise} ignorierbare Mails") return "\n".join(lines) def format_summary() -> str: """Komplett-Übersicht: Counts + letzte Mails + wichtige.""" counts = get_mail_count() if "error" in counts: return f"Mail-Fehler: {counts['error']}" lines = [f"E-Mail ({counts['account']})"] lines.append(f" Gesamt: {counts['total']}, Ungelesen: {counts['unread']}\n") recent = get_recent_mails(5) if recent and "error" not in recent[0]: lines.append("Letzte 5 Mails:") for r in recent: lines.append(f" {r['date_str']} | {r['from'][:30]}") lines.append(f" → {r['subject'][:70]}") important = get_important_mails(48) if important and "error" not in important[0]: lines.append(f"\nWichtige Mails (48h): {len(important)}") for r in important: lines.append(f" {r['date_str']} | {r['from'][:30]}") lines.append(f" → {r['subject'][:70]}") else: lines.append("\nKeine wichtigen Mails in den letzten 48h.") return "\n".join(lines) def format_search_results(results: list[dict]) -> str: if not results: return "Keine Mails gefunden." if "error" in results[0]: return f"Suche fehlgeschlagen: {results[0]['error']}" lines = [f"{len(results)} Mail(s) gefunden:\n"] for r in results: lines.append(f" {r['date_str']} | {r['from'][:35]}") lines.append(f" → {r['subject'][:70]}") return "\n".join(lines)