homelab-brain/homelab-ai-bot/core/mail_client.py

244 lines
7.7 KiB
Python

"""IMAP Mail Client — Liest E-Mails vom All-Inkl Spiegel-Postfach (Read-Only)."""
import imaplib
import email
from email.header import decode_header
from email.utils import parsedate_to_datetime
from datetime import datetime, timedelta, timezone
from typing import Optional
IMAP_SERVER = ""
IMAP_PORT = 993
MAIL_USER = ""
MAIL_PASS = ""
IMPORTANT_SENDERS = [
"paypal", "bank", "sparkasse", "postbank", "dkb",
"hetzner", "all-inkl", "kasserver", "cloudflare",
"proxmox", "synology", "tailscale",
"finanzamt", "elster", "bundesnetzagentur",
]
def init(cfg):
global IMAP_SERVER, IMAP_PORT, MAIL_USER, MAIL_PASS
IMAP_SERVER = cfg.raw.get("MAIL_IMAP_SERVER", "")
IMAP_PORT = int(cfg.raw.get("MAIL_IMAP_PORT", "993"))
MAIL_USER = cfg.raw.get("MAIL_USER", "")
MAIL_PASS = cfg.raw.get("MAIL_PASS", "")
def _connect() -> Optional[imaplib.IMAP4_SSL]:
if not IMAP_SERVER or not MAIL_USER or not MAIL_PASS:
return None
try:
m = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
m.login(MAIL_USER, MAIL_PASS)
return m
except Exception:
return None
def _decode_header_value(raw: str) -> str:
if not raw:
return ""
parts = decode_header(raw)
decoded = ""
for part, enc in parts:
if isinstance(part, bytes):
decoded += part.decode(enc or "utf-8", errors="replace")
else:
decoded += part
return decoded.strip()
def _parse_mail(msg_data) -> Optional[dict]:
try:
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subj = _decode_header_value(msg.get("Subject", ""))
frm = _decode_header_value(msg.get("From", ""))
date_str = msg.get("Date", "")
try:
dt = parsedate_to_datetime(date_str)
except Exception:
dt = None
return {
"subject": subj[:120],
"from": frm[:80],
"date": dt,
"date_str": dt.strftime("%d.%m.%Y %H:%M") if dt else date_str[:20],
}
except Exception:
return None
def get_mail_count() -> dict:
"""Anzahl Mails total und ungelesen."""
m = _connect()
if not m:
return {"error": "IMAP-Verbindung fehlgeschlagen"}
try:
m.select("INBOX", readonly=True)
_, data = m.search(None, "ALL")
total = len(data[0].split()) if data[0] else 0
_, data = m.search(None, "UNSEEN")
unread = len(data[0].split()) if data[0] else 0
return {"total": total, "unread": unread, "account": MAIL_USER}
except Exception as e:
return {"error": str(e)}
finally:
m.logout()
def get_recent_mails(count: int = 10) -> list[dict]:
"""Letzte N Mails (neueste zuerst)."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
_, data = m.search(None, "ALL")
ids = data[0].split() if data[0] else []
if not ids:
return []
recent_ids = ids[-count:]
recent_ids.reverse()
results = []
for mid in recent_ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
results.append(parsed)
return results
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
def get_todays_mails() -> list[dict]:
"""Alle Mails von heute."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
today = datetime.now().strftime("%d-%b-%Y")
_, data = m.search(None, f'(SINCE "{today}")')
ids = data[0].split() if data[0] else []
results = []
for mid in ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
results.append(parsed)
results.reverse()
return results
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
def search_mail(query: str, days: int = 30, limit: int = 15) -> list[dict]:
"""Suche nach Mails per Absender oder Betreff."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
since = (datetime.now() - timedelta(days=days)).strftime("%d-%b-%Y")
all_results = []
for criteria in [f'FROM "{query}"', f'SUBJECT "{query}"']:
try:
_, data = m.search(None, f'(SINCE "{since}" {criteria})')
ids = data[0].split() if data[0] else []
for mid in ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
all_results.append(parsed)
except Exception:
continue
seen = set()
unique = []
for r in all_results:
key = f"{r['date_str']}|{r['subject'][:40]}"
if key not in seen:
seen.add(key)
unique.append(r)
unique.sort(key=lambda x: x.get("date") or datetime.min.replace(tzinfo=timezone.utc), reverse=True)
return unique[:limit]
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
def get_important_mails(hours: int = 24) -> list[dict]:
"""Mails von wichtigen Absendern (Bank, Hoster, etc.)."""
m = _connect()
if not m:
return [{"error": "IMAP-Verbindung fehlgeschlagen"}]
try:
m.select("INBOX", readonly=True)
since = (datetime.now() - timedelta(hours=hours)).strftime("%d-%b-%Y")
_, data = m.search(None, f'(SINCE "{since}")')
ids = data[0].split() if data[0] else []
results = []
for mid in ids:
_, msg_data = m.fetch(mid, "(BODY.PEEK[HEADER])")
parsed = _parse_mail(msg_data)
if parsed:
frm_lower = parsed["from"].lower()
if any(s in frm_lower for s in IMPORTANT_SENDERS):
results.append(parsed)
results.reverse()
return results
except Exception as e:
return [{"error": str(e)}]
finally:
m.logout()
def format_summary() -> str:
"""Komplett-Übersicht: Counts + letzte Mails + wichtige."""
counts = get_mail_count()
if "error" in counts:
return f"Mail-Fehler: {counts['error']}"
lines = [f"E-Mail ({counts['account']})"]
lines.append(f" Gesamt: {counts['total']}, Ungelesen: {counts['unread']}\n")
recent = get_recent_mails(5)
if recent and "error" not in recent[0]:
lines.append("Letzte 5 Mails:")
for r in recent:
lines.append(f" {r['date_str']} | {r['from'][:30]}")
lines.append(f"{r['subject'][:70]}")
important = get_important_mails(48)
if important and "error" not in important[0]:
lines.append(f"\nWichtige Mails (48h): {len(important)}")
for r in important:
lines.append(f" {r['date_str']} | {r['from'][:30]}")
lines.append(f"{r['subject'][:70]}")
else:
lines.append("\nKeine wichtigen Mails in den letzten 48h.")
return "\n".join(lines)
def format_search_results(results: list[dict]) -> str:
if not results:
return "Keine Mails gefunden."
if "error" in results[0]:
return f"Suche fehlgeschlagen: {results[0]['error']}"
lines = [f"{len(results)} Mail(s) gefunden:\n"]
for r in results:
lines.append(f" {r['date_str']} | {r['from'][:35]}")
lines.append(f"{r['subject'][:70]}")
return "\n".join(lines)