Added enhanced linguistic analyzer and persona quality improver

This commit is contained in:
ajaysi
2025-09-14 10:21:36 +05:30
parent fe277afc62
commit 380bb19673
16 changed files with 34 additions and 308 deletions

View File

@@ -0,0 +1,43 @@
import requests
import json
# Test the research endpoint with more detailed output
url = "http://localhost:8000/api/blog/research"
payload = {
"keywords": ["AI content generation", "blog writing"],
"topic": "ALwrity content generation",
"industry": "Technology",
"target_audience": "content creators"
}
try:
print("Sending request to research endpoint...")
response = requests.post(url, json=payload, timeout=60)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json()
print("\n=== FULL RESPONSE ===")
print(json.dumps(data, indent=2))
# Check if we got the expected fields
expected_fields = ['success', 'sources', 'keyword_analysis', 'competitor_analysis', 'suggested_angles', 'search_widget', 'search_queries']
print(f"\n=== FIELD ANALYSIS ===")
for field in expected_fields:
value = data.get(field)
if field == 'sources':
print(f"{field}: {len(value) if value else 0} items")
elif field == 'search_queries':
print(f"{field}: {len(value) if value else 0} items")
elif field == 'search_widget':
print(f"{field}: {'Present' if value else 'Missing'}")
else:
print(f"{field}: {type(value).__name__} - {str(value)[:100]}...")
else:
print(f"Error Response: {response.text}")
except Exception as e:
print(f"Request failed: {e}")
import traceback
traceback.print_exc()

View File

@@ -0,0 +1,60 @@
import asyncio
from services.llm_providers.gemini_grounded_provider import GeminiGroundedProvider
async def test_gemini_direct():
gemini = GeminiGroundedProvider()
prompt = """
Research the topic "AI content generation" in the Technology industry for content creators audience. Provide a comprehensive analysis including:
1. Current trends and insights (2024-2025)
2. Key statistics and data points with sources
3. Industry expert opinions and quotes
4. Recent developments and news
5. Market analysis and forecasts
6. Best practices and case studies
7. Keyword analysis: primary, secondary, and long-tail opportunities
8. Competitor analysis: top players and content gaps
9. Content angle suggestions: 5 compelling angles for blog posts
Focus on factual, up-to-date information from credible sources.
Include specific data points, percentages, and recent developments.
Structure your response with clear sections for each analysis area.
"""
try:
result = await gemini.generate_grounded_content(
prompt=prompt,
content_type="research",
max_tokens=2000
)
print("=== GEMINI RESULT ===")
print(f"Type: {type(result)}")
print(f"Keys: {list(result.keys()) if isinstance(result, dict) else 'Not a dict'}")
if isinstance(result, dict):
print(f"Sources count: {len(result.get('sources', []))}")
print(f"Search queries count: {len(result.get('search_queries', []))}")
print(f"Has search widget: {bool(result.get('search_widget'))}")
print(f"Content length: {len(result.get('content', ''))}")
print("\n=== FIRST SOURCE ===")
sources = result.get('sources', [])
if sources:
print(f"Source: {sources[0]}")
print("\n=== SEARCH QUERIES (First 3) ===")
queries = result.get('search_queries', [])
for i, query in enumerate(queries[:3]):
print(f"{i+1}. {query}")
else:
print(f"Result is not a dict: {result}")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_gemini_direct())

View File

@@ -0,0 +1,58 @@
import requests
import json
# Test the research endpoint
url = "http://localhost:8000/api/blog/research"
payload = {
"keywords": ["AI content generation", "blog writing"],
"topic": "ALwrity content generation",
"industry": "Technology",
"target_audience": "content creators"
}
try:
response = requests.post(url, json=payload)
print(f"Status Code: {response.status_code}")
print(f"Response Headers: {dict(response.headers)}")
if response.status_code == 200:
data = response.json()
print("\n=== RESEARCH RESPONSE ===")
print(f"Success: {data.get('success')}")
print(f"Sources Count: {len(data.get('sources', []))}")
print(f"Search Queries Count: {len(data.get('search_queries', []))}")
print(f"Has Search Widget: {bool(data.get('search_widget'))}")
print(f"Suggested Angles Count: {len(data.get('suggested_angles', []))}")
print("\n=== SOURCES ===")
for i, source in enumerate(data.get('sources', [])[:3]):
print(f"Source {i+1}: {source.get('title', 'No title')}")
print(f" URL: {source.get('url', 'No URL')}")
print(f" Type: {source.get('type', 'Unknown')}")
print("\n=== SEARCH QUERIES (First 5) ===")
for i, query in enumerate(data.get('search_queries', [])[:5]):
print(f"{i+1}. {query}")
print("\n=== SUGGESTED ANGLES ===")
for i, angle in enumerate(data.get('suggested_angles', [])[:3]):
print(f"{i+1}. {angle}")
print("\n=== KEYWORD ANALYSIS ===")
kw_analysis = data.get('keyword_analysis', {})
print(f"Primary: {kw_analysis.get('primary', [])}")
print(f"Secondary: {kw_analysis.get('secondary', [])}")
print(f"Search Intent: {kw_analysis.get('search_intent', 'Unknown')}")
print("\n=== SEARCH WIDGET (First 200 chars) ===")
widget = data.get('search_widget', '')
if widget:
print(widget[:200] + "..." if len(widget) > 200 else widget)
else:
print("No search widget provided")
else:
print(f"Error: {response.text}")
except Exception as e:
print(f"Request failed: {e}")

View File

@@ -0,0 +1,115 @@
import requests
import json
from datetime import datetime
# Test the research endpoint and capture full response
url = "http://localhost:8000/api/blog/research"
payload = {
"keywords": ["AI content generation", "blog writing"],
"topic": "ALwrity content generation",
"industry": "Technology",
"target_audience": "content creators"
}
try:
print("Sending request to research endpoint...")
response = requests.post(url, json=payload, timeout=120)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json()
# Create analysis file with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"research_analysis_{timestamp}.json"
# Save full response to file
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"\n=== RESEARCH RESPONSE ANALYSIS ===")
print(f"✅ Full response saved to: {filename}")
print(f"Success: {data.get('success')}")
print(f"Sources Count: {len(data.get('sources', []))}")
print(f"Search Queries Count: {len(data.get('search_queries', []))}")
print(f"Has Search Widget: {bool(data.get('search_widget'))}")
print(f"Suggested Angles Count: {len(data.get('suggested_angles', []))}")
print(f"\n=== SOURCES ANALYSIS ===")
sources = data.get('sources', [])
for i, source in enumerate(sources[:5]): # Show first 5
print(f"Source {i+1}: {source.get('title', 'No title')}")
print(f" URL: {source.get('url', 'No URL')[:100]}...")
print(f" Type: {source.get('type', 'Unknown')}")
print(f" Credibility: {source.get('credibility_score', 'N/A')}")
print(f"\n=== SEARCH QUERIES ANALYSIS ===")
queries = data.get('search_queries', [])
print(f"Total queries: {len(queries)}")
for i, query in enumerate(queries[:10]): # Show first 10
print(f"{i+1:2d}. {query}")
print(f"\n=== SEARCH WIDGET ANALYSIS ===")
widget = data.get('search_widget', '')
if widget:
print(f"Widget HTML length: {len(widget)} characters")
print(f"Contains Google branding: {'Google' in widget}")
print(f"Contains search chips: {'chip' in widget}")
print(f"Contains carousel: {'carousel' in widget}")
print(f"First 200 chars: {widget[:200]}...")
else:
print("No search widget provided")
print(f"\n=== KEYWORD ANALYSIS ===")
kw_analysis = data.get('keyword_analysis', {})
print(f"Primary keywords: {kw_analysis.get('primary', [])}")
print(f"Secondary keywords: {kw_analysis.get('secondary', [])}")
print(f"Long-tail keywords: {kw_analysis.get('long_tail', [])}")
print(f"Search intent: {kw_analysis.get('search_intent', 'Unknown')}")
print(f"Difficulty score: {kw_analysis.get('difficulty', 'N/A')}")
print(f"\n=== SUGGESTED ANGLES ===")
angles = data.get('suggested_angles', [])
for i, angle in enumerate(angles):
print(f"{i+1}. {angle}")
print(f"\n=== UI REPRESENTATION RECOMMENDATIONS ===")
print("Based on the response, here's what should be displayed in the Editor UI:")
print(f"1. Research Sources Panel: {len(sources)} real web sources")
print(f"2. Search Widget: Interactive Google search chips ({len(queries)} queries)")
print(f"3. Keyword Analysis: Primary/Secondary/Long-tail breakdown")
print(f"4. Content Angles: {len(angles)} suggested blog post angles")
print(f"5. Search Queries: {len(queries)} research queries for reference")
# Additional analysis for UI components
print(f"\n=== UI COMPONENT BREAKDOWN ===")
# Sources for UI
print("SOURCES FOR UI:")
for i, source in enumerate(sources[:3]):
print(f" - {source.get('title')} (Credibility: {source.get('credibility_score')})")
# Search widget for UI
print(f"\nSEARCH WIDGET FOR UI:")
print(f" - HTML length: {len(widget)} chars")
print(f" - Can be embedded directly in UI")
print(f" - Contains {len(queries)} search suggestions")
# Keywords for UI
print(f"\nKEYWORDS FOR UI:")
print(f" - Primary: {', '.join(kw_analysis.get('primary', []))}")
print(f" - Secondary: {', '.join(kw_analysis.get('secondary', []))}")
print(f" - Long-tail: {', '.join(kw_analysis.get('long_tail', []))}")
# Angles for UI
print(f"\nCONTENT ANGLES FOR UI:")
for i, angle in enumerate(angles[:3]):
print(f" - {angle}")
else:
print(f"Error: {response.text}")
except Exception as e:
print(f"Request failed: {e}")
import traceback
traceback.print_exc()

View File

@@ -0,0 +1,276 @@
"""
Test Script for Subscription System
Tests the core functionality of the usage-based subscription system.
"""
import sys
import os
from pathlib import Path
import asyncio
import json
# Add the backend directory to Python path
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
from sqlalchemy.orm import sessionmaker
from loguru import logger
from services.database import engine
from services.pricing_service import PricingService
from services.usage_tracking_service import UsageTrackingService
from models.subscription_models import APIProvider, SubscriptionTier
async def test_pricing_service():
"""Test the pricing service functionality."""
logger.info("🧪 Testing Pricing Service...")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db = SessionLocal()
try:
pricing_service = PricingService(db)
# Test cost calculation
cost_data = pricing_service.calculate_api_cost(
provider=APIProvider.GEMINI,
model_name="gemini-2.5-flash",
tokens_input=1000,
tokens_output=500,
request_count=1
)
logger.info(f"✅ Cost calculation: {cost_data}")
# Test user limits
limits = pricing_service.get_user_limits("test_user")
logger.info(f"✅ User limits: {limits}")
# Test usage limit checking
can_proceed, message, usage_info = pricing_service.check_usage_limits(
user_id="test_user",
provider=APIProvider.GEMINI,
tokens_requested=100
)
logger.info(f"✅ Usage check: {can_proceed} - {message}")
logger.info(f" Usage info: {usage_info}")
return True
except Exception as e:
logger.error(f"❌ Pricing service test failed: {e}")
return False
finally:
db.close()
async def test_usage_tracking():
"""Test the usage tracking service."""
logger.info("🧪 Testing Usage Tracking Service...")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db = SessionLocal()
try:
usage_service = UsageTrackingService(db)
# Test tracking an API usage
result = await usage_service.track_api_usage(
user_id="test_user",
provider=APIProvider.GEMINI,
endpoint="/api/generate",
method="POST",
model_used="gemini-2.5-flash",
tokens_input=500,
tokens_output=300,
response_time=1.5,
status_code=200
)
logger.info(f"✅ Usage tracking result: {result}")
# Test getting usage stats
stats = usage_service.get_user_usage_stats("test_user")
logger.info(f"✅ Usage stats: {json.dumps(stats, indent=2)}")
# Test usage trends
trends = usage_service.get_usage_trends("test_user", 3)
logger.info(f"✅ Usage trends: {json.dumps(trends, indent=2)}")
return True
except Exception as e:
logger.error(f"❌ Usage tracking test failed: {e}")
return False
finally:
db.close()
async def test_limit_enforcement():
"""Test usage limit enforcement."""
logger.info("🧪 Testing Limit Enforcement...")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db = SessionLocal()
try:
usage_service = UsageTrackingService(db)
# Test multiple API calls to approach limits
for i in range(5):
result = await usage_service.track_api_usage(
user_id="test_user_limits",
provider=APIProvider.GEMINI,
endpoint="/api/generate",
method="POST",
model_used="gemini-2.5-flash",
tokens_input=1000,
tokens_output=800,
response_time=2.0,
status_code=200
)
logger.info(f"Call {i+1}: {result}")
# Check if limits are being enforced
can_proceed, message, usage_info = await usage_service.enforce_usage_limits(
user_id="test_user_limits",
provider=APIProvider.GEMINI,
tokens_requested=5000
)
logger.info(f"✅ Limit enforcement: {can_proceed} - {message}")
logger.info(f" Usage info: {usage_info}")
return True
except Exception as e:
logger.error(f"❌ Limit enforcement test failed: {e}")
return False
finally:
db.close()
def test_database_tables():
"""Test that all subscription tables exist."""
logger.info("🧪 Testing Database Tables...")
try:
from sqlalchemy import text
with engine.connect() as conn:
# Check for subscription tables
tables_query = text("""
SELECT name FROM sqlite_master
WHERE type='table' AND (
name LIKE '%subscription%' OR
name LIKE '%usage%' OR
name LIKE '%pricing%' OR
name LIKE '%billing%'
)
ORDER BY name
""")
result = conn.execute(tables_query)
tables = result.fetchall()
expected_tables = [
'api_provider_pricing',
'api_usage_logs',
'billing_history',
'subscription_plans',
'usage_alerts',
'usage_summaries',
'user_subscriptions'
]
found_tables = [t[0] for t in tables]
logger.info(f"Found tables: {found_tables}")
missing_tables = [t for t in expected_tables if t not in found_tables]
if missing_tables:
logger.error(f"❌ Missing tables: {missing_tables}")
return False
# Check table data
for table in ['subscription_plans', 'api_provider_pricing']:
count_query = text(f"SELECT COUNT(*) FROM {table}")
result = conn.execute(count_query)
count = result.fetchone()[0]
logger.info(f"{table}: {count} records")
return True
except Exception as e:
logger.error(f"❌ Database tables test failed: {e}")
return False
async def run_comprehensive_test():
"""Run comprehensive test suite."""
logger.info("🚀 Starting Subscription System Comprehensive Test")
logger.info("="*60)
test_results = {}
# Test 1: Database Tables
logger.info("\n1. Testing Database Tables...")
test_results['database_tables'] = test_database_tables()
# Test 2: Pricing Service
logger.info("\n2. Testing Pricing Service...")
test_results['pricing_service'] = await test_pricing_service()
# Test 3: Usage Tracking
logger.info("\n3. Testing Usage Tracking...")
test_results['usage_tracking'] = await test_usage_tracking()
# Test 4: Limit Enforcement
logger.info("\n4. Testing Limit Enforcement...")
test_results['limit_enforcement'] = await test_limit_enforcement()
# Summary
logger.info("\n" + "="*60)
logger.info("TEST RESULTS SUMMARY")
logger.info("="*60)
passed = sum(1 for result in test_results.values() if result)
total = len(test_results)
for test_name, result in test_results.items():
status = "✅ PASS" if result else "❌ FAIL"
logger.info(f"{test_name.upper().replace('_', ' ')}: {status}")
logger.info(f"\nOverall: {passed}/{total} tests passed")
if passed == total:
logger.info("🎉 All tests passed! Subscription system is ready.")
logger.info("\n" + "="*60)
logger.info("NEXT STEPS:")
logger.info("="*60)
logger.info("1. Start the FastAPI server:")
logger.info(" cd backend && python start_alwrity_backend.py")
logger.info("\n2. Test the API endpoints:")
logger.info(" GET http://localhost:8000/api/subscription/plans")
logger.info(" GET http://localhost:8000/api/subscription/pricing")
logger.info(" GET http://localhost:8000/api/subscription/usage/test_user")
logger.info("\n3. Integrate with your frontend dashboard")
logger.info("4. Set up user authentication/identification")
logger.info("5. Configure payment processing (Stripe, etc.)")
logger.info("="*60)
return True
else:
logger.error("❌ Some tests failed. Please check the errors above.")
return False
if __name__ == "__main__":
# Run the comprehensive test
success = asyncio.run(run_comprehensive_test())
if not success:
sys.exit(1)
logger.info("✅ Test completed successfully!")

View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""
Database validation script for billing system
"""
import sqlite3
from datetime import datetime
def validate_database():
conn = sqlite3.connect('alwrity.db')
cursor = conn.cursor()
print('=== BILLING DATABASE VALIDATION ===')
print(f'Validation timestamp: {datetime.now()}')
print()
# Check subscription-related tables
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND (
name LIKE '%subscription%' OR
name LIKE '%usage%' OR
name LIKE '%billing%' OR
name LIKE '%pricing%' OR
name LIKE '%alert%'
)
ORDER BY name
""")
tables = cursor.fetchall()
print('=== SUBSCRIPTION TABLES ===')
for table in tables:
table_name = table[0]
print(f'\nTable: {table_name}')
# Get table schema
cursor.execute(f'PRAGMA table_info({table_name})')
columns = cursor.fetchall()
print(' Schema:')
for col in columns:
col_id, name, type_name, not_null, default, pk = col
constraints = []
if pk:
constraints.append('PRIMARY KEY')
if not_null:
constraints.append('NOT NULL')
if default:
constraints.append(f'DEFAULT {default}')
constraint_str = f' ({", ".join(constraints)})' if constraints else ''
print(f' {name}: {type_name}{constraint_str}')
# Get row count
cursor.execute(f'SELECT COUNT(*) FROM {table_name}')
count = cursor.fetchone()[0]
print(f' Row count: {count}')
# Sample data for non-empty tables
if count > 0 and count <= 10:
cursor.execute(f'SELECT * FROM {table_name} LIMIT 3')
rows = cursor.fetchall()
print(' Sample data:')
for i, row in enumerate(rows):
print(f' Row {i+1}: {row}')
# Check for user-specific data
print('\n=== USER DATA VALIDATION ===')
# Check if we have user-specific usage data
cursor.execute("SELECT DISTINCT user_id FROM usage_summary LIMIT 5")
users = cursor.fetchall()
print(f'Users with usage data: {[u[0] for u in users]}')
# Check user subscriptions
cursor.execute("SELECT DISTINCT user_id FROM user_subscriptions LIMIT 5")
user_subs = cursor.fetchall()
print(f'Users with subscriptions: {[u[0] for u in user_subs]}')
# Check API usage logs
cursor.execute("SELECT COUNT(*) FROM api_usage_logs")
api_logs_count = cursor.fetchone()[0]
print(f'Total API usage logs: {api_logs_count}')
if api_logs_count > 0:
cursor.execute("SELECT DISTINCT user_id FROM api_usage_logs LIMIT 5")
api_users = cursor.fetchall()
print(f'Users with API usage logs: {[u[0] for u in api_users]}')
conn.close()
print('\n=== VALIDATION COMPLETE ===')
if __name__ == '__main__':
validate_database()

View File

@@ -0,0 +1,255 @@
"""
Simple validation script for LinkedIn content generation structure.
This script validates the code structure without requiring external dependencies.
"""
import os
import sys
import ast
import traceback
from pathlib import Path
def validate_file_syntax(file_path: str) -> bool:
"""Validate Python file syntax."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
ast.parse(content)
print(f"{file_path}: Syntax valid")
return True
except SyntaxError as e:
print(f"{file_path}: Syntax error - {e}")
return False
except Exception as e:
print(f"{file_path}: Error - {e}")
return False
def validate_import_structure(file_path: str) -> bool:
"""Validate import structure without actually importing."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
imports = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.append(alias.name)
elif isinstance(node, ast.ImportFrom):
module = node.module or ""
for alias in node.names:
imports.append(f"{module}.{alias.name}")
print(f"{file_path}: Found {len(imports)} imports")
return True
except Exception as e:
print(f"{file_path}: Import validation error - {e}")
return False
def check_class_structure(file_path: str, expected_classes: list) -> bool:
"""Check if expected classes are defined."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
found_classes = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
found_classes.append(node.name)
missing_classes = set(expected_classes) - set(found_classes)
if missing_classes:
print(f"⚠️ {file_path}: Missing classes: {missing_classes}")
else:
print(f"{file_path}: All expected classes found")
print(f" Found classes: {found_classes}")
return len(missing_classes) == 0
except Exception as e:
print(f"{file_path}: Class validation error - {e}")
return False
def check_function_structure(file_path: str, expected_functions: list) -> bool:
"""Check if expected functions are defined."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
found_functions = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
found_functions.append(node.name)
elif isinstance(node, ast.AsyncFunctionDef):
found_functions.append(node.name)
missing_functions = set(expected_functions) - set(found_functions)
if missing_functions:
print(f"⚠️ {file_path}: Missing functions: {missing_functions}")
else:
print(f"{file_path}: All expected functions found")
return len(missing_functions) == 0
except Exception as e:
print(f"{file_path}: Function validation error - {e}")
return False
def validate_linkedin_models():
"""Validate LinkedIn models file."""
print("\n🔍 Validating LinkedIn Models")
print("-" * 40)
file_path = "models/linkedin_models.py"
if not os.path.exists(file_path):
print(f"{file_path}: File does not exist")
return False
# Check syntax
syntax_ok = validate_file_syntax(file_path)
# Check imports
imports_ok = validate_import_structure(file_path)
# Check expected classes
expected_classes = [
"LinkedInPostRequest", "LinkedInArticleRequest", "LinkedInCarouselRequest",
"LinkedInVideoScriptRequest", "LinkedInCommentResponseRequest",
"LinkedInPostResponse", "LinkedInArticleResponse", "LinkedInCarouselResponse",
"LinkedInVideoScriptResponse", "LinkedInCommentResponseResult",
"PostContent", "ArticleContent", "CarouselContent", "VideoScript"
]
classes_ok = check_class_structure(file_path, expected_classes)
return syntax_ok and imports_ok and classes_ok
def validate_linkedin_service():
"""Validate LinkedIn service file."""
print("\n🔍 Validating LinkedIn Service")
print("-" * 40)
file_path = "services/linkedin_service.py"
if not os.path.exists(file_path):
print(f"{file_path}: File does not exist")
return False
# Check syntax
syntax_ok = validate_file_syntax(file_path)
# Check imports
imports_ok = validate_import_structure(file_path)
# Check expected classes
expected_classes = ["LinkedInContentService"]
classes_ok = check_class_structure(file_path, expected_classes)
# Check expected methods
expected_functions = [
"generate_post", "generate_article", "generate_carousel",
"generate_video_script", "generate_comment_response"
]
functions_ok = check_function_structure(file_path, expected_functions)
return syntax_ok and imports_ok and classes_ok and functions_ok
def validate_linkedin_router():
"""Validate LinkedIn router file."""
print("\n🔍 Validating LinkedIn Router")
print("-" * 40)
file_path = "routers/linkedin.py"
if not os.path.exists(file_path):
print(f"{file_path}: File does not exist")
return False
# Check syntax
syntax_ok = validate_file_syntax(file_path)
# Check imports
imports_ok = validate_import_structure(file_path)
# Check expected functions (endpoints)
expected_functions = [
"health_check", "generate_post", "generate_article",
"generate_carousel", "generate_video_script", "generate_comment_response",
"get_content_types", "get_usage_stats"
]
functions_ok = check_function_structure(file_path, expected_functions)
return syntax_ok and imports_ok and functions_ok
def check_file_exists(file_path: str) -> bool:
"""Check if file exists."""
exists = os.path.exists(file_path)
status = "" if exists else ""
print(f"{status} {file_path}: {'Exists' if exists else 'Missing'}")
return exists
def validate_file_structure():
"""Validate the overall file structure."""
print("\n🔍 Validating File Structure")
print("-" * 40)
required_files = [
"models/linkedin_models.py",
"services/linkedin_service.py",
"routers/linkedin.py",
"test_linkedin_endpoints.py"
]
all_exist = True
for file_path in required_files:
if not check_file_exists(file_path):
all_exist = False
return all_exist
def main():
"""Run all validations."""
print("🚀 LinkedIn Content Generation Structure Validation")
print("=" * 60)
results = {}
# Validate file structure
results["file_structure"] = validate_file_structure()
# Validate individual components
results["models"] = validate_linkedin_models()
results["service"] = validate_linkedin_service()
results["router"] = validate_linkedin_router()
# Summary
print("\n📊 Validation Results")
print("=" * 40)
passed = sum(results.values())
total = len(results)
for component, result in results.items():
status = "✅ PASSED" if result else "❌ FAILED"
print(f"{component}: {status}")
print(f"\nOverall: {passed}/{total} validations passed ({(passed/total)*100:.1f}%)")
if passed == total:
print("\n🎉 All structure validations passed!")
print("The LinkedIn content generation migration is structurally complete.")
print("\nNext steps:")
print("1. Install required dependencies (fastapi, pydantic, etc.)")
print("2. Configure API keys (GEMINI_API_KEY)")
print("3. Start the FastAPI server")
print("4. Test the endpoints")
else:
print(f"\n⚠️ {total - passed} validation(s) failed. Please review the implementation.")
return passed == total
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,280 @@
"""
Comprehensive verification script for billing and subscription system setup.
Checks that all files are created, tables exist, and the system is properly integrated.
"""
import os
import sys
from pathlib import Path
def check_file_exists(file_path, description):
"""Check if a file exists and report status."""
if os.path.exists(file_path):
print(f"{description}: {file_path}")
return True
else:
print(f"{description}: {file_path} - NOT FOUND")
return False
def check_file_content(file_path, search_terms, description):
"""Check if file contains expected content."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
missing_terms = []
for term in search_terms:
if term not in content:
missing_terms.append(term)
if not missing_terms:
print(f"{description}: All expected content found")
return True
else:
print(f"{description}: Missing content - {missing_terms}")
return False
except Exception as e:
print(f"{description}: Error reading file - {e}")
return False
def check_database_tables():
"""Check if billing database tables exist."""
print("\n🗄️ Checking Database Tables:")
print("-" * 30)
try:
# Add backend to path
backend_dir = Path(__file__).parent
sys.path.insert(0, str(backend_dir))
from services.database import get_db_session, DATABASE_URL
from sqlalchemy import text
session = get_db_session()
if not session:
print("❌ Could not get database session")
return False
# Check for billing tables
tables_query = text("""
SELECT name FROM sqlite_master
WHERE type='table' AND (
name LIKE '%subscription%' OR
name LIKE '%usage%' OR
name LIKE '%billing%' OR
name LIKE '%pricing%' OR
name LIKE '%alert%'
)
ORDER BY name
""")
result = session.execute(tables_query)
tables = result.fetchall()
expected_tables = [
'api_provider_pricing',
'api_usage_logs',
'subscription_plans',
'usage_alerts',
'usage_summaries',
'user_subscriptions'
]
found_tables = [t[0] for t in tables]
print(f"Found tables: {found_tables}")
missing_tables = [t for t in expected_tables if t not in found_tables]
if missing_tables:
print(f"❌ Missing tables: {missing_tables}")
return False
# Check table data
for table in ['subscription_plans', 'api_provider_pricing']:
count_query = text(f"SELECT COUNT(*) FROM {table}")
result = session.execute(count_query)
count = result.fetchone()[0]
print(f"{table}: {count} records")
session.close()
return True
except Exception as e:
print(f"❌ Database check failed: {e}")
return False
def main():
"""Main verification function."""
print("🔍 ALwrity Billing & Subscription System Setup Verification")
print("=" * 70)
backend_dir = Path(__file__).parent
# Files to check
files_to_check = [
(backend_dir / "models" / "subscription_models.py", "Subscription Models"),
(backend_dir / "services" / "pricing_service.py", "Pricing Service"),
(backend_dir / "services" / "usage_tracking_service.py", "Usage Tracking Service"),
(backend_dir / "services" / "subscription_exception_handler.py", "Exception Handler"),
(backend_dir / "api" / "subscription_api.py", "Subscription API"),
(backend_dir / "scripts" / "create_billing_tables.py", "Billing Migration Script"),
(backend_dir / "scripts" / "create_subscription_tables.py", "Subscription Migration Script"),
(backend_dir / "start_alwrity_backend.py", "Backend Startup Script"),
]
# Check file existence
print("\n📁 Checking File Existence:")
print("-" * 30)
files_exist = 0
for file_path, description in files_to_check:
if check_file_exists(file_path, description):
files_exist += 1
# Check content of key files
print("\n📝 Checking File Content:")
print("-" * 30)
content_checks = [
(
backend_dir / "models" / "subscription_models.py",
["SubscriptionPlan", "APIUsageLog", "UsageSummary", "APIProviderPricing"],
"Subscription Models Content"
),
(
backend_dir / "services" / "pricing_service.py",
["calculate_api_cost", "check_usage_limits", "initialize_default_pricing"],
"Pricing Service Content"
),
(
backend_dir / "services" / "usage_tracking_service.py",
["track_api_usage", "get_user_usage_stats", "enforce_usage_limits"],
"Usage Tracking Content"
),
(
backend_dir / "api" / "subscription_api.py",
["get_user_usage", "get_subscription_plans", "get_dashboard_data"],
"API Endpoints Content"
),
(
backend_dir / "start_alwrity_backend.py",
["setup_billing_tables", "verify_billing_tables"],
"Backend Startup Integration"
)
]
content_valid = 0
for file_path, search_terms, description in content_checks:
if os.path.exists(file_path):
if check_file_content(file_path, search_terms, description):
content_valid += 1
else:
print(f"{description}: File not found")
# Check database tables
database_ok = check_database_tables()
# Check middleware integration
print("\n🔧 Checking Middleware Integration:")
print("-" * 30)
middleware_file = backend_dir / "middleware" / "monitoring_middleware.py"
middleware_terms = [
"UsageTrackingService",
"detect_api_provider",
"track_api_usage",
"check_usage_limits_middleware"
]
middleware_ok = check_file_content(
middleware_file,
middleware_terms,
"Middleware Integration"
)
# Check app.py integration
print("\n🚀 Checking FastAPI Integration:")
print("-" * 30)
app_file = backend_dir / "app.py"
app_terms = [
"from api.subscription_api import router as subscription_router",
"app.include_router(subscription_router)"
]
app_ok = check_file_content(
app_file,
app_terms,
"FastAPI App Integration"
)
# Check database service integration
print("\n💾 Checking Database Integration:")
print("-" * 30)
db_file = backend_dir / "services" / "database.py"
db_terms = [
"from models.subscription_models import Base as SubscriptionBase",
"SubscriptionBase.metadata.create_all(bind=engine)"
]
db_ok = check_file_content(
db_file,
db_terms,
"Database Service Integration"
)
# Summary
print("\n" + "=" * 70)
print("📊 VERIFICATION SUMMARY")
print("=" * 70)
total_files = len(files_to_check)
total_content = len(content_checks)
print(f"Files Created: {files_exist}/{total_files}")
print(f"Content Valid: {content_valid}/{total_content}")
print(f"Database Tables: {'' if database_ok else ''}")
print(f"Middleware Integration: {'' if middleware_ok else ''}")
print(f"FastAPI Integration: {'' if app_ok else ''}")
print(f"Database Integration: {'' if db_ok else ''}")
# Overall status
all_checks = [
files_exist == total_files,
content_valid == total_content,
database_ok,
middleware_ok,
app_ok,
db_ok
]
if all(all_checks):
print("\n🎉 ALL CHECKS PASSED!")
print("✅ Billing and subscription system setup is complete and ready to use.")
print("\n" + "=" * 70)
print("🚀 NEXT STEPS:")
print("=" * 70)
print("1. Start the backend server:")
print(" python start_alwrity_backend.py")
print("\n2. Test the API endpoints:")
print(" GET http://localhost:8000/api/subscription/plans")
print(" GET http://localhost:8000/api/subscription/usage/demo")
print(" GET http://localhost:8000/api/subscription/dashboard/demo")
print(" GET http://localhost:8000/api/subscription/pricing")
print("\n3. Access the frontend billing dashboard")
print("4. Monitor usage through the API monitoring middleware")
print("5. Set up user identification for production use")
print("=" * 70)
else:
print("\n❌ SOME CHECKS FAILED!")
print("Please review the errors above and fix any issues.")
return False
return True
if __name__ == "__main__":
success = main()
if not success:
sys.exit(1)

View File

@@ -0,0 +1,205 @@
"""
Simple verification script for subscription system setup.
Checks that all files are created and properly structured.
"""
import os
import sys
from pathlib import Path
def check_file_exists(file_path, description):
"""Check if a file exists and report status."""
if os.path.exists(file_path):
print(f"{description}: {file_path}")
return True
else:
print(f"{description}: {file_path} - NOT FOUND")
return False
def check_file_content(file_path, search_terms, description):
"""Check if file contains expected content."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
missing_terms = []
for term in search_terms:
if term not in content:
missing_terms.append(term)
if not missing_terms:
print(f"{description}: All expected content found")
return True
else:
print(f"{description}: Missing content - {missing_terms}")
return False
except Exception as e:
print(f"{description}: Error reading file - {e}")
return False
def main():
"""Main verification function."""
print("🔍 ALwrity Subscription System Setup Verification")
print("=" * 60)
backend_dir = Path(__file__).parent
# Files to check
files_to_check = [
(backend_dir / "models" / "subscription_models.py", "Subscription Models"),
(backend_dir / "services" / "pricing_service.py", "Pricing Service"),
(backend_dir / "services" / "usage_tracking_service.py", "Usage Tracking Service"),
(backend_dir / "services" / "subscription_exception_handler.py", "Exception Handler"),
(backend_dir / "api" / "subscription_api.py", "Subscription API"),
(backend_dir / "scripts" / "create_subscription_tables.py", "Migration Script"),
(backend_dir / "test_subscription_system.py", "Test Script"),
(backend_dir / "SUBSCRIPTION_SYSTEM_README.md", "Documentation")
]
# Check file existence
print("\n📁 Checking File Existence:")
print("-" * 30)
files_exist = 0
for file_path, description in files_to_check:
if check_file_exists(file_path, description):
files_exist += 1
# Check content of key files
print("\n📝 Checking File Content:")
print("-" * 30)
content_checks = [
(
backend_dir / "models" / "subscription_models.py",
["SubscriptionPlan", "APIUsageLog", "UsageSummary", "APIProvider"],
"Subscription Models Content"
),
(
backend_dir / "services" / "pricing_service.py",
["calculate_api_cost", "check_usage_limits", "APIProvider.GEMINI"],
"Pricing Service Content"
),
(
backend_dir / "services" / "usage_tracking_service.py",
["track_api_usage", "get_user_usage_stats", "enforce_usage_limits"],
"Usage Tracking Content"
),
(
backend_dir / "api" / "subscription_api.py",
["get_user_usage", "get_subscription_plans", "get_dashboard_data"],
"API Endpoints Content"
)
]
content_valid = 0
for file_path, search_terms, description in content_checks:
if os.path.exists(file_path):
if check_file_content(file_path, search_terms, description):
content_valid += 1
else:
print(f"{description}: File not found")
# Check middleware integration
print("\n🔧 Checking Middleware Integration:")
print("-" * 30)
middleware_file = backend_dir / "middleware" / "monitoring_middleware.py"
middleware_terms = [
"UsageTrackingService",
"detect_api_provider",
"track_api_usage",
"check_usage_limits_middleware"
]
middleware_ok = check_file_content(
middleware_file,
middleware_terms,
"Middleware Integration"
)
# Check app.py integration
print("\n🚀 Checking FastAPI Integration:")
print("-" * 30)
app_file = backend_dir / "app.py"
app_terms = [
"from api.subscription_api import router as subscription_router",
"app.include_router(subscription_router)"
]
app_ok = check_file_content(
app_file,
app_terms,
"FastAPI App Integration"
)
# Check database service integration
print("\n💾 Checking Database Integration:")
print("-" * 30)
db_file = backend_dir / "services" / "database.py"
db_terms = [
"from models.subscription_models import Base as SubscriptionBase",
"SubscriptionBase.metadata.create_all(bind=engine)"
]
db_ok = check_file_content(
db_file,
db_terms,
"Database Service Integration"
)
# Summary
print("\n" + "=" * 60)
print("📊 VERIFICATION SUMMARY")
print("=" * 60)
total_files = len(files_to_check)
total_content = len(content_checks)
print(f"Files Created: {files_exist}/{total_files}")
print(f"Content Valid: {content_valid}/{total_content}")
print(f"Middleware Integration: {'' if middleware_ok else ''}")
print(f"FastAPI Integration: {'' if app_ok else ''}")
print(f"Database Integration: {'' if db_ok else ''}")
# Overall status
all_checks = [
files_exist == total_files,
content_valid == total_content,
middleware_ok,
app_ok,
db_ok
]
if all(all_checks):
print("\n🎉 ALL CHECKS PASSED!")
print("✅ Subscription system setup is complete and ready to use.")
print("\n" + "=" * 60)
print("🚀 NEXT STEPS:")
print("=" * 60)
print("1. Install dependencies (if not already done):")
print(" pip install sqlalchemy loguru fastapi")
print("\n2. Run the migration script:")
print(" python scripts/create_subscription_tables.py")
print("\n3. Test the system:")
print(" python test_subscription_system.py")
print("\n4. Start the server:")
print(" python start_alwrity_backend.py")
print("\n5. Test API endpoints:")
print(" GET http://localhost:8000/api/subscription/plans")
print(" GET http://localhost:8000/api/subscription/pricing")
else:
print("\n❌ SOME CHECKS FAILED!")
print("Please review the errors above and fix any issues.")
return False
return True
if __name__ == "__main__":
success = main()
if not success:
sys.exit(1)