204 lines
7.8 KiB
Python
204 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Generate Test Monitoring Data
|
|
Creates sample API monitoring data to demonstrate the dashboard charts and animations.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import random
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
|
|
# Add the backend directory to the path
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from services.database import get_db
|
|
from models.api_monitoring import APIRequest, APIEndpointStats
|
|
from loguru import logger
|
|
|
|
def generate_test_monitoring_data():
|
|
"""Generate test monitoring data for demonstration."""
|
|
logger.info("🎯 Generating test monitoring data...")
|
|
|
|
db = next(get_db())
|
|
|
|
try:
|
|
# Sample endpoints
|
|
endpoints = [
|
|
("GET", "/api/content-planning/strategies"),
|
|
("POST", "/api/content-planning/calendar-generation/start"),
|
|
("GET", "/api/content-planning/monitoring/lightweight-stats"),
|
|
("GET", "/api/content-planning/health"),
|
|
("POST", "/api/content-planning/ai-analytics/analyze"),
|
|
("GET", "/api/content-planning/gap-analysis"),
|
|
("PUT", "/api/content-planning/strategies/1"),
|
|
("DELETE", "/api/content-planning/strategies/2"),
|
|
]
|
|
|
|
# Generate requests for the last 30 minutes
|
|
now = datetime.utcnow()
|
|
start_time = now - timedelta(minutes=30)
|
|
|
|
logger.info(f"📊 Generating data from {start_time} to {now}")
|
|
|
|
for i in range(100): # Generate 100 requests
|
|
# Random time within the last 30 minutes
|
|
timestamp = start_time + timedelta(
|
|
seconds=random.randint(0, 30 * 60)
|
|
)
|
|
|
|
# Random endpoint
|
|
method, path = random.choice(endpoints)
|
|
|
|
# Random status code (mostly 200, some errors)
|
|
if random.random() < 0.9: # 90% success rate
|
|
status_code = 200
|
|
else:
|
|
status_code = random.choice([400, 401, 403, 404, 500, 502, 503])
|
|
|
|
# Random duration (0.1 to 2.0 seconds)
|
|
duration = random.uniform(0.1, 2.0)
|
|
|
|
# Random cache hit
|
|
cache_hit = random.choice([True, False, None])
|
|
|
|
# Create API request
|
|
api_request = APIRequest(
|
|
path=path,
|
|
method=method,
|
|
status_code=status_code,
|
|
duration=duration,
|
|
user_id=f"user_{random.randint(1, 10)}",
|
|
cache_hit=cache_hit,
|
|
request_size=random.randint(100, 5000),
|
|
response_size=random.randint(500, 10000),
|
|
user_agent="Mozilla/5.0 (Test Browser)",
|
|
ip_address=f"192.168.1.{random.randint(1, 255)}",
|
|
timestamp=timestamp
|
|
)
|
|
db.add(api_request)
|
|
|
|
# Generate endpoint stats
|
|
for method, path in endpoints:
|
|
endpoint_key = f"{method} {path}"
|
|
|
|
# Check if stats already exist
|
|
existing_stats = db.query(APIEndpointStats).filter(
|
|
APIEndpointStats.endpoint == endpoint_key
|
|
).first()
|
|
|
|
if existing_stats:
|
|
# Update existing stats
|
|
total_requests = random.randint(50, 200)
|
|
total_errors = random.randint(0, total_requests // 10)
|
|
total_duration = random.uniform(10.0, 100.0)
|
|
|
|
existing_stats.total_requests = total_requests
|
|
existing_stats.total_errors = total_errors
|
|
existing_stats.total_duration = total_duration
|
|
existing_stats.avg_duration = total_duration / total_requests
|
|
existing_stats.min_duration = random.uniform(0.05, 0.5)
|
|
existing_stats.max_duration = random.uniform(1.0, 3.0)
|
|
existing_stats.cache_hits = random.randint(0, total_requests // 2)
|
|
existing_stats.cache_misses = random.randint(0, total_requests // 3)
|
|
existing_stats.last_called = now
|
|
|
|
if existing_stats.cache_hits + existing_stats.cache_misses > 0:
|
|
existing_stats.cache_hit_rate = (
|
|
existing_stats.cache_hits /
|
|
(existing_stats.cache_hits + existing_stats.cache_misses)
|
|
) * 100
|
|
else:
|
|
# Create new stats
|
|
total_requests = random.randint(50, 200)
|
|
total_errors = random.randint(0, total_requests // 10)
|
|
total_duration = random.uniform(10.0, 100.0)
|
|
cache_hits = random.randint(0, total_requests // 2)
|
|
cache_misses = random.randint(0, total_requests // 3)
|
|
|
|
endpoint_stats = APIEndpointStats(
|
|
endpoint=endpoint_key,
|
|
total_requests=total_requests,
|
|
total_errors=total_errors,
|
|
total_duration=total_duration,
|
|
avg_duration=total_duration / total_requests,
|
|
min_duration=random.uniform(0.05, 0.5),
|
|
max_duration=random.uniform(1.0, 3.0),
|
|
cache_hits=cache_hits,
|
|
cache_misses=cache_misses,
|
|
cache_hit_rate=(cache_hits / (cache_hits + cache_misses)) * 100 if (cache_hits + cache_misses) > 0 else 0,
|
|
last_called=now
|
|
)
|
|
db.add(endpoint_stats)
|
|
|
|
db.commit()
|
|
logger.info("✅ Test monitoring data generated successfully!")
|
|
|
|
# Show summary
|
|
total_requests = db.query(APIRequest).count()
|
|
total_errors = db.query(APIRequest).filter(APIRequest.status_code >= 400).count()
|
|
total_endpoints = db.query(APIEndpointStats).count()
|
|
|
|
logger.info(f"📈 Generated {total_requests} API requests")
|
|
logger.info(f"❌ Generated {total_errors} error requests")
|
|
logger.info(f"🔗 Generated stats for {total_endpoints} endpoints")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error generating test data: {str(e)}")
|
|
db.rollback()
|
|
return False
|
|
finally:
|
|
db.close()
|
|
|
|
def clear_test_data():
|
|
"""Clear all test monitoring data."""
|
|
logger.info("🗑️ Clearing test monitoring data...")
|
|
|
|
db = next(get_db())
|
|
|
|
try:
|
|
# Clear all data
|
|
db.execute(text("DELETE FROM api_requests"))
|
|
db.execute(text("DELETE FROM api_endpoint_stats"))
|
|
db.execute(text("DELETE FROM system_health"))
|
|
db.execute(text("DELETE FROM cache_performance"))
|
|
|
|
db.commit()
|
|
logger.info("✅ Test monitoring data cleared successfully!")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error clearing test data: {str(e)}")
|
|
db.rollback()
|
|
return False
|
|
finally:
|
|
db.close()
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="Generate test monitoring data")
|
|
parser.add_argument("--action", choices=["generate", "clear"], default="generate",
|
|
help="Action to perform (generate or clear test data)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.action == "generate":
|
|
success = generate_test_monitoring_data()
|
|
if success:
|
|
logger.info("🎉 Test data generation completed successfully!")
|
|
else:
|
|
logger.error("💥 Test data generation failed!")
|
|
sys.exit(1)
|
|
elif args.action == "clear":
|
|
success = clear_test_data()
|
|
if success:
|
|
logger.info("🗑️ Test data cleared successfully!")
|
|
else:
|
|
logger.error("💥 Failed to clear test data!")
|
|
sys.exit(1)
|