From ae7bbebedee1c5dde8996656af03a08634ca8ec1 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 16 Mar 2026 13:05:25 +0700 Subject: [PATCH] =?UTF-8?q?Hausmeister-Bot:=20Qualifizierte=20Matomo-Auswe?= =?UTF-8?q?rtung=20=E2=80=94=20WoW-Vergleich,=20Trend,=20Ausreisser,=20Bou?= =?UTF-8?q?nce/Engagement-Bewertung,=20Wochentags-Muster,=20Traffic-Quelle?= =?UTF-8?q?n-Analyse,=20Prognose?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- homelab-ai-bot/core/matomo_client.py | 370 +++++++++++++++++++++++---- homelab-ai-bot/llm.py | 12 + 2 files changed, 333 insertions(+), 49 deletions(-) diff --git a/homelab-ai-bot/core/matomo_client.py b/homelab-ai-bot/core/matomo_client.py index 762f0119..99ed07cd 100644 --- a/homelab-ai-bot/core/matomo_client.py +++ b/homelab-ai-bot/core/matomo_client.py @@ -1,7 +1,8 @@ -"""Matomo Analytics API Client — Besucherstatistiken fuer arakavanews.com.""" +"""Matomo Analytics API Client — Besucherstatistiken + qualifizierte Auswertung.""" import requests from datetime import datetime, timedelta +from statistics import mean, stdev MATOMO_URL = "" MATOMO_TOKEN = "" @@ -33,18 +34,17 @@ def _api(method: str, **params) -> dict | list | None: return {"error": str(e)} +# --------------- Raw API calls --------------- + def get_summary(period: str = "day", date: str = "today") -> dict: - """Besucher-Zusammenfassung (Visits, Unique, Actions, Bounce, Avg Time).""" return _api("VisitsSummary.get", period=period, date=date) def get_visitor_trend(days: int = 30) -> dict: - """Tageweise Besucherzahlen der letzten N Tage.""" return _api("VisitsSummary.get", period="day", date=f"last{days}") def get_top_pages(period: str = "day", date: str = "today", limit: int = 10) -> list: - """Meistbesuchte Seiten.""" result = _api("Actions.getPageUrls", period=period, date=date, filter_limit=limit, flat=1) if isinstance(result, dict) and "error" in result: @@ -53,7 +53,6 @@ def get_top_pages(period: str = "day", date: str = "today", limit: int = 10) -> def get_referrers(period: str = "day", date: str = "today", limit: int = 10) -> list: - """Woher kommen Besucher (Suchmaschinen, Social, Direkt).""" result = _api("Referrers.getReferrerType", period=period, date=date, filter_limit=limit) if isinstance(result, dict) and "error" in result: @@ -62,7 +61,6 @@ def get_referrers(period: str = "day", date: str = "today", limit: int = 10) -> def get_countries(period: str = "day", date: str = "today", limit: int = 10) -> list: - """Besucher nach Laendern.""" result = _api("UserCountry.getCountry", period=period, date=date, filter_limit=limit) if isinstance(result, dict) and "error" in result: @@ -71,93 +69,367 @@ def get_countries(period: str = "day", date: str = "today", limit: int = 10) -> def get_devices(period: str = "day", date: str = "today") -> list: - """Besucher nach Geraetetyp (Desktop, Mobile, Tablet).""" result = _api("DevicesDetection.getType", period=period, date=date) if isinstance(result, dict) and "error" in result: return [] return result if isinstance(result, list) else [] +# --------------- Analyse-Funktionen --------------- + +def _extract_daily_values(trend_data: dict, key: str = "nb_uniq_visitors") -> list[tuple[str, int]]: + """Extrahiert (datum, wert) Paare, sortiert nach Datum.""" + pairs = [] + for date_str, data in sorted(trend_data.items()): + if isinstance(data, dict): + pairs.append((date_str, data.get(key, 0))) + else: + pairs.append((date_str, 0)) + return pairs + + +def _week_values(pairs: list[tuple[str, int]], weeks_ago: int = 0) -> list[int]: + """Werte der letzten N-ten Woche (0 = aktuelle, 1 = letzte).""" + if not pairs: + return [] + end_idx = len(pairs) - (weeks_ago * 7) + start_idx = max(0, end_idx - 7) + if end_idx <= 0: + return [] + return [v for _, v in pairs[start_idx:end_idx]] + + +def _pct_change(old: float, new: float) -> float | None: + if old == 0: + return None + return ((new - old) / old) * 100 + + +def _classify_bounce(rate_str: str) -> str: + try: + rate = int(rate_str.replace("%", "")) + except (ValueError, AttributeError): + return "unbekannt" + if rate <= 40: + return "sehr gut (unter 40%)" + elif rate <= 55: + return "gut (40-55%)" + elif rate <= 70: + return "durchschnittlich (55-70%)" + elif rate <= 85: + return "schlecht (70-85%) — Besucher springen schnell ab" + else: + return "sehr schlecht (ueber 85%) — fast alle Besucher verlassen die Seite sofort" + + +def _classify_avg_time(seconds: int) -> str: + if seconds >= 180: + return f"sehr gut ({seconds // 60}m {seconds % 60}s) — Besucher lesen ausfuehrlich" + elif seconds >= 90: + return f"gut ({seconds // 60}m {seconds % 60}s) — Besucher bleiben eine Weile" + elif seconds >= 45: + return f"maessig ({seconds}s) — Besucher ueberfliegen nur" + else: + return f"schlecht ({seconds}s) — Besucher verlassen sofort" + + +def _find_outliers(pairs: list[tuple[str, int]]) -> dict: + """Findet Tage mit ungewoehnlich hohem/niedrigem Traffic.""" + values = [v for _, v in pairs if v > 0] + if len(values) < 7: + return {"peaks": [], "dips": []} + + avg = mean(values) + sd = stdev(values) if len(values) > 1 else 0 + + peaks = [(d, v) for d, v in pairs if v > avg + 1.5 * sd and v > 0] + dips = [(d, v) for d, v in pairs if 0 < v < max(avg - 1.5 * sd, 1)] + + return {"peaks": peaks, "dips": dips, "avg": avg, "sd": sd} + + +def _trend_direction(pairs: list[tuple[str, int]], window: int = 7) -> str: + """Bestimmt Trend-Richtung anhand der letzten N Tage vs. davor.""" + if len(pairs) < window * 2: + return "zu wenig Daten" + + recent = [v for _, v in pairs[-window:]] + previous = [v for _, v in pairs[-window * 2:-window]] + + avg_recent = mean(recent) if recent else 0 + avg_previous = mean(previous) if previous else 0 + + pct = _pct_change(avg_previous, avg_recent) + if pct is None: + return "vorher keine Besucher" + + if pct > 20: + return f"stark steigend (+{pct:.0f}%)" + elif pct > 5: + return f"leicht steigend (+{pct:.0f}%)" + elif pct > -5: + return f"stabil ({pct:+.0f}%)" + elif pct > -20: + return f"leicht fallend ({pct:.0f}%)" + else: + return f"stark fallend ({pct:.0f}%)" + + +def _analyze_referrers(referrers: list) -> list[str]: + """Qualifizierte Aussagen ueber Traffic-Quellen.""" + insights = [] + if not referrers: + return insights + + total = sum(r.get("nb_visits", 0) for r in referrers) + if total == 0: + return insights + + for r in referrers: + label = r.get("label", "?") + visits = r.get("nb_visits", 0) + share = (visits / total * 100) if total > 0 else 0 + + if share > 70: + insights.append(f"WARNUNG: {share:.0f}% des Traffics kommt von '{label}' — hohe Abhaengigkeit") + elif share > 50: + insights.append(f"'{label}' dominiert mit {share:.0f}% — Diversifikation empfohlen") + + search = next((r for r in referrers if "search" in r.get("label", "").lower()), None) + direct = next((r for r in referrers if "direct" in r.get("label", "").lower()), None) + + if search and direct and total > 0: + search_share = search.get("nb_visits", 0) / total * 100 + direct_share = direct.get("nb_visits", 0) / total * 100 + if direct_share > 30: + insights.append(f"{direct_share:.0f}% Direktzugriffe — gutes Zeichen fuer Stammleser") + if search_share < 10 and total > 20: + insights.append(f"Nur {search_share:.0f}% Suchmaschinen-Traffic — SEO verbessern?") + + return insights + + +def _analyze_top_pages(pages: list) -> list[str]: + """Qualifizierte Aussagen ueber Content-Performance.""" + insights = [] + if not pages or len(pages) < 2: + return insights + + total = sum(p.get("nb_hits", 0) for p in pages) + top = pages[0] + top_hits = top.get("nb_hits", 0) + top_label = top.get("label", "?") + + if total > 0 and top_hits > 0: + top_share = top_hits / total * 100 + if top_share > 50: + insights.append(f"'{top_label}' hat {top_share:.0f}% aller Aufrufe — ein klarer Hit") + elif top_share > 30: + insights.append(f"'{top_label}' ist der staerkste Artikel ({top_share:.0f}%)") + + avg_hits = total / len(pages) if pages else 0 + strong = [p for p in pages if p.get("nb_hits", 0) > avg_hits * 2] + if len(strong) > 1: + insights.append(f"{len(strong)} Artikel performen ueberdurchschnittlich") + + return insights + + +# --------------- Format-Funktionen (Output fuer LLM) --------------- + def format_analytics(period: str = "day", date: str = "today") -> str: - """Kompakter Analytics-Report fuer den Hausmeister-Bot.""" + """Qualifizierter Analytics-Report mit Bewertungen und Vergleichen.""" lines = [] + # --- Heute --- summary = get_summary(period, date) if isinstance(summary, dict) and "error" not in summary: visitors = summary.get("nb_uniq_visitors", 0) visits = summary.get("nb_visits", 0) actions = summary.get("nb_actions", 0) bounce = summary.get("bounce_rate", "?") - avg_time = summary.get("avg_time_on_site", 0) - avg_min = int(avg_time) // 60 - avg_sec = int(avg_time) % 60 + avg_time = int(summary.get("avg_time_on_site", 0)) + actions_per = summary.get("nb_actions_per_visit", 0) + + lines.append(f"=== HEUTE ({date}) ===") lines.append(f"Besucher: {visitors} unique, {visits} visits, {actions} Seitenaufrufe") - lines.append(f"Bounce Rate: {bounce}, Verweildauer: {avg_min}m {avg_sec}s") + lines.append(f"Seiten/Besuch: {actions_per}") + lines.append(f"Bounce Rate: {bounce} — Bewertung: {_classify_bounce(bounce)}") + lines.append(f"Verweildauer: {_classify_avg_time(avg_time)}") else: return f"Matomo nicht erreichbar: {summary}" - trend = get_visitor_trend(14) - if isinstance(trend, dict) and "error" not in trend: - trend_lines = [] - for date_str, data in sorted(trend.items()): - if isinstance(data, dict): - v = data.get("nb_uniq_visitors", 0) - trend_lines.append(f" {date_str}: {v} Besucher") - else: - trend_lines.append(f" {date_str}: 0") - if trend_lines: - lines.append("\nTrend (14 Tage):") - lines.extend(trend_lines) + # --- Woche-ueber-Woche --- + trend_data = get_visitor_trend(21) + if isinstance(trend_data, dict) and "error" not in trend_data: + pairs = _extract_daily_values(trend_data) - pages = get_top_pages(period, "today", 5) + this_week = _week_values(pairs, 0) + last_week = _week_values(pairs, 1) + + if this_week and last_week: + avg_this = mean(this_week) + avg_last = mean(last_week) + pct = _pct_change(avg_last, avg_this) + + lines.append(f"\n=== WOCHENVERGLEICH ===") + lines.append(f"Diese Woche: Ø {avg_this:.0f} Besucher/Tag (Summe: {sum(this_week)})") + lines.append(f"Letzte Woche: Ø {avg_last:.0f} Besucher/Tag (Summe: {sum(last_week)})") + if pct is not None: + direction = "mehr" if pct > 0 else "weniger" + lines.append(f"Veraenderung: {pct:+.0f}% ({direction} als letzte Woche)") + + # Trend-Richtung + trend_dir = _trend_direction(pairs) + lines.append(f"Trend (7 Tage vs. davor): {trend_dir}") + + # Ausreisser + outliers = _find_outliers(pairs) + if outliers.get("peaks"): + for d, v in outliers["peaks"][:2]: + lines.append(f"Peak: {d} mit {v} Besuchern (Ø {outliers['avg']:.0f})") + if outliers.get("dips"): + for d, v in outliers["dips"][:2]: + lines.append(f"Tief: {d} mit nur {v} Besuchern") + + # --- Top Seiten --- + pages = get_top_pages(period, "today", 10) if pages: - lines.append("\nTop Seiten (heute):") + lines.append(f"\n=== TOP SEITEN (heute) ===") for p in pages[:5]: label = p.get("label", "?") hits = p.get("nb_hits", 0) - lines.append(f" {label}: {hits}x") + avg_time_page = int(p.get("avg_time_on_page", 0)) + lines.append(f" {label}: {hits}x Aufrufe, {avg_time_page}s Lesezeit") - referrers = get_referrers(period, "today", 5) + page_insights = _analyze_top_pages(pages) + for ins in page_insights: + lines.append(f" → {ins}") + + # --- Traffic-Quellen --- + referrers = get_referrers(period, "today", 10) if referrers: - lines.append("\nTraffic-Quellen (heute):") + lines.append(f"\n=== TRAFFIC-QUELLEN (heute) ===") + total_ref = sum(r.get("nb_visits", 0) for r in referrers) for r in referrers[:5]: label = r.get("label", "?") visits = r.get("nb_visits", 0) - lines.append(f" {label}: {visits} visits") + share = (visits / total_ref * 100) if total_ref > 0 else 0 + lines.append(f" {label}: {visits} visits ({share:.0f}%)") + ref_insights = _analyze_referrers(referrers) + for ins in ref_insights: + lines.append(f" → {ins}") + + # --- Laender --- countries = get_countries(period, "today", 5) if countries: - lines.append("\nLaender (heute):") + lines.append(f"\n=== LAENDER (heute) ===") for c in countries[:5]: label = c.get("label", "?") visits = c.get("nb_visits", 0) lines.append(f" {label}: {visits}") - return "\n".join(lines) if lines else "Keine Daten verfuegbar." + # --- Geraete --- + devices = get_devices(period, "today") + if devices: + lines.append(f"\n=== GERAETE (heute) ===") + for d in devices[:4]: + label = d.get("label", "?") + visits = d.get("nb_visits", 0) + lines.append(f" {label}: {visits}") + mobile = sum(d.get("nb_visits", 0) for d in devices if "mobile" in d.get("label", "").lower() or "smartphone" in d.get("label", "").lower()) + desktop = sum(d.get("nb_visits", 0) for d in devices if "desktop" in d.get("label", "").lower()) + total_dev = sum(d.get("nb_visits", 0) for d in devices) + if total_dev > 0: + mob_pct = mobile / total_dev * 100 + lines.append(f" Mobile-Anteil: {mob_pct:.0f}%") + if mob_pct > 60: + lines.append(f" → Mehrheit mobil — Mobile-Optimierung wichtig") + elif mob_pct < 20: + lines.append(f" → Fast nur Desktop-Nutzer") + + return "\n".join(lines) def format_trend(days: int = 30) -> str: - """Besucherentwicklung ueber N Tage — fuer Trend-Fragen.""" + """Besucherentwicklung mit qualifizierter Analyse.""" trend = get_visitor_trend(days) if isinstance(trend, dict) and "error" in trend: return f"Matomo-Fehler: {trend['error']}" - lines = [f"Besucherentwicklung (letzte {days} Tage):"] - total = 0 - day_count = 0 - for date_str, data in sorted(trend.items()): - if isinstance(data, dict): - v = data.get("nb_uniq_visitors", 0) - actions = data.get("nb_actions", 0) - lines.append(f" {date_str}: {v} Besucher, {actions} Aufrufe") - total += v - day_count += 1 - else: - lines.append(f" {date_str}: 0") - day_count += 1 + pairs = _extract_daily_values(trend) + values_nonzero = [v for _, v in pairs if v > 0] - if day_count > 0: - avg = total / day_count - lines.append(f"\nDurchschnitt: {avg:.0f} Besucher/Tag, Gesamt: {total}") + lines = [f"=== BESUCHERENTWICKLUNG ({days} Tage) ==="] + + # Rohdaten + for date_str, v in pairs: + marker = "" + if values_nonzero: + avg = mean(values_nonzero) + if v > avg * 2: + marker = " ★ Peak" + elif v > 0 and v < avg * 0.3: + marker = " ↓ Tief" + lines.append(f" {date_str}: {v} Besucher{marker}") + + # Zusammenfassung + if values_nonzero: + total = sum(values_nonzero) + avg = mean(values_nonzero) + best_day = max(pairs, key=lambda x: x[1]) + worst_day = min((p for p in pairs if p[1] > 0), key=lambda x: x[1], default=("?", 0)) + + lines.append(f"\n=== ZUSAMMENFASSUNG ===") + lines.append(f"Gesamt: {sum(v for _, v in pairs)} Besucher in {days} Tagen") + lines.append(f"Tage mit Besuchern: {len(values_nonzero)} von {len(pairs)}") + lines.append(f"Durchschnitt: {avg:.0f} Besucher/Tag") + lines.append(f"Bester Tag: {best_day[0]} ({best_day[1]} Besucher)") + lines.append(f"Schwächster Tag: {worst_day[0]} ({worst_day[1]} Besucher)") + + # Trend + trend_dir = _trend_direction(pairs) + lines.append(f"Trend-Richtung: {trend_dir}") + + # Wochentags-Muster + weekday_map = {} + for d, v in pairs: + try: + wd = datetime.strptime(d, "%Y-%m-%d").strftime("%A") + weekday_map.setdefault(wd, []).append(v) + except ValueError: + pass + + if weekday_map: + lines.append(f"\n=== WOCHENTAGS-MUSTER ===") + wd_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] + wd_de = {"Monday": "Mo", "Tuesday": "Di", "Wednesday": "Mi", "Thursday": "Do", + "Friday": "Fr", "Saturday": "Sa", "Sunday": "So"} + for wd in wd_order: + if wd in weekday_map: + vals = weekday_map[wd] + wd_avg = mean(vals) if vals else 0 + lines.append(f" {wd_de[wd]}: Ø {wd_avg:.0f} Besucher") + best_wd = max(weekday_map.items(), key=lambda x: mean(x[1]) if x[1] else 0) + worst_wd = min(weekday_map.items(), key=lambda x: mean(x[1]) if x[1] else float('inf')) + lines.append(f" → Bester Wochentag: {wd_de.get(best_wd[0], best_wd[0])} (Ø {mean(best_wd[1]):.0f})") + lines.append(f" → Schwaechster: {wd_de.get(worst_wd[0], worst_wd[0])} (Ø {mean(worst_wd[1]):.0f})") + + # Wachstums-Prognose + if len(values_nonzero) >= 14: + first_half = values_nonzero[:len(values_nonzero) // 2] + second_half = values_nonzero[len(values_nonzero) // 2:] + growth = _pct_change(mean(first_half), mean(second_half)) + if growth is not None: + lines.append(f"\n=== PROGNOSE ===") + if growth > 10: + lines.append(f"Wachstum erste→zweite Haelfte: +{growth:.0f}% — positiver Trend") + monthly_proj = avg * 30 + lines.append(f"Hochrechnung naechster Monat: ~{monthly_proj:.0f} Besucher") + elif growth > -10: + lines.append(f"Stabile Phase ({growth:+.0f}%) — Wachstum stagniert") + else: + lines.append(f"Ruecklaeufig ({growth:.0f}%) — Gegenmassnahmen pruefen") return "\n".join(lines) diff --git a/homelab-ai-bot/llm.py b/homelab-ai-bot/llm.py index e623c3ca..b6957e6a 100644 --- a/homelab-ai-bot/llm.py +++ b/homelab-ai-bot/llm.py @@ -146,6 +146,18 @@ Bei Reisen, Geld, Behoerden, Rechnungen, Buchungen: - NIEMALS praezise falsche Angaben machen. - Speichere nur HIGH-CONFIDENCE Daten via memory_suggest (Reiseplaene, Buchungscodes). +ANALYTICS-INTERPRETATION (Matomo): +Wenn du Analytics-Daten bekommst (get_matomo_analytics, get_matomo_trend), interpretiere sie QUALIFIZIERT: +- Nenne nicht nur Zahlen, sondern BEWERTE sie ("88% Bounce Rate ist schlecht", "42s Verweildauer ist zu kurz") +- Vergleiche IMMER mit der Vorwoche wenn Daten vorhanden ("30% mehr als letzte Woche") +- Nenne den TREND klar ("Traffic steigt seit 5 Tagen", "Ruecklaeufig seit Montag") +- Bei Peaks: Vermute WARUM ("Am 24.02. 147 Besucher — pruefe welcher Artikel viral ging") +- Bei hoher Abhaengigkeit von einer Quelle: WARNE ("80% kommt von Google — riskant") +- Gib 1-2 konkrete EMPFEHLUNGEN ("Bounce Rate senken: Ueberschriften verbessern, Ladezeit pruefen") +- Wochentags-Muster nutzen: "Dienstag ist dein staerkster Tag — poste neue Artikel dienstags" +- NICHT: endlose Zahlentabellen wiedergeben. Fasse zusammen, hebe das Wichtige hervor. +- Format: Kurze Absaetze, KEINE langen Listen. Wie ein Analytics-Berater der auf den Punkt kommt. + TOOLS: Nutze Tools fuer Live-Daten. Wenn alles OK: kurz sagen. Bei Problemen: erklaeren + Loesung."""