Files
opencode-skill/skills/umami/scripts/umami_client.py
Kunthawat Greethong 7524c29ce5 fix: Umami skill auto-load from unified .env
- Load credentials from ~/.config/opencode/.env automatically
- No need to configure .env in each skill directory
- Fixes issue where new skills couldn't find credentials
2026-03-09 13:50:10 +07:00

332 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Umami Analytics Client
Self-hosted Umami integration with username/password authentication.
Creates websites, gets tracking codes, and fetches analytics data.
Automatically loads credentials from:
- ~/.config/opencode/.env (unified .env)
- .env (local, for development)
"""
import os
import sys
import requests
import argparse
from datetime import datetime, timedelta
from typing import Dict, Optional, List
from pathlib import Path
from dotenv import load_dotenv
# Credentials live in one shared file so that individual skills do not need
# their own copy; a project-local .env is still honoured for development.
# NOTE(review): python-dotenv does not override already-set variables by
# default, so values from the shared file appear to win over the local
# one -- confirm this is the intended precedence.
_UNIFIED_ENV_FILE = os.path.expanduser('~/.config/opencode/.env')
load_dotenv(_UNIFIED_ENV_FILE)
load_dotenv()  # local .env, located by python-dotenv's own search
class UmamiClient:
    """Client for a self-hosted Umami Analytics instance.

    Authenticates with username/password (or a pre-issued bearer token) and
    wraps the website, statistics and tracking-script operations.  Methods
    that perform a request return a result dict carrying a ``success`` flag
    rather than raising, so callers can report failures without a traceback.
    """

    # Seconds to wait for the server.  ``requests`` has no default timeout,
    # so without one an unresponsive host would block the client forever.
    DEFAULT_TIMEOUT = 30

    def __init__(self, umami_url: Optional[str] = None,
                 username: Optional[str] = None,
                 password: Optional[str] = None,
                 token: Optional[str] = None,
                 timeout: float = DEFAULT_TIMEOUT):
        """Initialize the client and log in when credentials are available.

        Args:
            umami_url: Umami instance URL (falls back to ``UMAMI_URL``).
            username: Umami username/email (falls back to ``UMAMI_USERNAME``).
            password: Umami password (falls back to ``UMAMI_PASSWORD``).
            token: Existing bearer token; when given, no login is attempted.
            timeout: Per-request timeout in seconds.
        """
        # Strip trailing slashes so that f"{url}/api/..." never yields "//api".
        self.umami_url = (umami_url or os.getenv('UMAMI_URL', '')).rstrip('/')
        self.username = username or os.getenv('UMAMI_USERNAME', '')
        self.password = password or os.getenv('UMAMI_PASSWORD', '')
        self.token = token
        self.user_id = None
        self.timeout = timeout

        # Auto-login if credentials are available and no token was supplied.
        if self.username and self.password and not token:
            self.login()

    def login(self) -> Dict:
        """Log in with username/password and store the bearer token.

        Returns:
            ``{'success': True, 'token', 'user_id', 'username'}`` on success,
            otherwise ``{'success': False, 'error': <message>}``.
        """
        if not self.umami_url:
            return {'success': False,
                    'error': 'Umami URL is not configured (set UMAMI_URL)'}
        try:
            response = requests.post(
                f"{self.umami_url}/api/auth/login",
                json={'username': self.username, 'password': self.password},
                timeout=self.timeout,
            )
            response.raise_for_status()
            result = response.json()
        except Exception as e:
            # Boundary of the result-dict contract: network, HTTP and JSON
            # errors are all reported to the caller as a message.
            return {'success': False, 'error': str(e)}

        token = result.get('token') if isinstance(result, dict) else None
        if not token:
            return {'success': False, 'error': 'No token in response'}

        user = result.get('user')
        if not isinstance(user, dict):
            user = {}
        self.token = token
        self.user_id = user.get('id')
        return {
            'success': True,
            'token': self.token,
            'user_id': self.user_id,
            'username': user.get('username'),
        }

    def _get_headers(self) -> Dict:
        """Return request headers carrying the bearer token.

        Attempts a login first when no token is held yet.

        Raises:
            RuntimeError: if no token can be obtained.  Sending
                ``Bearer None`` would only produce an opaque 401, so the
                real login error is surfaced instead; the public methods
                convert it into their ``error`` field.
        """
        if not self.token:
            if not (self.username and self.password):
                raise RuntimeError(
                    'Not authenticated: no token and no username/password configured')
            outcome = self.login()
            if not outcome.get('success'):
                raise RuntimeError(f"Umami login failed: {outcome.get('error')}")
        return {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }

    def create_website(self, name: str, domain: str) -> Dict:
        """Create a new website in Umami.

        Returns:
            On success a dict with ``website_id``, ``name``, ``domain``,
            ``created_at``, ``tracking_url`` and ``tracking_script``;
            otherwise ``{'success': False, 'error': <message>}``.
        """
        try:
            response = requests.post(
                f"{self.umami_url}/api/websites",
                json={'name': name, 'domain': domain},
                headers=self._get_headers(),
                timeout=self.timeout,
            )
            response.raise_for_status()
            result = response.json()
            website_id = result.get('id')
            return {
                'success': True,
                'website_id': website_id,
                'name': result.get('name'),
                'domain': result.get('domain'),
                'created_at': result.get('createdAt'),
                'tracking_url': f"{self.umami_url}/script.js",
                'tracking_script': self._get_tracking_script(website_id),
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def get_website_by_domain(self, domain: str) -> Optional[Dict]:
        """Find a website by domain.

        An exact (case-insensitive) domain match is preferred; otherwise the
        first website whose domain contains ``domain`` is returned.  Returns
        ``None`` when nothing matches or ``domain`` is empty.
        """
        if not domain:
            return None
        wanted = domain.lower()
        partial_match = None
        for site in self.list_websites():
            if not isinstance(site, dict):
                continue
            # The API may report a null domain; treat it as empty.
            site_domain = (site.get('domain') or '').lower()
            if site_domain == wanted:
                return site
            if partial_match is None and wanted in site_domain:
                partial_match = site
        return partial_match

    def list_websites(self) -> List[Dict]:
        """Return the websites visible to the authenticated user.

        Errors are printed and an empty list is returned.
        NOTE(review): for a paginated response only the ``data`` of the
        first page is returned -- confirm whether callers need more.
        """
        try:
            response = requests.get(
                f"{self.umami_url}/api/websites",
                headers=self._get_headers(),
                timeout=self.timeout,
            )
            response.raise_for_status()
            result = response.json()
            # Handle both a plain array and a paginated {"data": [...]} body.
            if isinstance(result, list):
                return result
            if isinstance(result, dict) and isinstance(result.get('data'), list):
                return result['data']
            return []
        except Exception as e:
            print(f"Error listing websites: {e}")
            return []

    def get_stats(self, website_id: str, days: int = 30) -> Dict:
        """Fetch summary statistics for the last ``days`` days.

        Returns:
            On success a dict with ``pageviews``, ``uniques``, ``visits``,
            ``bounces``, ``totaltime``, ``avg_session_duration`` and
            ``bounce_rate`` (percent); otherwise
            ``{'success': False, 'error': <message>}``.
        """
        try:
            if days <= 0:
                return {'success': False, 'error': 'days must be a positive integer'}
            end_date = datetime.now()
            start_date = end_date - timedelta(days=days)
            response = requests.get(
                f"{self.umami_url}/api/websites/{website_id}/stats",
                headers=self._get_headers(),
                # The API expects epoch milliseconds.
                params={
                    'startAt': int(start_date.timestamp() * 1000),
                    'endAt': int(end_date.timestamp() * 1000),
                },
                timeout=self.timeout,
            )
            response.raise_for_status()
            return self._summarize_stats(response.json(), website_id, days)
        except Exception as e:
            return {'success': False, 'error': str(e)}

    @staticmethod
    def _stat_value(stats: Dict, *keys: str, default=0):
        """Return the first usable numeric value found under ``keys``.

        Accepts both a bare number and a ``{"value": n, ...}`` wrapper,
        since the response shape differs between Umami releases.
        """
        for key in keys:
            raw = stats.get(key)
            if isinstance(raw, dict):
                raw = raw.get('value')
            if raw is None:
                continue
            try:
                number = float(raw)
            except (TypeError, ValueError):
                continue
            return int(number) if number.is_integer() else number
        return default

    @staticmethod
    def _summarize_stats(stats: Dict, website_id: str, days: int) -> Dict:
        """Convert a raw stats response into the client's result dict."""
        value = UmamiClient._stat_value
        pageviews = value(stats, 'pageviews')
        # Older releases call unique visitors "uniques", newer "visitors".
        uniques = value(stats, 'uniques', 'visitors')
        bounces = value(stats, 'bounces')
        totaltime = value(stats, 'totaltime')
        visits = value(stats, 'visits', default=None)
        if visits is None:
            # No visit count in the response: unique visitors is the closest
            # available denominator.
            visits = uniques
        denominator = max(visits, 1)  # avoid division by zero on empty periods
        return {
            'success': True,
            'website_id': website_id,
            'period': f'last_{days}_days',
            'pageviews': pageviews,
            'uniques': uniques,
            'visits': visits,
            'bounces': bounces,
            'totaltime': totaltime,
            'avg_session_duration': totaltime / denominator,
            'bounce_rate': bounces / denominator * 100,
        }

    def _get_tracking_script(self, website_id: str) -> str:
        """Return the HTML ``<script>`` tag that loads the Umami tracker."""
        return f'<script defer src="{self.umami_url}/script.js" data-website-id="{website_id}"></script>'

    @staticmethod
    def _find_layout_file(website_repo: str) -> Optional[str]:
        """Locate the layout file to inject into, or ``None`` if absent."""
        candidates = (
            'src/layouts/Layout.astro',
            'src/layouts/BaseHead.astro',
            'src/pages/_document.tsx',
            'src/app.html',
        )
        for relative in candidates:
            path = os.path.join(website_repo, relative)
            if os.path.isfile(path):
                return path

        # Fall back to any .astro layout; sorted so the choice is stable
        # across runs and platforms.
        layouts_dir = os.path.join(website_repo, 'src/layouts')
        if os.path.isdir(layouts_dir):
            for entry in sorted(os.listdir(layouts_dir)):
                if entry.endswith('.astro'):
                    return os.path.join(layouts_dir, entry)
        return None

    def add_tracking_to_astro(self, website_repo: str, website_id: str) -> Dict:
        """Insert the tracking script into the site's layout file.

        The script goes before the first ``</head>``, or is appended when
        the file has none.  The operation is idempotent: if a tag for
        ``website_id`` is already present the file is left untouched and
        ``tracking_added`` is ``False``.

        Returns:
            ``{'success': True, 'layout_file', 'tracking_added'}`` or
            ``{'success': False, 'error': <message>}``.
        """
        try:
            layout_file = self._find_layout_file(website_repo)
            if not layout_file:
                return {'success': False, 'error': 'No Astro layout file found'}

            with open(layout_file, 'r', encoding='utf-8') as f:
                content = f.read()

            # Skip the write when this website is already tracked, so that
            # re-running the command does not stack duplicate script tags.
            if f'data-website-id="{website_id}"' in content:
                return {
                    'success': True,
                    'layout_file': layout_file,
                    'tracking_added': False,
                }

            tracking_script = self._get_tracking_script(website_id)
            if '</head>' in content:
                # count=1: only the first closing tag, never every occurrence.
                content = content.replace(
                    '</head>', f' {tracking_script}\n </head>', 1)
            else:
                content += f'\n{tracking_script}\n'

            with open(layout_file, 'w', encoding='utf-8') as f:
                f.write(content)
            return {
                'success': True,
                'layout_file': layout_file,
                'tracking_added': True,
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}
def main(argv: Optional[List[str]] = None) -> int:
    """Run the Umami command-line interface.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 when the requested action
        failed.  Invalid or missing arguments terminate through
        ``parser.error`` (exit status 2).
    """
    parser = argparse.ArgumentParser(description='Umami Analytics Client')
    parser.add_argument('--action', required=True,
                        choices=['create-website', 'get-tracking', 'add-tracking',
                                 'get-stats', 'list-websites'])
    parser.add_argument('--umami-url', help='Umami instance URL (or set UMAMI_URL in .env)')
    parser.add_argument('--username', help='Umami username (or set UMAMI_USERNAME in .env)')
    parser.add_argument('--password', help='Umami password (or set UMAMI_PASSWORD in .env)')
    parser.add_argument('--website-name', help='Website name (for create)')
    parser.add_argument('--website-domain', help='Website domain (for create/find)')
    parser.add_argument('--website-id', help='Website ID (for stats)')
    parser.add_argument('--website-repo', help='Path to website repo (for add-tracking)')
    parser.add_argument('--days', type=int, default=30, help='Days for stats')
    args = parser.parse_args(argv)

    # Check action-specific arguments up front, before the client is built:
    # constructing it may trigger a network login, which is pointless when
    # the invocation is going to be rejected anyway.
    required_by_action = {
        'create-website': ('website_name', 'website_domain'),
        'get-tracking': ('website_id',),
        'add-tracking': ('website_id', 'website_repo'),
        'get-stats': ('website_id',),
        'list-websites': (),
    }
    missing = [
        '--' + attr.replace('_', '-')
        for attr in required_by_action[args.action]
        if not getattr(args, attr)
    ]
    if missing:
        parser.error(f"--action {args.action} requires {', '.join(missing)}")

    print("\n📊 Umami Analytics Client")
    print(f"URL: {args.umami_url or os.getenv('UMAMI_URL', 'Not set')}\n")

    # Initialize client (loads credentials from .env automatically)
    client = UmamiClient(
        umami_url=args.umami_url,
        username=args.username,
        password=args.password
    )

    if args.action == 'create-website':
        print(f"Creating website: {args.website_name} ({args.website_domain})")
        result = client.create_website(args.website_name, args.website_domain)
        if not result['success']:
            print(f"\n❌ Failed: {result['error']}")
            return 1
        print("\n✅ Website created!")
        print(f" ID: {result['website_id']}")
        print(f" Name: {result['name']}")
        print(f" Domain: {result['domain']}")
        print(f" Tracking: {result['tracking_url']}")
        print(f"\nScript:\n{result['tracking_script']}")

    elif args.action == 'get-tracking':
        script = client._get_tracking_script(args.website_id)
        print(f"\nTracking script for {args.website_id}:")
        print(script)

    elif args.action == 'add-tracking':
        print(f"Adding tracking to: {args.website_repo}")
        result = client.add_tracking_to_astro(args.website_repo, args.website_id)
        if not result['success']:
            print(f"\n❌ Failed: {result['error']}")
            return 1
        print("\n✅ Tracking added!")
        print(f" Layout: {result['layout_file']}")

    elif args.action == 'get-stats':
        print(f"Getting stats for last {args.days} days...")
        stats = client.get_stats(args.website_id, args.days)
        if not stats['success']:
            print(f"\n❌ Failed: {stats['error']}")
            return 1
        print(f"\n📊 Analytics ({stats['period']}):")
        print(f" Pageviews: {stats['pageviews']:,}")
        print(f" Unique visitors: {stats['uniques']:,}")
        print(f" Bounces: {stats['bounces']:,}")
        print(f" Bounce rate: {stats['bounce_rate']:.1f}%")
        print(f" Avg session: {stats['avg_session_duration']:.1f}s")

    elif args.action == 'list-websites':
        print("Listing websites...")
        websites = client.list_websites()
        print(f"\nFound {len(websites)} websites:")
        for site in websites:
            print(f"{site.get('name')} - {site.get('domain')}")

    return 0


if __name__ == '__main__':
    # Propagate the result so shells and CI can detect a failed action.
    sys.exit(main())