215 lines
7.7 KiB
Python
215 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Google Analytics 4 Connector
|
|
|
|
Fetch performance data from Google Analytics 4 API.
|
|
Requires service account credentials with GA4 read access.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
from pathlib import Path
|
|
|
|
|
|
class GA4Connector:
    """Connect to Google Analytics 4 API.

    Authenticates with a service-account key file on construction and runs
    page-level reports through ``BetaAnalyticsDataClient.run_report``.
    """

    def __init__(self, property_id: str, credentials_path: str):
        """
        Initialize GA4 connector and authenticate immediately.

        Args:
            property_id: GA4 property ID. A leading "properties/" or "G-"
                prefix is stripped when the request resource name is built.
                NOTE(review): "G-XXXXXXXXXX" values look like measurement
                IDs, while the Data API addresses properties by numeric
                ID -- confirm callers pass the numeric property ID.
            credentials_path: Path to service account JSON file

        Raises:
            ImportError: If the Google Analytics client packages are missing.
            Exception: If the credentials file is missing or any other
                step of authentication fails.
        """
        self.property_id = property_id
        self.credentials_path = credentials_path
        self.client = None
        # Filled by _authenticate() with the request/message classes so the
        # report methods do not need the google packages at import time.
        self.types = {}
        self._authenticate()

    def _authenticate(self):
        """Authenticate with Google Analytics API.

        Imports the client libraries lazily, loads the service-account
        credentials with the read-only analytics scope, and stores the
        client plus the request types on the instance.

        Raises:
            ImportError: If the Google client packages cannot be imported.
            Exception: Wraps every other failure (including the
                FileNotFoundError for a missing credentials file) with an
                "Authentication failed" message; the cause is chained.
        """
        try:
            from google.analytics.data_v1beta import BetaAnalyticsDataClient
            from google.analytics.data_v1beta.types import DateRange, Metric, Dimension, RunReportRequest
            from google.oauth2 import service_account

            # Load credentials
            if not os.path.exists(self.credentials_path):
                raise FileNotFoundError(f"Credentials not found: {self.credentials_path}")

            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path,
                scopes=["https://www.googleapis.com/auth/analytics.readonly"]
            )

            self.client = BetaAnalyticsDataClient(credentials=credentials)
            self.types = {
                'DateRange': DateRange,
                'Metric': Metric,
                'Dimension': Dimension,
                'RunReportRequest': RunReportRequest
            }

        except ImportError as e:
            raise ImportError(
                "Google Analytics packages not installed. "
                "Install with: pip install google-analytics-data google-auth google-auth-oauthlib"
            ) from e
        except Exception as e:
            raise Exception(f"Authentication failed: {e}") from e

    def _property_resource(self) -> str:
        """Return the ``properties/<id>`` resource name used in requests.

        Only leading prefixes are removed, so an ID that merely contains
        "G-" somewhere in the middle is left intact.
        """
        prop = str(self.property_id).strip()
        for prefix in ("properties/", "G-"):
            if prop.startswith(prefix):
                prop = prop[len(prefix):]
        return f"properties/{prop}"

    @staticmethod
    def _date_range_strings(days: int):
        """Return ``(start, end)`` as YYYY-MM-DD strings ending today (local clock)."""
        end = datetime.now()
        start = end - timedelta(days=days)
        return start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")

    @staticmethod
    def _page_path_filter(url: str) -> Dict:
        """Build the ``string_filter`` payload for the pagePath dimension.

        A full URL (scheme and host present) is reduced to its path and
        matched exactly, because a page path never contains the host and a
        substring match on the whole URL could not succeed. Any other input
        keeps the substring (CONTAINS) match on the value as given.
        """
        from urllib.parse import urlsplit

        if isinstance(url, str):
            parts = urlsplit(url)
            if parts.scheme and parts.netloc:
                return {'match_type': 'EXACT', 'value': parts.path or '/'}
        return {'match_type': 'CONTAINS', 'value': url}

    @staticmethod
    def _as_int(value) -> int:
        """Convert a metric string to int, tolerating float formatting ("3.0")."""
        try:
            return int(value)
        except ValueError:
            return int(float(value))

    def get_page_data(self, url: str, days: int = 30) -> Dict:
        """
        Get page performance data

        Args:
            url: Page URL to analyze. A full URL is matched exactly on its
                path component; any other string is matched as a substring
                of pagePath.
            days: Number of days to look back

        Returns:
            Dictionary with 'pageviews', 'sessions', 'avg_engagement_time'
            (taken from the averageSessionDuration metric), 'bounce_rate'
            and 'conversions'. When several paths match, the one with the
            most page views is reported. If nothing matches, all values
            are 0 and a 'note' key is added. On any failure the result is
            ``{'error': <message>}`` instead of an exception.
        """
        if not self.client:
            return {'error': 'Not authenticated'}

        try:
            start, end = self._date_range_strings(days)

            # Build request
            request = self.types['RunReportRequest'](
                property=self._property_resource(),
                date_ranges=[self.types['DateRange'](start_date=start, end_date=end)],
                dimensions=[self.types['Dimension'](name="pagePath")],
                metrics=[
                    self.types['Metric'](name="screenPageViews"),
                    self.types['Metric'](name="sessions"),
                    self.types['Metric'](name="averageSessionDuration"),
                    self.types['Metric'](name="bounceRate"),
                    self.types['Metric'](name="conversions")
                ],
                dimension_filter={
                    'filter': {
                        'field_name': 'pagePath',
                        'string_filter': self._page_path_filter(url)
                    }
                },
                # Only the first row is read below, so make that row
                # deterministic: the most-viewed matching path.
                order_bys=[{
                    'metric': {'metric_name': 'screenPageViews'},
                    'desc': True
                }],
                limit=1
            )

            # Execute request
            response = self.client.run_report(request)

            # Parse response
            if not response.rows:
                return {
                    'pageviews': 0,
                    'sessions': 0,
                    'avg_engagement_time': 0,
                    'bounce_rate': 0,
                    'conversions': 0,
                    'note': 'No data found for this URL'
                }

            values = [metric.value for metric in response.rows[0].metric_values]
            return {
                'pageviews': self._as_int(values[0]),
                'sessions': self._as_int(values[1]),
                'avg_engagement_time': float(values[2]),
                'bounce_rate': float(values[3]),
                'conversions': self._as_int(values[4])
            }

        except Exception as e:
            # Deliberate best-effort contract: callers receive an error dict.
            return {'error': str(e)}

    def get_top_pages(self, days: int = 30, limit: int = 10) -> List[Dict]:
        """Get top performing pages

        Args:
            days: Number of days to look back
            limit: Maximum number of rows requested from the API

        Returns:
            List of dicts with 'page', 'pageviews', 'sessions' and
            'avg_engagement' (from averageSessionDuration), ordered by
            page views descending. Returns an empty list when not
            authenticated or when the request fails (the error is printed).
        """
        if not self.client:
            return []

        try:
            start, end = self._date_range_strings(days)

            request = self.types['RunReportRequest'](
                property=self._property_resource(),
                date_ranges=[self.types['DateRange'](start_date=start, end_date=end)],
                dimensions=[self.types['Dimension'](name="pagePath")],
                metrics=[
                    self.types['Metric'](name="screenPageViews"),
                    self.types['Metric'](name="sessions"),
                    self.types['Metric'](name="averageSessionDuration")
                ],
                order_bys=[{
                    'metric': {'metric_name': 'screenPageViews'},
                    'desc': True
                }],
                limit=limit
            )

            response = self.client.run_report(request)

            return [
                {
                    'page': row.dimension_values[0].value,
                    'pageviews': self._as_int(row.metric_values[0].value),
                    'sessions': self._as_int(row.metric_values[1].value),
                    'avg_engagement': float(row.metric_values[2].value)
                }
                for row in response.rows
            ]

        except Exception as e:
            print(f"Error getting top pages: {e}")
            return []
|
|
|
|
|
|
def main():
    """Command-line smoke test for the GA4 connector.

    Parses ``--property-id``, ``--credentials``, optional ``--url`` and
    ``--days`` from the command line. With ``--url`` the page report is
    printed as JSON; without it the first five top pages are listed.
    Any exception raised while connecting or querying is printed rather
    than propagated.
    """
    import argparse

    cli = argparse.ArgumentParser(description='Test GA4 Connector')
    cli.add_argument('--property-id', required=True, help='GA4 Property ID')
    cli.add_argument('--credentials', required=True, help='Path to credentials JSON')
    cli.add_argument('--url', help='Page URL to analyze')
    cli.add_argument('--days', type=int, default=30, help='Days to analyze')
    opts = cli.parse_args()

    print(f"\n📊 Testing GA4 Connector")
    print(f"Property: {opts.property_id}\n")

    try:
        ga4 = GA4Connector(opts.property_id, opts.credentials)

        # No URL given: list the leading pages and finish.
        if not opts.url:
            print("Getting top pages...")
            ranked = ga4.get_top_pages(opts.days)
            for position, entry in enumerate(ranked[:5], start=1):
                print(f"{position}. {entry['page']}: {entry['pageviews']:,} views")
            return

        # URL given: report on that single page.
        print(f"Analyzing: {opts.url}")
        report = ga4.get_page_data(opts.url, opts.days)
        print(f"\nResults: {json.dumps(report, indent=2)}")

    except Exception as exc:
        print(f"Error: {exc}")
|
|
|
|
|
|
# Script entry point: run the command-line smoke test only when this file is
# executed directly, not when it is imported as a module.
if __name__ == '__main__':
    main()
|