#!/usr/bin/env python3
"""
Umami Analytics Client

Self-hosted Umami integration with username/password authentication.
Creates websites, gets tracking codes, and fetches analytics data.
"""

import argparse
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import requests

# Seconds to wait for the Umami server before a request is abandoned.
# Without a timeout, ``requests`` blocks indefinitely on an unresponsive host.
DEFAULT_TIMEOUT = 30

# Closing tag the tracking snippet is inserted in front of.
HEAD_CLOSE_TAG = '</head>'


class UmamiClient:
    """Umami Analytics API client with username/password auth"""

    def __init__(self, umami_url: str, username: Optional[str] = None,
                 password: Optional[str] = None, token: Optional[str] = None,
                 timeout: float = DEFAULT_TIMEOUT):
        """
        Initialize Umami client

        Args:
            umami_url: Umami instance URL (e.g., https://analytics.example.com)
            username: Umami username/email (for self-hosted)
            password: Umami password (for self-hosted)
            token: Bearer token (optional, if already have)
            timeout: Seconds to wait for each HTTP request
        """
        self.umami_url = umami_url.rstrip('/')
        self.api_url = f"{self.umami_url}/api"
        self.username = username
        self.password = password
        self.token = token
        self.user_id = None
        self.timeout = timeout

        # Auto-login if credentials provided
        if username and password and not token:
            self.login()

    def login(self) -> Dict:
        """
        Login to Umami and get bearer token

        Returns:
            Dict with 'success'; on success also 'token', 'user_id' and
            'username', otherwise 'error'. Never raises.
        """
        try:
            url = f"{self.api_url}/auth/login"
            data = {
                'username': self.username,
                'password': self.password
            }

            response = requests.post(url, json=data, timeout=self.timeout)
            response.raise_for_status()

            result = response.json()

            if 'token' in result:
                # 'user' may be missing or null; treat both as "no details".
                user = result.get('user') or {}
                self.token = result['token']
                self.user_id = user.get('id')
                return {
                    'success': True,
                    'token': self.token,
                    'user_id': self.user_id,
                    'username': user.get('username')
                }
            else:
                return {'success': False, 'error': 'No token in response'}

        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _get_headers(self) -> Dict:
        """
        Get request headers with auth

        Logs in lazily when no token is held yet.

        Raises:
            RuntimeError: If no token is available and login is impossible
                or fails. Sending 'Bearer None' would only produce an opaque
                401 from the server, so the real cause is reported instead.
        """
        if not self.token:
            if not (self.username and self.password):
                raise RuntimeError(
                    'Not authenticated: provide a token or username and password'
                )
            outcome = self.login()
            if not outcome.get('success'):
                raise RuntimeError(f"Umami login failed: {outcome.get('error')}")

        return {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

    def create_website(self, name: str, domain: str) -> Dict:
        """
        Create new Umami website

        Args:
            name: Website name
            domain: Website domain (full URL)

        Returns:
            Website creation result
        """
        try:
            url = f"{self.api_url}/websites"
            data = {
                'name': name,
                'domain': domain
            }

            response = requests.post(
                url, json=data, headers=self._get_headers(), timeout=self.timeout
            )
            response.raise_for_status()

            result = response.json()

            return {
                'success': True,
                'website_id': result.get('id'),
                'name': result.get('name'),
                'domain': result.get('domain'),
                'created_at': result.get('createdAt'),
                'tracking_url': f"{self.umami_url}/script.js",
                'tracking_script': self._get_tracking_script(result.get('id'))
            }

        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_website_by_domain(self, domain: str) -> Optional[Dict]:
        """
        Find website by domain

        Args:
            domain: Domain (or part of one) to look for

        Returns:
            The first website whose 'domain' contains *domain*, or None.
            An empty *domain* never matches (a substring test on '' would
            otherwise match every site).
        """
        if not domain:
            return None

        for site in self.list_websites():
            if not isinstance(site, dict):
                continue
            # 'domain' can be present but null in the API response.
            if domain in (site.get('domain') or ''):
                return site
        return None

    def list_websites(self) -> List[Dict]:
        """
        Get all websites

        Returns:
            List of website dicts; an empty list when the request fails
            (the error is printed).
        """
        try:
            url = f"{self.api_url}/websites"
            response = requests.get(
                url, headers=self._get_headers(), timeout=self.timeout
            )
            response.raise_for_status()

            result = response.json()

            # Handle both array and paginated response
            if isinstance(result, list):
                return result
            elif isinstance(result, dict) and 'data' in result:
                return result['data']
            else:
                return []

        except Exception as e:
            print(f"Error listing websites: {e}")
            return []

    @staticmethod
    def _metric_value(stats: Dict, *keys: str):
        """
        Read one metric from a stats response

        Accepts both a flat number and a ``{'value': n, ...}`` object, and
        tries each key in turn so renamed fields are still found.

        Returns:
            The first non-null value found, or 0.
        """
        for key in keys:
            raw = stats.get(key)
            if isinstance(raw, dict):
                raw = raw.get('value')
            if raw is not None:
                return raw
        return 0

    def get_stats(self, website_id: str, days: int = 30) -> Dict:
        """
        Get website statistics

        Args:
            website_id: Umami website ID
            days: Number of days to look back

        Returns:
            Analytics stats
        """
        try:
            end_date = datetime.now()
            start_date = end_date - timedelta(days=days)

            url = f"{self.api_url}/websites/{website_id}/stats"
            # The API takes the range as epoch milliseconds.
            params = {
                'startAt': int(start_date.timestamp() * 1000),
                'endAt': int(end_date.timestamp() * 1000)
            }

            response = requests.get(
                url, headers=self._get_headers(), params=params,
                timeout=self.timeout
            )
            response.raise_for_status()

            stats = response.json()

            pageviews = self._metric_value(stats, 'pageviews')
            # Unique visitors are reported as 'uniques' or 'visitors'
            # depending on the Umami version.
            uniques = self._metric_value(stats, 'uniques', 'visitors')
            bounces = self._metric_value(stats, 'bounces')
            totaltime = self._metric_value(stats, 'totaltime')
            # When the server reports no 'visits', fall back to unique
            # visitors; max(..., 1) guards against division by zero.
            sessions = max(
                self._metric_value(stats, 'visits', 'uniques', 'visitors'), 1
            )

            return {
                'success': True,
                'website_id': website_id,
                'period': f'last_{days}_days',
                'pageviews': pageviews,
                'uniques': uniques,
                'bounces': bounces,
                'totaltime': totaltime,
                'avg_session_duration': totaltime / sessions,
                'bounce_rate': bounces / sessions * 100
            }

        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _get_tracking_script(self, website_id: str) -> str:
        """
        Generate tracking script HTML

        Args:
            website_id: Umami website ID

        Returns:
            A <script> tag loading script.js from this Umami instance.
        """
        return (
            f'<script defer src="{self.umami_url}/script.js" '
            f'data-website-id="{website_id}"></script>'
        )

    @staticmethod
    def _find_layout_file(website_repo: str) -> Optional[str]:
        """Locate the layout file to patch, or None if there is none"""
        layout_paths = [
            os.path.join(website_repo, 'src/layouts/Layout.astro'),
            os.path.join(website_repo, 'src/layouts/BaseHead.astro'),
            os.path.join(website_repo, 'src/pages/_document.tsx'),
            os.path.join(website_repo, 'src/app.html')
        ]
        for path in layout_paths:
            if os.path.exists(path):
                return path

        # Try to find any .astro file in src/layouts
        layouts_dir = os.path.join(website_repo, 'src/layouts')
        if os.path.isdir(layouts_dir):
            # Sorted so the choice does not depend on directory order.
            for entry in sorted(os.listdir(layouts_dir)):
                if entry.endswith('.astro'):
                    return os.path.join(layouts_dir, entry)
        return None

    def add_tracking_to_astro(self, website_repo: str, website_id: str) -> Dict:
        """
        Add Umami tracking to Astro website

        Args:
            website_repo: Path to Astro website repo
            website_id: Umami website ID

        Returns:
            Result of adding tracking. 'tracking_added' is False when the
            layout already references this website ID, in which case the
            file is left untouched.
        """
        try:
            tracking_script = self._get_tracking_script(website_id)

            layout_file = self._find_layout_file(website_repo)
            if not layout_file:
                return {'success': False, 'error': 'No Astro layout file found'}

            # Read layout file
            with open(layout_file, 'r', encoding='utf-8') as f:
                content = f.read()

            # Re-running must not stack duplicate snippets.
            if f'data-website-id="{website_id}"' in content:
                return {
                    'success': True,
                    'layout_file': layout_file,
                    'tracking_added': False
                }

            # Add tracking before </head>
            if HEAD_CLOSE_TAG in content:
                content = content.replace(
                    HEAD_CLOSE_TAG,
                    f'  {tracking_script}\n  {HEAD_CLOSE_TAG}',
                    1
                )
            else:
                # If no </head>, add at end of file
                content += f'\n{tracking_script}\n'

            # Write back
            with open(layout_file, 'w', encoding='utf-8') as f:
                f.write(content)

            return {
                'success': True,
                'layout_file': layout_file,
                'tracking_added': True
            }

        except Exception as e:
            return {'success': False, 'error': str(e)}


def main() -> int:
    """
    Main CLI entry point

    Returns:
        Process exit status: 0 on success, 1 when arguments are missing or
        the requested action fails.
    """
    parser = argparse.ArgumentParser(description='Umami Analytics Client')
    parser.add_argument('--action', required=True,
                        choices=['create-website', 'get-tracking', 'add-tracking',
                                 'get-stats', 'list-websites'])
    parser.add_argument('--umami-url', required=True, help='Umami instance URL')
    parser.add_argument('--username', help='Umami username')
    parser.add_argument('--password', help='Umami password')
    parser.add_argument('--website-name', help='Website name (for create)')
    parser.add_argument('--website-domain', help='Website domain (for create/find)')
    parser.add_argument('--website-id', help='Website ID (for stats)')
    parser.add_argument('--website-repo', help='Path to website repo (for add-tracking)')
    parser.add_argument('--days', type=int, default=30, help='Days for stats')

    args = parser.parse_args()

    print("\n📊 Umami Analytics Client")
    print(f"URL: {args.umami_url}\n")

    # Initialize client
    client = UmamiClient(args.umami_url, args.username, args.password)

    if args.action == 'create-website':
        if not args.website_name or not args.website_domain:
            print("Error: --website-name and --website-domain required")
            return 1

        print(f"Creating website: {args.website_name} ({args.website_domain})")
        result = client.create_website(args.website_name, args.website_domain)

        if not result['success']:
            print(f"\n❌ Failed: {result['error']}")
            return 1

        print("\n✅ Website created!")
        print(f"   ID: {result['website_id']}")
        print(f"   Name: {result['name']}")
        print(f"   Domain: {result['domain']}")
        print(f"   Tracking: {result['tracking_url']}")
        print(f"\nScript:\n{result['tracking_script']}")

    elif args.action == 'get-tracking':
        if not args.website_id:
            print("Error: --website-id required")
            return 1

        script = client._get_tracking_script(args.website_id)
        print(f"\nTracking script for {args.website_id}:")
        print(script)

    elif args.action == 'add-tracking':
        if not args.website_id or not args.website_repo:
            print("Error: --website-id and --website-repo required")
            return 1

        print(f"Adding tracking to: {args.website_repo}")
        result = client.add_tracking_to_astro(args.website_repo, args.website_id)

        if not result['success']:
            print(f"\n❌ Failed: {result['error']}")
            return 1

        if result.get('tracking_added'):
            print("\n✅ Tracking added!")
        else:
            print("\n✅ Tracking already present (no changes made)")
        print(f"   Layout: {result['layout_file']}")

    elif args.action == 'get-stats':
        if not args.website_id:
            print("Error: --website-id required")
            return 1

        print(f"Getting stats for last {args.days} days...")
        stats = client.get_stats(args.website_id, args.days)

        if not stats['success']:
            print(f"\n❌ Failed: {stats['error']}")
            return 1

        print(f"\n📊 Analytics ({stats['period']}):")
        print(f"   Pageviews: {stats['pageviews']:,}")
        print(f"   Unique visitors: {stats['uniques']:,}")
        print(f"   Bounces: {stats['bounces']:,}")
        print(f"   Bounce rate: {stats['bounce_rate']:.1f}%")
        print(f"   Avg session: {stats['avg_session_duration']:.1f}s")

    elif args.action == 'list-websites':
        print("Listing websites...")
        websites = client.list_websites()

        print(f"\nFound {len(websites)} websites:")
        for site in websites:
            print(f"  • {site.get('name')} - {site.get('domain')}")

    return 0


if __name__ == '__main__':
    sys.exit(main())