435 lines
16 KiB
Python
435 lines
16 KiB
Python
"""Matomo Analytics API Client — Besucherstatistiken + qualifizierte Auswertung."""
|
|
|
|
import requests
|
|
from datetime import datetime, timedelta
|
|
from statistics import mean, stdev
|
|
|
|
# Module-level connection settings. Empty by default; populated from the
# application config via init(). MATOMO_SITE_ID defaults to site "1".
MATOMO_URL = ""

MATOMO_TOKEN = ""

MATOMO_SITE_ID = "1"
|
|
|
|
|
|
def init(cfg):
    """Load Matomo connection settings from *cfg* into module globals.

    Reads MATOMO_URL, MATOMO_TOKEN and MATOMO_SITE_ID from ``cfg.raw``
    (a dict-like mapping). Returns True when both a base URL and an
    auth token are configured, i.e. the client is usable.
    """
    global MATOMO_URL, MATOMO_TOKEN, MATOMO_SITE_ID
    raw = cfg.raw
    MATOMO_URL = raw.get("MATOMO_URL", "")
    MATOMO_TOKEN = raw.get("MATOMO_TOKEN", "")
    MATOMO_SITE_ID = raw.get("MATOMO_SITE_ID", "1")
    return bool(MATOMO_URL) and bool(MATOMO_TOKEN)
|
|
|
|
|
|
def _api(method: str, **params) -> dict | list | None:
|
|
try:
|
|
p = {
|
|
"module": "API",
|
|
"method": method,
|
|
"idSite": MATOMO_SITE_ID,
|
|
"format": "json",
|
|
"token_auth": MATOMO_TOKEN,
|
|
}
|
|
p.update(params)
|
|
resp = requests.get(f"{MATOMO_URL}/index.php", params=p, timeout=15)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
|
|
# --------------- Raw API calls ---------------
|
|
|
|
def get_summary(period: str = "day", date: str = "today") -> dict:
    """Fetch the VisitsSummary report for the given period and date."""
    return _api("VisitsSummary.get", date=date, period=period)
|
|
|
|
|
|
def get_visitor_trend(days: int = 30) -> dict:
    """Fetch per-day VisitsSummary rows for the trailing *days* days."""
    window = f"last{days}"
    return _api("VisitsSummary.get", period="day", date=window)
|
|
|
|
|
|
def get_top_pages(period: str = "day", date: str = "today", limit: int = 10) -> list:
    """Fetch the most-viewed page URLs (flat list, up to *limit* rows).

    Returns [] on API errors or any non-list payload.
    """
    rows = _api("Actions.getPageUrls", period=period, date=date,
                flat=1, filter_limit=limit)
    # Error responses are dicts; anything that is not a list is unusable.
    return rows if isinstance(rows, list) else []
|
|
|
|
|
|
def get_referrers(period: str = "day", date: str = "today", limit: int = 10) -> list:
    """Fetch visits grouped by referrer type (search, direct, websites, ...).

    Returns [] on API errors or any non-list payload.
    """
    rows = _api("Referrers.getReferrerType", period=period, date=date,
                filter_limit=limit)
    return rows if isinstance(rows, list) else []
|
|
|
|
|
|
def get_countries(period: str = "day", date: str = "today", limit: int = 10) -> list:
    """Fetch visits grouped by visitor country.

    Returns [] on API errors or any non-list payload.
    """
    rows = _api("UserCountry.getCountry", period=period, date=date,
                filter_limit=limit)
    return rows if isinstance(rows, list) else []
|
|
|
|
|
|
def get_devices(period: str = "day", date: str = "today") -> list:
    """Fetch visits grouped by device type (desktop, smartphone, ...).

    Returns [] on API errors or any non-list payload.
    """
    rows = _api("DevicesDetection.getType", period=period, date=date)
    return rows if isinstance(rows, list) else []
|
|
|
|
|
|
# --------------- Analyse-Funktionen ---------------
|
|
|
|
def _extract_daily_values(trend_data: dict, key: str = "nb_uniq_visitors") -> list[tuple[str, int]]:
|
|
"""Extrahiert (datum, wert) Paare, sortiert nach Datum."""
|
|
pairs = []
|
|
for date_str, data in sorted(trend_data.items()):
|
|
if isinstance(data, dict):
|
|
pairs.append((date_str, data.get(key, 0)))
|
|
else:
|
|
pairs.append((date_str, 0))
|
|
return pairs
|
|
|
|
|
|
def _week_values(pairs: list[tuple[str, int]], weeks_ago: int = 0) -> list[int]:
|
|
"""Werte der letzten N-ten Woche (0 = aktuelle, 1 = letzte)."""
|
|
if not pairs:
|
|
return []
|
|
end_idx = len(pairs) - (weeks_ago * 7)
|
|
start_idx = max(0, end_idx - 7)
|
|
if end_idx <= 0:
|
|
return []
|
|
return [v for _, v in pairs[start_idx:end_idx]]
|
|
|
|
|
|
def _pct_change(old: float, new: float) -> float | None:
|
|
if old == 0:
|
|
return None
|
|
return ((new - old) / old) * 100
|
|
|
|
|
|
def _classify_bounce(rate_str: str) -> str:
|
|
try:
|
|
rate = int(rate_str.replace("%", ""))
|
|
except (ValueError, AttributeError):
|
|
return "unbekannt"
|
|
if rate <= 40:
|
|
return "sehr gut (unter 40%)"
|
|
elif rate <= 55:
|
|
return "gut (40-55%)"
|
|
elif rate <= 70:
|
|
return "durchschnittlich (55-70%)"
|
|
elif rate <= 85:
|
|
return "schlecht (70-85%) — Besucher springen schnell ab"
|
|
else:
|
|
return "sehr schlecht (ueber 85%) — fast alle Besucher verlassen die Seite sofort"
|
|
|
|
|
|
def _classify_avg_time(seconds: int) -> str:
|
|
if seconds >= 180:
|
|
return f"sehr gut ({seconds // 60}m {seconds % 60}s) — Besucher lesen ausfuehrlich"
|
|
elif seconds >= 90:
|
|
return f"gut ({seconds // 60}m {seconds % 60}s) — Besucher bleiben eine Weile"
|
|
elif seconds >= 45:
|
|
return f"maessig ({seconds}s) — Besucher ueberfliegen nur"
|
|
else:
|
|
return f"schlecht ({seconds}s) — Besucher verlassen sofort"
|
|
|
|
|
|
def _find_outliers(pairs: list[tuple[str, int]]) -> dict:
|
|
"""Findet Tage mit ungewoehnlich hohem/niedrigem Traffic."""
|
|
values = [v for _, v in pairs if v > 0]
|
|
if len(values) < 7:
|
|
return {"peaks": [], "dips": []}
|
|
|
|
avg = mean(values)
|
|
sd = stdev(values) if len(values) > 1 else 0
|
|
|
|
peaks = [(d, v) for d, v in pairs if v > avg + 1.5 * sd and v > 0]
|
|
dips = [(d, v) for d, v in pairs if 0 < v < max(avg - 1.5 * sd, 1)]
|
|
|
|
return {"peaks": peaks, "dips": dips, "avg": avg, "sd": sd}
|
|
|
|
|
|
def _trend_direction(pairs: list[tuple[str, int]], window: int = 7) -> str:
    """Describe the trend: mean of the last *window* days vs. the window before."""
    if len(pairs) < 2 * window:
        return "zu wenig Daten"

    values = [value for _, value in pairs]
    recent = values[-window:]
    previous = values[-2 * window:-window]

    avg_recent = mean(recent) if recent else 0
    avg_previous = mean(previous) if previous else 0

    pct = _pct_change(avg_previous, avg_recent)
    if pct is None:
        return "vorher keine Besucher"

    # Buckets: >+20 strong up, >+5 slight up, ±5 stable, >-20 slight down.
    if pct > 20:
        return f"stark steigend (+{pct:.0f}%)"
    if pct > 5:
        return f"leicht steigend (+{pct:.0f}%)"
    if pct > -5:
        return f"stabil ({pct:+.0f}%)"
    if pct > -20:
        return f"leicht fallend ({pct:.0f}%)"
    return f"stark fallend ({pct:.0f}%)"
|
|
|
|
|
|
def _analyze_referrers(referrers: list) -> list[str]:
|
|
"""Qualifizierte Aussagen ueber Traffic-Quellen."""
|
|
insights = []
|
|
if not referrers:
|
|
return insights
|
|
|
|
total = sum(r.get("nb_visits", 0) for r in referrers)
|
|
if total == 0:
|
|
return insights
|
|
|
|
for r in referrers:
|
|
label = r.get("label", "?")
|
|
visits = r.get("nb_visits", 0)
|
|
share = (visits / total * 100) if total > 0 else 0
|
|
|
|
if share > 70:
|
|
insights.append(f"WARNUNG: {share:.0f}% des Traffics kommt von '{label}' — hohe Abhaengigkeit")
|
|
elif share > 50:
|
|
insights.append(f"'{label}' dominiert mit {share:.0f}% — Diversifikation empfohlen")
|
|
|
|
search = next((r for r in referrers if "search" in r.get("label", "").lower()), None)
|
|
direct = next((r for r in referrers if "direct" in r.get("label", "").lower()), None)
|
|
|
|
if search and direct and total > 0:
|
|
search_share = search.get("nb_visits", 0) / total * 100
|
|
direct_share = direct.get("nb_visits", 0) / total * 100
|
|
if direct_share > 30:
|
|
insights.append(f"{direct_share:.0f}% Direktzugriffe — gutes Zeichen fuer Stammleser")
|
|
if search_share < 10 and total > 20:
|
|
insights.append(f"Nur {search_share:.0f}% Suchmaschinen-Traffic — SEO verbessern?")
|
|
|
|
return insights
|
|
|
|
|
|
def _analyze_top_pages(pages: list) -> list[str]:
|
|
"""Qualifizierte Aussagen ueber Content-Performance."""
|
|
insights = []
|
|
if not pages or len(pages) < 2:
|
|
return insights
|
|
|
|
total = sum(p.get("nb_hits", 0) for p in pages)
|
|
top = pages[0]
|
|
top_hits = top.get("nb_hits", 0)
|
|
top_label = top.get("label", "?")
|
|
|
|
if total > 0 and top_hits > 0:
|
|
top_share = top_hits / total * 100
|
|
if top_share > 50:
|
|
insights.append(f"'{top_label}' hat {top_share:.0f}% aller Aufrufe — ein klarer Hit")
|
|
elif top_share > 30:
|
|
insights.append(f"'{top_label}' ist der staerkste Artikel ({top_share:.0f}%)")
|
|
|
|
avg_hits = total / len(pages) if pages else 0
|
|
strong = [p for p in pages if p.get("nb_hits", 0) > avg_hits * 2]
|
|
if len(strong) > 1:
|
|
insights.append(f"{len(strong)} Artikel performen ueberdurchschnittlich")
|
|
|
|
return insights
|
|
|
|
|
|
# --------------- Format-Funktionen (Output fuer LLM) ---------------
|
|
|
|
def format_analytics(period: str = "day", date: str = "today") -> str:
    """Qualified analytics report with ratings and comparisons.

    Builds a plain-text, German-labelled report covering today's key
    figures, a week-over-week comparison with outliers, top pages,
    traffic sources, countries and the device split. Each section is
    skipped silently if its API call yields no data; only a failing
    summary request aborts with an error string.
    """
    lines = []

    # --- Today's key figures ---
    summary = get_summary(period, date)
    if isinstance(summary, dict) and "error" not in summary:
        visitors = summary.get("nb_uniq_visitors", 0)
        visits = summary.get("nb_visits", 0)
        actions = summary.get("nb_actions", 0)
        bounce = summary.get("bounce_rate", "?")  # string like "43%"
        avg_time = int(summary.get("avg_time_on_site", 0))
        actions_per = summary.get("nb_actions_per_visit", 0)

        lines.append(f"=== HEUTE ({date}) ===")
        lines.append(f"Besucher: {visitors} unique, {visits} visits, {actions} Seitenaufrufe")
        lines.append(f"Seiten/Besuch: {actions_per}")
        lines.append(f"Bounce Rate: {bounce} — Bewertung: {_classify_bounce(bounce)}")
        lines.append(f"Verweildauer: {_classify_avg_time(avg_time)}")
    else:
        # Without the summary there is nothing to report on at all.
        return f"Matomo nicht erreichbar: {summary}"

    # --- Week over week (21 days = three full weekly slices) ---
    trend_data = get_visitor_trend(21)
    if isinstance(trend_data, dict) and "error" not in trend_data:
        pairs = _extract_daily_values(trend_data)

        this_week = _week_values(pairs, 0)
        last_week = _week_values(pairs, 1)

        if this_week and last_week:
            avg_this = mean(this_week)
            avg_last = mean(last_week)
            pct = _pct_change(avg_last, avg_this)

            lines.append(f"\n=== WOCHENVERGLEICH ===")
            lines.append(f"Diese Woche: Ø {avg_this:.0f} Besucher/Tag (Summe: {sum(this_week)})")
            lines.append(f"Letzte Woche: Ø {avg_last:.0f} Besucher/Tag (Summe: {sum(last_week)})")
            if pct is not None:
                direction = "mehr" if pct > 0 else "weniger"
                lines.append(f"Veraenderung: {pct:+.0f}% ({direction} als letzte Woche)")

        # Trend direction: last 7 days vs. the 7 before
        trend_dir = _trend_direction(pairs)
        lines.append(f"Trend (7 Tage vs. davor): {trend_dir}")

        # Outliers: at most two peaks and two dips are reported
        outliers = _find_outliers(pairs)
        if outliers.get("peaks"):
            for d, v in outliers["peaks"][:2]:
                lines.append(f"Peak: {d} mit {v} Besuchern (Ø {outliers['avg']:.0f})")
        if outliers.get("dips"):
            for d, v in outliers["dips"][:2]:
                lines.append(f"Tief: {d} mit nur {v} Besuchern")

    # --- Top pages (always today's, regardless of *date*) ---
    pages = get_top_pages(period, "today", 10)
    if pages:
        lines.append(f"\n=== TOP SEITEN (heute) ===")
        for p in pages[:5]:
            label = p.get("label", "?")
            hits = p.get("nb_hits", 0)
            avg_time_page = int(p.get("avg_time_on_page", 0))
            lines.append(f" {label}: {hits}x Aufrufe, {avg_time_page}s Lesezeit")

        page_insights = _analyze_top_pages(pages)
        for ins in page_insights:
            lines.append(f" → {ins}")

    # --- Traffic sources ---
    referrers = get_referrers(period, "today", 10)
    if referrers:
        lines.append(f"\n=== TRAFFIC-QUELLEN (heute) ===")
        total_ref = sum(r.get("nb_visits", 0) for r in referrers)
        for r in referrers[:5]:
            label = r.get("label", "?")
            visits = r.get("nb_visits", 0)
            share = (visits / total_ref * 100) if total_ref > 0 else 0
            lines.append(f" {label}: {visits} visits ({share:.0f}%)")

        ref_insights = _analyze_referrers(referrers)
        for ins in ref_insights:
            lines.append(f" → {ins}")

    # --- Countries ---
    countries = get_countries(period, "today", 5)
    if countries:
        lines.append(f"\n=== LAENDER (heute) ===")
        for c in countries[:5]:
            label = c.get("label", "?")
            visits = c.get("nb_visits", 0)
            lines.append(f" {label}: {visits}")

    # --- Devices ---
    devices = get_devices(period, "today")
    if devices:
        lines.append(f"\n=== GERAETE (heute) ===")
        for d in devices[:4]:
            label = d.get("label", "?")
            visits = d.get("nb_visits", 0)
            lines.append(f" {label}: {visits}")
        mobile = sum(d.get("nb_visits", 0) for d in devices if "mobile" in d.get("label", "").lower() or "smartphone" in d.get("label", "").lower())
        # NOTE(review): `desktop` is computed but never used below.
        desktop = sum(d.get("nb_visits", 0) for d in devices if "desktop" in d.get("label", "").lower())
        total_dev = sum(d.get("nb_visits", 0) for d in devices)
        if total_dev > 0:
            mob_pct = mobile / total_dev * 100
            lines.append(f" Mobile-Anteil: {mob_pct:.0f}%")
            if mob_pct > 60:
                lines.append(f" → Mehrheit mobil — Mobile-Optimierung wichtig")
            elif mob_pct < 20:
                lines.append(f" → Fast nur Desktop-Nutzer")

    return "\n".join(lines)
|
|
|
|
|
|
def format_trend(days: int = 30) -> str:
    """Visitor development over *days* days with a qualified analysis.

    Returns a plain-text, German-labelled report: raw daily numbers with
    peak/dip markers, a summary, the trend direction, weekday patterns
    and a simple growth projection. On API failure, returns an error
    string instead.
    """
    trend = get_visitor_trend(days)
    if isinstance(trend, dict) and "error" in trend:
        return f"Matomo-Fehler: {trend['error']}"

    pairs = _extract_daily_values(trend)
    values_nonzero = [v for _, v in pairs if v > 0]

    lines = [f"=== BESUCHERENTWICKLUNG ({days} Tage) ==="]

    # Raw daily numbers with markers relative to the average of active days.
    # Fix: the mean was recomputed inside the loop for every day (O(n^2));
    # it is loop-invariant, so compute it once up front.
    daily_avg = mean(values_nonzero) if values_nonzero else 0
    for date_str, v in pairs:
        marker = ""
        if values_nonzero:
            if v > daily_avg * 2:
                marker = " ★ Peak"
            elif v > 0 and v < daily_avg * 0.3:
                marker = " ↓ Tief"
        lines.append(f" {date_str}: {v} Besucher{marker}")

    # Summary over the whole window (averages ignore zero-visitor days).
    if values_nonzero:
        avg = daily_avg
        best_day = max(pairs, key=lambda x: x[1])
        worst_day = min((p for p in pairs if p[1] > 0), key=lambda x: x[1], default=("?", 0))

        lines.append("\n=== ZUSAMMENFASSUNG ===")
        lines.append(f"Gesamt: {sum(v for _, v in pairs)} Besucher in {days} Tagen")
        lines.append(f"Tage mit Besuchern: {len(values_nonzero)} von {len(pairs)}")
        lines.append(f"Durchschnitt: {avg:.0f} Besucher/Tag")
        lines.append(f"Bester Tag: {best_day[0]} ({best_day[1]} Besucher)")
        lines.append(f"Schwächster Tag: {worst_day[0]} ({worst_day[1]} Besucher)")

        # Trend direction: last 7 days vs. the 7 before.
        trend_dir = _trend_direction(pairs)
        lines.append(f"Trend-Richtung: {trend_dir}")

        # Weekday pattern: group the daily values by weekday name.
        weekday_map = {}
        for d, v in pairs:
            try:
                wd = datetime.strptime(d, "%Y-%m-%d").strftime("%A")
                weekday_map.setdefault(wd, []).append(v)
            except ValueError:
                # Non-date keys (e.g. range labels) are silently skipped.
                pass

        if weekday_map:
            lines.append("\n=== WOCHENTAGS-MUSTER ===")
            wd_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
            wd_de = {"Monday": "Mo", "Tuesday": "Di", "Wednesday": "Mi", "Thursday": "Do",
                     "Friday": "Fr", "Saturday": "Sa", "Sunday": "So"}
            for wd in wd_order:
                if wd in weekday_map:
                    vals = weekday_map[wd]
                    wd_avg = mean(vals) if vals else 0
                    lines.append(f" {wd_de[wd]}: Ø {wd_avg:.0f} Besucher")
            best_wd = max(weekday_map.items(), key=lambda x: mean(x[1]) if x[1] else 0)
            worst_wd = min(weekday_map.items(), key=lambda x: mean(x[1]) if x[1] else float('inf'))
            lines.append(f" → Bester Wochentag: {wd_de.get(best_wd[0], best_wd[0])} (Ø {mean(best_wd[1]):.0f})")
            lines.append(f" → Schwaechster: {wd_de.get(worst_wd[0], worst_wd[0])} (Ø {mean(worst_wd[1]):.0f})")

        # Growth projection: first vs. second half of the active days.
        if len(values_nonzero) >= 14:
            first_half = values_nonzero[:len(values_nonzero) // 2]
            second_half = values_nonzero[len(values_nonzero) // 2:]
            growth = _pct_change(mean(first_half), mean(second_half))
            if growth is not None:
                lines.append("\n=== PROGNOSE ===")
                if growth > 10:
                    lines.append(f"Wachstum erste→zweite Haelfte: +{growth:.0f}% — positiver Trend")
                    monthly_proj = avg * 30
                    lines.append(f"Hochrechnung naechster Monat: ~{monthly_proj:.0f} Besucher")
                elif growth > -10:
                    lines.append(f"Stabile Phase ({growth:+.0f}%) — Wachstum stagniert")
                else:
                    lines.append(f"Ruecklaeufig ({growth:.0f}%) — Gegenmassnahmen pruefen")

    return "\n".join(lines)
|