"""Matomo Analytics API Client — Besucherstatistiken + qualifizierte Auswertung.""" import requests from datetime import datetime, timedelta from statistics import mean, stdev MATOMO_URL = "" MATOMO_TOKEN = "" MATOMO_SITE_ID = "1" def init(cfg): global MATOMO_URL, MATOMO_TOKEN, MATOMO_SITE_ID MATOMO_URL = cfg.raw.get("MATOMO_URL", "") MATOMO_TOKEN = cfg.raw.get("MATOMO_TOKEN", "") MATOMO_SITE_ID = cfg.raw.get("MATOMO_SITE_ID", "1") return bool(MATOMO_URL and MATOMO_TOKEN) def _api(method: str, **params) -> dict | list | None: try: p = { "module": "API", "method": method, "idSite": MATOMO_SITE_ID, "format": "json", "token_auth": MATOMO_TOKEN, } p.update(params) resp = requests.get(f"{MATOMO_URL}/index.php", params=p, timeout=15) resp.raise_for_status() return resp.json() except Exception as e: return {"error": str(e)} # --------------- Raw API calls --------------- def get_summary(period: str = "day", date: str = "today") -> dict: return _api("VisitsSummary.get", period=period, date=date) def get_visitor_trend(days: int = 30) -> dict: return _api("VisitsSummary.get", period="day", date=f"last{days}") def get_top_pages(period: str = "day", date: str = "today", limit: int = 10) -> list: result = _api("Actions.getPageUrls", period=period, date=date, filter_limit=limit, flat=1) if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] def get_referrers(period: str = "day", date: str = "today", limit: int = 10) -> list: result = _api("Referrers.getReferrerType", period=period, date=date, filter_limit=limit) if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] def get_countries(period: str = "day", date: str = "today", limit: int = 10) -> list: result = _api("UserCountry.getCountry", period=period, date=date, filter_limit=limit) if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] def get_devices(period: str = "day", date: str = "today") -> list: result = _api("DevicesDetection.getType", period=period, date=date) if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] # --------------- Analyse-Funktionen --------------- def _extract_daily_values(trend_data: dict, key: str = "nb_uniq_visitors") -> list[tuple[str, int]]: """Extrahiert (datum, wert) Paare, sortiert nach Datum.""" pairs = [] for date_str, data in sorted(trend_data.items()): if isinstance(data, dict): pairs.append((date_str, data.get(key, 0))) else: pairs.append((date_str, 0)) return pairs def _week_values(pairs: list[tuple[str, int]], weeks_ago: int = 0) -> list[int]: """Werte der letzten N-ten Woche (0 = aktuelle, 1 = letzte).""" if not pairs: return [] end_idx = len(pairs) - (weeks_ago * 7) start_idx = max(0, end_idx - 7) if end_idx <= 0: return [] return [v for _, v in pairs[start_idx:end_idx]] def _pct_change(old: float, new: float) -> float | None: if old == 0: return None return ((new - old) / old) * 100 def _classify_bounce(rate_str: str) -> str: try: rate = int(rate_str.replace("%", "")) except (ValueError, AttributeError): return "unbekannt" if rate <= 40: return "sehr gut (unter 40%)" elif rate <= 55: return "gut (40-55%)" elif rate <= 70: return "durchschnittlich (55-70%)" elif rate <= 85: return "schlecht (70-85%) — Besucher springen schnell ab" else: return "sehr schlecht (ueber 85%) — fast alle Besucher verlassen die Seite sofort" def _classify_avg_time(seconds: int) -> str: if seconds >= 180: return f"sehr gut ({seconds // 60}m {seconds % 60}s) — Besucher lesen ausfuehrlich" elif seconds >= 90: return f"gut ({seconds // 60}m {seconds % 60}s) — Besucher bleiben eine Weile" elif seconds >= 45: return f"maessig ({seconds}s) — Besucher ueberfliegen nur" else: return f"schlecht ({seconds}s) — Besucher verlassen sofort" def _find_outliers(pairs: list[tuple[str, int]]) -> dict: """Findet Tage mit ungewoehnlich hohem/niedrigem Traffic.""" values = [v for _, v in pairs if v > 0] if len(values) < 7: return {"peaks": [], "dips": []} avg = mean(values) sd = stdev(values) if len(values) > 1 else 0 peaks = [(d, v) for d, v in pairs if v > avg + 1.5 * sd and v > 0] dips = [(d, v) for d, v in pairs if 0 < v < max(avg - 1.5 * sd, 1)] return {"peaks": peaks, "dips": dips, "avg": avg, "sd": sd} def _trend_direction(pairs: list[tuple[str, int]], window: int = 7) -> str: """Bestimmt Trend-Richtung anhand der letzten N Tage vs. davor.""" if len(pairs) < window * 2: return "zu wenig Daten" recent = [v for _, v in pairs[-window:]] previous = [v for _, v in pairs[-window * 2:-window]] avg_recent = mean(recent) if recent else 0 avg_previous = mean(previous) if previous else 0 pct = _pct_change(avg_previous, avg_recent) if pct is None: return "vorher keine Besucher" if pct > 20: return f"stark steigend (+{pct:.0f}%)" elif pct > 5: return f"leicht steigend (+{pct:.0f}%)" elif pct > -5: return f"stabil ({pct:+.0f}%)" elif pct > -20: return f"leicht fallend ({pct:.0f}%)" else: return f"stark fallend ({pct:.0f}%)" def _analyze_referrers(referrers: list) -> list[str]: """Qualifizierte Aussagen ueber Traffic-Quellen.""" insights = [] if not referrers: return insights total = sum(r.get("nb_visits", 0) for r in referrers) if total == 0: return insights for r in referrers: label = r.get("label", "?") visits = r.get("nb_visits", 0) share = (visits / total * 100) if total > 0 else 0 if share > 70: insights.append(f"WARNUNG: {share:.0f}% des Traffics kommt von '{label}' — hohe Abhaengigkeit") elif share > 50: insights.append(f"'{label}' dominiert mit {share:.0f}% — Diversifikation empfohlen") search = next((r for r in referrers if "search" in r.get("label", "").lower()), None) direct = next((r for r in referrers if "direct" in r.get("label", "").lower()), None) if search and direct and total > 0: search_share = search.get("nb_visits", 0) / total * 100 direct_share = direct.get("nb_visits", 0) / total * 100 if direct_share > 30: insights.append(f"{direct_share:.0f}% Direktzugriffe — gutes Zeichen fuer Stammleser") if search_share < 10 and total > 20: insights.append(f"Nur {search_share:.0f}% Suchmaschinen-Traffic — SEO verbessern?") return insights def _analyze_top_pages(pages: list) -> list[str]: """Qualifizierte Aussagen ueber Content-Performance.""" insights = [] if not pages or len(pages) < 2: return insights total = sum(p.get("nb_hits", 0) for p in pages) top = pages[0] top_hits = top.get("nb_hits", 0) top_label = top.get("label", "?") if total > 0 and top_hits > 0: top_share = top_hits / total * 100 if top_share > 50: insights.append(f"'{top_label}' hat {top_share:.0f}% aller Aufrufe — ein klarer Hit") elif top_share > 30: insights.append(f"'{top_label}' ist der staerkste Artikel ({top_share:.0f}%)") avg_hits = total / len(pages) if pages else 0 strong = [p for p in pages if p.get("nb_hits", 0) > avg_hits * 2] if len(strong) > 1: insights.append(f"{len(strong)} Artikel performen ueberdurchschnittlich") return insights # --------------- Format-Funktionen (Output fuer LLM) --------------- def format_analytics(period: str = "day", date: str = "today") -> str: """Qualifizierter Analytics-Report mit Bewertungen und Vergleichen.""" lines = [] # --- Heute --- summary = get_summary(period, date) if isinstance(summary, dict) and "error" not in summary: visitors = summary.get("nb_uniq_visitors", 0) visits = summary.get("nb_visits", 0) actions = summary.get("nb_actions", 0) bounce = summary.get("bounce_rate", "?") avg_time = int(summary.get("avg_time_on_site", 0)) actions_per = summary.get("nb_actions_per_visit", 0) lines.append(f"=== HEUTE ({date}) ===") lines.append(f"Besucher: {visitors} unique, {visits} visits, {actions} Seitenaufrufe") lines.append(f"Seiten/Besuch: {actions_per}") lines.append(f"Bounce Rate: {bounce} — Bewertung: {_classify_bounce(bounce)}") lines.append(f"Verweildauer: {_classify_avg_time(avg_time)}") else: return f"Matomo nicht erreichbar: {summary}" # --- Woche-ueber-Woche --- trend_data = get_visitor_trend(21) if isinstance(trend_data, dict) and "error" not in trend_data: pairs = _extract_daily_values(trend_data) this_week = _week_values(pairs, 0) last_week = _week_values(pairs, 1) if this_week and last_week: avg_this = mean(this_week) avg_last = mean(last_week) pct = _pct_change(avg_last, avg_this) lines.append(f"\n=== WOCHENVERGLEICH ===") lines.append(f"Diese Woche: Ø {avg_this:.0f} Besucher/Tag (Summe: {sum(this_week)})") lines.append(f"Letzte Woche: Ø {avg_last:.0f} Besucher/Tag (Summe: {sum(last_week)})") if pct is not None: direction = "mehr" if pct > 0 else "weniger" lines.append(f"Veraenderung: {pct:+.0f}% ({direction} als letzte Woche)") # Trend-Richtung trend_dir = _trend_direction(pairs) lines.append(f"Trend (7 Tage vs. davor): {trend_dir}") # Ausreisser outliers = _find_outliers(pairs) if outliers.get("peaks"): for d, v in outliers["peaks"][:2]: lines.append(f"Peak: {d} mit {v} Besuchern (Ø {outliers['avg']:.0f})") if outliers.get("dips"): for d, v in outliers["dips"][:2]: lines.append(f"Tief: {d} mit nur {v} Besuchern") # --- Top Seiten --- pages = get_top_pages(period, "today", 10) if pages: lines.append(f"\n=== TOP SEITEN (heute) ===") for p in pages[:5]: label = p.get("label", "?") hits = p.get("nb_hits", 0) avg_time_page = int(p.get("avg_time_on_page", 0)) lines.append(f" {label}: {hits}x Aufrufe, {avg_time_page}s Lesezeit") page_insights = _analyze_top_pages(pages) for ins in page_insights: lines.append(f" → {ins}") # --- Traffic-Quellen --- referrers = get_referrers(period, "today", 10) if referrers: lines.append(f"\n=== TRAFFIC-QUELLEN (heute) ===") total_ref = sum(r.get("nb_visits", 0) for r in referrers) for r in referrers[:5]: label = r.get("label", "?") visits = r.get("nb_visits", 0) share = (visits / total_ref * 100) if total_ref > 0 else 0 lines.append(f" {label}: {visits} visits ({share:.0f}%)") ref_insights = _analyze_referrers(referrers) for ins in ref_insights: lines.append(f" → {ins}") # --- Laender --- countries = get_countries(period, "today", 5) if countries: lines.append(f"\n=== LAENDER (heute) ===") for c in countries[:5]: label = c.get("label", "?") visits = c.get("nb_visits", 0) lines.append(f" {label}: {visits}") # --- Geraete --- devices = get_devices(period, "today") if devices: lines.append(f"\n=== GERAETE (heute) ===") for d in devices[:4]: label = d.get("label", "?") visits = d.get("nb_visits", 0) lines.append(f" {label}: {visits}") mobile = sum(d.get("nb_visits", 0) for d in devices if "mobile" in d.get("label", "").lower() or "smartphone" in d.get("label", "").lower()) desktop = sum(d.get("nb_visits", 0) for d in devices if "desktop" in d.get("label", "").lower()) total_dev = sum(d.get("nb_visits", 0) for d in devices) if total_dev > 0: mob_pct = mobile / total_dev * 100 lines.append(f" Mobile-Anteil: {mob_pct:.0f}%") if mob_pct > 60: lines.append(f" → Mehrheit mobil — Mobile-Optimierung wichtig") elif mob_pct < 20: lines.append(f" → Fast nur Desktop-Nutzer") return "\n".join(lines) def format_trend(days: int = 30) -> str: """Besucherentwicklung mit qualifizierter Analyse.""" trend = get_visitor_trend(days) if isinstance(trend, dict) and "error" in trend: return f"Matomo-Fehler: {trend['error']}" pairs = _extract_daily_values(trend) values_nonzero = [v for _, v in pairs if v > 0] lines = [f"=== BESUCHERENTWICKLUNG ({days} Tage) ==="] # Rohdaten for date_str, v in pairs: marker = "" if values_nonzero: avg = mean(values_nonzero) if v > avg * 2: marker = " ★ Peak" elif v > 0 and v < avg * 0.3: marker = " ↓ Tief" lines.append(f" {date_str}: {v} Besucher{marker}") # Zusammenfassung if values_nonzero: total = sum(values_nonzero) avg = mean(values_nonzero) best_day = max(pairs, key=lambda x: x[1]) worst_day = min((p for p in pairs if p[1] > 0), key=lambda x: x[1], default=("?", 0)) lines.append(f"\n=== ZUSAMMENFASSUNG ===") lines.append(f"Gesamt: {sum(v for _, v in pairs)} Besucher in {days} Tagen") lines.append(f"Tage mit Besuchern: {len(values_nonzero)} von {len(pairs)}") lines.append(f"Durchschnitt: {avg:.0f} Besucher/Tag") lines.append(f"Bester Tag: {best_day[0]} ({best_day[1]} Besucher)") lines.append(f"Schwächster Tag: {worst_day[0]} ({worst_day[1]} Besucher)") # Trend trend_dir = _trend_direction(pairs) lines.append(f"Trend-Richtung: {trend_dir}") # Wochentags-Muster (nur bei >= 14 Tagen Daten sinnvoll) weekday_map = {} for d, v in pairs: try: wd = datetime.strptime(d, "%Y-%m-%d").strftime("%A") weekday_map.setdefault(wd, []).append(v) except ValueError: pass min_samples = min(len(vals) for vals in weekday_map.values()) if weekday_map else 0 if weekday_map and min_samples >= 2: lines.append(f"\n=== WOCHENTAGS-MUSTER (je {min_samples}-{max(len(v) for v in weekday_map.values())} Datenpunkte) ===") wd_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] wd_de = {"Monday": "Mo", "Tuesday": "Di", "Wednesday": "Mi", "Thursday": "Do", "Friday": "Fr", "Saturday": "Sa", "Sunday": "So"} for wd in wd_order: if wd in weekday_map: vals = weekday_map[wd] wd_avg = mean(vals) if vals else 0 lines.append(f" {wd_de[wd]}: Ø {wd_avg:.0f} Besucher ({len(vals)}x gemessen)") best_wd = max(weekday_map.items(), key=lambda x: mean(x[1]) if x[1] else 0) worst_wd = min(weekday_map.items(), key=lambda x: mean(x[1]) if x[1] else float('inf')) lines.append(f" → Bester Wochentag: {wd_de.get(best_wd[0], best_wd[0])} (Ø {mean(best_wd[1]):.0f})") lines.append(f" → Schwaechster: {wd_de.get(worst_wd[0], worst_wd[0])} (Ø {mean(worst_wd[1]):.0f})") if min_samples < 4: lines.append(f" ⚠ Wenig Datenpunkte — Muster wird mit mehr Daten zuverlaessiger") elif weekday_map: lines.append(f"\n=== WOCHENTAGS-MUSTER ===") lines.append(f" Zu wenig Daten fuer zuverlaessige Wochentags-Analyse (nur {days} Tage, mind. 14 noetig)") # Wachstums-Prognose if len(values_nonzero) >= 14: first_half = values_nonzero[:len(values_nonzero) // 2] second_half = values_nonzero[len(values_nonzero) // 2:] growth = _pct_change(mean(first_half), mean(second_half)) if growth is not None: lines.append(f"\n=== PROGNOSE ===") if growth > 10: lines.append(f"Wachstum erste→zweite Haelfte: +{growth:.0f}% — positiver Trend") monthly_proj = avg * 30 lines.append(f"Hochrechnung naechster Monat: ~{monthly_proj:.0f} Besucher") elif growth > -10: lines.append(f"Stabile Phase ({growth:+.0f}%) — Wachstum stagniert") else: lines.append(f"Ruecklaeufig ({growth:.0f}%) — Gegenmassnahmen pruefen") return "\n".join(lines)