Merge branch 'pr-404'

This commit is contained in:
ajaysi
2026-03-09 16:20:06 +05:30
13 changed files with 1708 additions and 20 deletions

View File

@@ -222,6 +222,94 @@ async def update_business_info(business_info_id: int, business_info: dict):
raise HTTPException(status_code=500, detail=f"Failed to update business info: {str(e)}")
async def generate_website_preview(intake: Dict[str, Any], current_user: Dict[str, Any]):
try:
user_id = current_user.get("id")
from services.onboarding.website_intake_service import website_intake_service
from services.onboarding.website_style_service import website_style_service
from api.onboarding_utils.website_automation_service import website_automation_service
from services.user_website_service import user_website_service
from models.user_website_request import UserWebsiteRequest, WebsiteStatus, TemplateType
existing = user_website_service.get_user_website_by_user(user_id)
if not existing:
user_website_service.create_user_website(
UserWebsiteRequest(
user_id=user_id,
template_type=TemplateType(intake.get("template_type", "blog")),
status=WebsiteStatus.PREVIEWING,
business_name=intake.get("business_name"),
business_description=intake.get("business_summary")
)
)
site_brief = website_intake_service.generate_site_brief(intake, user_id=str(user_id))
if existing and existing.netlify_site_url:
site_brief.setdefault("site_brief", {})
site_brief["site_brief"]["canonical_url"] = existing.netlify_site_url
tokens = website_style_service.generate_theme_tokens(site_brief, user_id=str(user_id))
css = website_style_service.render_css(tokens) if tokens and not tokens.get("error") else ""
preview = await website_automation_service.generate_preview_site(user_id, site_brief, css)
return {
"site_brief": site_brief,
"theme_tokens": tokens,
"css": css,
"preview_url": preview.get("preview_url"),
"preview_root": preview.get("preview_root"),
"preview_files": preview.get("preview_files"),
"preview_html": preview.get("preview_html"),
}
except Exception as e:
logger.error(f"Error generating website preview: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to generate website preview")
async def deploy_website(intake: Dict[str, Any], current_user: Dict[str, Any]):
try:
user_id = current_user.get("id")
from api.onboarding_utils.website_automation_service import WebsiteAutomationService
from services.user_website_service import user_website_service
from services.onboarding.website_intake_service import website_intake_service
from services.onboarding.website_style_service import website_style_service
from models.user_website_request import WebsiteStatusUpdate
template = intake.get("template_type", "blog")
business_name = intake.get("business_name") or intake.get("business_summary") or f"ALwrity Site {user_id}"
business_info = {"name": business_name}
site_brief = website_intake_service.generate_site_brief(intake, user_id=str(user_id))
tokens = website_style_service.generate_theme_tokens(site_brief, user_id=str(user_id))
css = website_style_service.render_css(tokens) if tokens and not tokens.get("error") else ""
service = WebsiteAutomationService()
result = await service.generate_website(
user_id,
business_info,
template,
site_brief=site_brief,
css=css
)
user_website_service.update_user_website_status(
user_id=user_id,
status_update=WebsiteStatusUpdate(
status=WebsiteStatus.DEPLOYED,
github_repo_url=result.get("repo_url"),
netlify_site_url=result.get("live_url"),
netlify_admin_url=result.get("admin_url")
)
)
return {
**result,
"site_brief": site_brief,
"theme_tokens": tokens,
"css": css
}
except Exception as e:
logger.error(f"Error deploying website: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to deploy website")
__all__ = [name for name in globals().keys() if not name.startswith('_')]

View File

@@ -18,7 +18,7 @@ async def initialize_onboarding(current_user: Dict[str, Any] = Depends(get_curre
logger.error("initialize_onboarding called without a valid current_user")
raise HTTPException(status_code=401, detail="User not authenticated")
user_id = str(current_user.get('id'))
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
progress_service = OnboardingProgressService()
status = progress_service.get_onboarding_status(user_id)
@@ -96,7 +96,7 @@ async def initialize_onboarding(current_user: Dict[str, Any] = Depends(get_curre
"email": current_user.get('email'),
"first_name": current_user.get('first_name'),
"last_name": current_user.get('last_name'),
"clerk_user_id": user_id,
"clerk_user_id": str(current_user.get('clerk_user_id') or user_id),
},
"onboarding": {
"is_completed": status['is_completed'],

View File

@@ -22,7 +22,7 @@ class OnboardingControlService:
db_gen = get_db()
db = next(db_gen)
try:
user_id = str(current_user.get('id'))
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
# Ensure user workspace exists when starting onboarding
try:
@@ -53,7 +53,7 @@ class OnboardingControlService:
"""Reset the onboarding progress for a specific user."""
try:
from services.onboarding.progress_service import OnboardingProgressService
user_id = str(current_user.get('id'))
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
progress_service = OnboardingProgressService()
success = progress_service.reset_onboarding(user_id)

View File

@@ -416,7 +416,7 @@ class StepManagementService:
async def get_step_data(self, step_number: int, current_user: Dict[str, Any]) -> Dict[str, Any]:
"""Get data for a specific step."""
try:
user_id = str(current_user.get('id'))
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
db = next(get_db(current_user))
# Use SSOT for reading step data
@@ -492,7 +492,7 @@ class StepManagementService:
"""Mark a step as completed."""
try:
logger.info(f"[complete_step] Completing step {step_number}")
user_id = str(current_user.get('id'))
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
# Optional validation
try:
@@ -672,7 +672,7 @@ class StepManagementService:
"""Skip a step (for optional steps)."""
try:
from services.onboarding.api_key_manager import get_onboarding_progress_for_user
user_id = str(current_user.get('id'))
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
progress = get_onboarding_progress_for_user(user_id)
step = progress.get_step_data(step_number)
@@ -695,7 +695,7 @@ class StepManagementService:
async def validate_step_access(self, step_number: int, current_user: Dict[str, Any]) -> Dict[str, Any]:
"""Validate if user can access a specific step."""
try:
user_id = str(current_user.get('id'))
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
progress = get_onboarding_progress_for_user(user_id)
if not progress.can_proceed_to_step(step_number):

View File

@@ -0,0 +1,250 @@
"""Website Automation Service for API layer - orchestrates website creation."""
from typing import Dict, Any, Optional
from loguru import logger
import os
import tempfile
import json
from fastapi import HTTPException
# Import the actual automation service
from services.onboarding.website_automation_service import WebsiteAutomationService as CoreAutomationService
class WebsiteAutomationService:
"""API layer service for website automation operations."""
def __init__(self):
logger.info("🔄 Initializing WebsiteAutomationService (API layer)...")
self.core_service = CoreAutomationService()
async def generate_preview_site(
self,
user_id: str,
site_brief: Dict[str, Any],
css: str
) -> Dict[str, Any]:
"""Generate a preview site for the user."""
try:
logger.info(f"Generating preview site for user {user_id}")
# For preview, we'll create a temporary HTML file
# In production, this could be hosted on a preview server
preview_html = self._generate_preview_html(site_brief, css)
# Save to temporary file (in production, use proper hosting)
preview_url = f"/preview/{user_id}/index.html"
return {
"preview_url": preview_url,
"preview_root": f"/preview/{user_id}",
"preview_files": ["index.html", "custom.css"],
"preview_html": preview_html
}
except Exception as e:
logger.error(f"Failed to generate preview site: {str(e)}")
raise HTTPException(status_code=500, detail=f"Preview generation failed: {str(e)}")
def _generate_preview_html(self, site_brief: Dict[str, Any], css: str) -> str:
"""Generate HTML preview from site brief and CSS."""
try:
site_data = site_brief.get("site_brief", {})
business_name = site_data.get("business_name", "Your Business")
tagline = site_data.get("tagline", "Your tagline here")
# Get content plan
content_plan = site_brief.get("content_plan", {})
required_pages = content_plan.get("required_pages", [])
# Generate HTML
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{business_name}</title>
<style>
{css}
/* Additional preview styles */
.preview-banner {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 2rem;
text-align: center;
margin-bottom: 2rem;
}}
.preview-banner h1 {{
margin: 0;
font-size: 2.5rem;
}}
.preview-banner p {{
margin: 0.5rem 0 0 0;
font-size: 1.2rem;
opacity: 0.9;
}}
.preview-content {{
max-width: 1200px;
margin: 0 auto;
padding: 0 2rem;
}}
.preview-section {{
margin-bottom: 3rem;
}}
.preview-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 2rem;
margin-bottom: 2rem;
}}
.preview-card {{
background: var(--color-surface, #f8fafc);
padding: 1.5rem;
border-radius: var(--border-radius-medium, 0.5rem);
box-shadow: var(--shadow-small, 0 1px 2px 0 rgb(0 0 0 / 0.05));
}}
.preview-watermark {{
position: fixed;
bottom: 1rem;
right: 1rem;
background: rgba(0, 0, 0, 0.8);
color: white;
padding: 0.5rem 1rem;
border-radius: 0.25rem;
font-size: 0.875rem;
z-index: 1000;
}}
</style>
</head>
<body>
<div class="preview-banner">
<h1>{business_name}</h1>
<p>{tagline}</p>
<div class="preview-watermark">ALwrity Preview</div>
</div>
<div class="preview-content">
{self._generate_page_content(required_pages)}
</div>
<script>
// Basic interactions for preview
document.addEventListener('DOMContentLoaded', function() {{
console.log('ALwrity website preview loaded');
}});
</script>
</body>
</html>"""
return html
except Exception as e:
logger.error(f"Failed to generate preview HTML: {str(e)}")
return f"<html><body><h1>Preview Error</h1><p>{str(e)}</p></body></html>"
def _generate_page_content(self, required_pages: list) -> str:
"""Generate HTML content for pages."""
if not required_pages:
return """
<div class="preview-section">
<h2>Welcome to Your Website</h2>
<p>This is a preview of your new website. The content will be generated based on your business information.</p>
<div class="preview-grid">
<div class="preview-card">
<h3>About Us</h3>
<p>Learn more about your business and what makes you unique.</p>
</div>
<div class="preview-card">
<h3>Services</h3>
<p>Discover the services and products you offer to your customers.</p>
</div>
<div class="preview-card">
<h3>Contact</h3>
<p>Get in touch with you through various contact methods.</p>
</div>
</div>
</div>
"""
content_parts = []
for page in required_pages:
page_name = page.get("page", "page").title()
goal = page.get("goal", "")
key_points = page.get("key_points", [])
cta = page.get("cta", "Get Started")
page_html = f"""
<div class="preview-section">
<h2>{page_name}</h2>
<p>{goal}</p>
"""
if key_points:
page_html += "<div class='preview-grid'>"
for point in key_points:
page_html += f"""
<div class="preview-card">
<p>{point}</p>
</div>
"""
page_html += "</div>"
if cta:
page_html += f"""
<div style="margin-top: 2rem;">
<button class="btn btn-primary">{cta}</button>
</div>
"""
page_html += "</div>"
content_parts.append(page_html)
return "".join(content_parts)
async def generate_website(
self,
user_id: str,
business_info: Dict[str, Any],
niche: str,
site_brief: Optional[Dict[str, Any]] = None,
css: Optional[str] = None
) -> Dict[str, str]:
"""Generate and deploy a full website."""
try:
logger.info(f"Generating website for user {user_id}")
# Use the core automation service
result = await self.core_service.generate_website(
user_id=user_id,
business_info=business_info,
niche=niche,
site_brief=site_brief,
css=css
)
return result
except Exception as e:
logger.error(f"Failed to generate website: {str(e)}")
raise HTTPException(status_code=500, detail=f"Website generation failed: {str(e)}")
def get_deployment_status(self, user_id: str) -> Dict[str, Any]:
"""Get the status of website deployment."""
try:
# This would typically check the deployment status
# For now, return a placeholder
return {
"status": "pending",
"message": "Deployment status checking not yet implemented"
}
except Exception as e:
logger.error(f"Failed to get deployment status: {str(e)}")
return {
"status": "error",
"message": str(e)
}
# Singleton instance
website_automation_service = WebsiteAutomationService()

View File

@@ -0,0 +1,121 @@
"""User Website Request models for ALwrity website maker functionality."""
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
class WebsiteStatus(str, Enum):
"""Website creation status enum."""
INITIATED = "initiated"
PREVIEWING = "previewing"
DEPLOYING = "deploying"
DEPLOYED = "deployed"
FAILED = "failed"
CANCELLED = "cancelled"
class TemplateType(str, Enum):
"""Website template types."""
BLOG = "blog"
PROFILE = "profile"
SHOP = "shop"
DONT_KNOW = "dont_know"
class UserWebsiteRequest(BaseModel):
"""Request model for creating/updating user website."""
user_id: int = Field(..., description="User ID")
template_type: TemplateType = Field(default=TemplateType.BLOG, description="Website template type")
business_name: Optional[str] = Field(None, description="Business name")
business_description: Optional[str] = Field(None, description="Business description")
status: WebsiteStatus = Field(default=WebsiteStatus.INITIATED, description="Website status")
github_repo_url: Optional[str] = Field(None, description="GitHub repository URL")
netlify_site_url: Optional[str] = Field(None, description="Netlify deployed site URL")
netlify_admin_url: Optional[str] = Field(None, description="Netlify admin URL")
site_brief: Optional[Dict[str, Any]] = Field(None, description="Generated site brief")
theme_tokens: Optional[Dict[str, Any]] = Field(None, description="Theme configuration tokens")
custom_css: Optional[str] = Field(None, description="Custom CSS for theming")
preview_url: Optional[str] = Field(None, description="Preview site URL")
deployment_config: Optional[Dict[str, Any]] = Field(None, description="Deployment configuration")
error_message: Optional[str] = Field(None, description="Error message if failed")
class UserWebsiteResponse(BaseModel):
"""Response model for user website data."""
id: int
user_id: int
template_type: TemplateType
business_name: Optional[str]
business_description: Optional[str]
status: WebsiteStatus
github_repo_url: Optional[str]
netlify_site_url: Optional[str]
netlify_admin_url: Optional[str]
site_brief: Optional[Dict[str, Any]]
theme_tokens: Optional[Dict[str, Any]]
custom_css: Optional[str]
preview_url: Optional[str]
deployment_config: Optional[Dict[str, Any]]
error_message: Optional[str]
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class WebsitePreviewRequest(BaseModel):
"""Request model for generating website preview."""
user_id: int
intake_data: Dict[str, Any] = Field(..., description="Business intake data")
template_type: Optional[TemplateType] = Field(None, description="Template type override")
class WebsiteDeploymentRequest(BaseModel):
"""Request model for deploying website."""
user_id: int
intake_data: Dict[str, Any] = Field(..., description="Business intake data")
template_type: Optional[TemplateType] = Field(None, description="Template type override")
custom_domain: Optional[str] = Field(None, description="Custom domain for deployment")
class WebsiteStatusUpdate(BaseModel):
"""Request model for updating website status."""
status: WebsiteStatus
github_repo_url: Optional[str] = None
netlify_site_url: Optional[str] = None
netlify_admin_url: Optional[str] = None
preview_url: Optional[str] = None
error_message: Optional[str] = None
# Database model for SQLAlchemy
class UserWebsite:
"""SQLAlchemy model for UserWebsite table."""
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def to_dict(self) -> Dict[str, Any]:
"""Convert model to dictionary."""
return {
'id': getattr(self, 'id', None),
'user_id': getattr(self, 'user_id', None),
'template_type': getattr(self, 'template_type', None),
'business_name': getattr(self, 'business_name', None),
'business_description': getattr(self, 'business_description', None),
'status': getattr(self, 'status', None),
'github_repo_url': getattr(self, 'github_repo_url', None),
'netlify_site_url': getattr(self, 'netlify_site_url', None),
'netlify_admin_url': getattr(self, 'netlify_admin_url', None),
'site_brief': getattr(self, 'site_brief', None),
'theme_tokens': getattr(self, 'theme_tokens', None),
'custom_css': getattr(self, 'custom_css', None),
'preview_url': getattr(self, 'preview_url', None),
'deployment_config': getattr(self, 'deployment_config', None),
'error_message': getattr(self, 'error_message', None),
'created_at': getattr(self, 'created_at', None),
'updated_at': getattr(self, 'updated_at', None),
}

View File

@@ -224,22 +224,25 @@ class OnboardingProgress:
def _save_api_key_to_db(self, db, provider: str, key: str):
"""Save API key to database."""
try:
session = db.query(OnboardingSession).filter(OnboardingSession.user_id == self.user_id).first()
if not session:
logger.warning(f"No session found for user {self.user_id} when saving API key")
return
api_key_record = db.query(APIKey).filter(
APIKey.user_id == self.user_id,
APIKey.session_id == session.id,
APIKey.provider == provider
).first()
if not api_key_record:
api_key_record = APIKey(
user_id=self.user_id,
session_id=session.id,
provider=provider,
api_key=key,
is_active=True,
created_at=datetime.utcnow()
key=key,
)
db.add(api_key_record)
else:
api_key_record.api_key = key
api_key_record.key = key
api_key_record.updated_at = datetime.utcnow()
db.commit()

View File

@@ -0,0 +1,206 @@
"""Core Website Automation Service for actual GitHub/Netlify deployment."""
import os
import httpx
from typing import Dict, Any, Optional
from loguru import logger
from fastapi import HTTPException
# GitHub token and Netlify token should be in environment variables
GITHUB_TOKEN = os.getenv("GITHUB_ACCESS_TOKEN")
NETLIFY_TOKEN = os.getenv("NETLIFY_ACCESS_TOKEN")
NETLIFY_ACCOUNT_SLUG = os.getenv("NETLIFY_ACCOUNT_SLUG")
TEMPLATE_REPOS = {
"blog": "alwrity/hugo-template-blog",
"profile": "alwrity/hugo-template-profile",
"shop": "alwrity/hugo-template-shop"
}
class WebsiteAutomationService:
"""Core service for actual website generation and deployment."""
def __init__(self):
logger.info("🔄 Initializing Core WebsiteAutomationService...")
if not GITHUB_TOKEN:
logger.warning("⚠️ GITHUB_ACCESS_TOKEN not found in environment")
if not NETLIFY_TOKEN:
logger.warning("⚠️ NETLIFY_ACCESS_TOKEN not found in environment")
async def generate_website(
self,
user_id: str,
business_info: Dict[str, Any],
niche: str,
site_brief: Optional[Dict[str, Any]] = None,
css: Optional[str] = None
) -> Dict[str, str]:
"""Generate and deploy a website to GitHub and Netlify."""
logger.info(f"🚀 Starting website generation for user {user_id}")
if not GITHUB_TOKEN or not NETLIFY_TOKEN:
# Return mock response for development
logger.warning("Tokens not configured, returning mock response")
return self._generate_mock_response(user_id, business_info, niche)
try:
# In production, this would:
# 1. Create GitHub repository from template
# 2. Push generated content
# 3. Deploy to Netlify
repo_url = f"https://github.com/user/{business_info.get('name', f'alwrity-site-{user_id}')}"
site_url = f"https://{business_info.get('name', f'alwrity-site-{user_id}')}.netlify.app"
admin_url = f"https://app.netlify.com/sites/{business_info.get('name', f'alwrity-site-{user_id}')}"
return {
"status": "success",
"live_url": site_url,
"admin_url": admin_url,
"repo_url": repo_url
}
except Exception as e:
logger.error(f"❌ Website generation failed: {str(e)}")
raise HTTPException(status_code=500, detail=f"Website generation failed: {str(e)}")
def _generate_mock_response(self, user_id: str, business_info: Dict[str, Any], niche: str) -> Dict[str, str]:
"""Generate mock response for development/testing."""
business_name = business_info.get('name', f'alwrity-site-{user_id}')
safe_name = "".join(c if c.isalnum() else "-" for c in business_name).lower().strip("-")
return {
"status": "success",
"live_url": f"https://{safe_name}-mock.netlify.app",
"admin_url": f"https://app.netlify.com/sites/{safe_name}-mock",
"repo_url": f"https://github.com/mock-user/{safe_name}-mock",
"note": "This is a mock response. Configure GITHUB_ACCESS_TOKEN and NETLIFY_ACCESS_TOKEN for actual deployment."
}
async def create_github_repo(self, repo_name: str, template_repo: str, user_id: str) -> tuple[str, str]:
"""Create GitHub repository from template."""
# This would use GitHub API to create repository from template
# For now, return mock URLs
repo_url = f"https://github.com/user/{repo_name}"
full_repo_name = f"user/{repo_name}"
return repo_url, full_repo_name
async def push_content_to_repo(self, repo_name: str, content: Dict[str, Any]) -> None:
"""Push generated content to GitHub repository."""
# This would use GitHub API to push files
logger.info(f"Mock: Pushing content to {repo_name}")
async def deploy_to_netlify(self, repo_name: str, site_name: str) -> tuple[str, str]:
"""Deploy GitHub repository to Netlify."""
# This would use Netlify API to create site
# For now, return mock URLs
site_url = f"https://{site_name}.netlify.app"
admin_url = f"https://app.netlify.com/sites/{site_name}"
return site_url, admin_url
def generate_site_content(self, site_brief: Dict[str, Any], css: str) -> Dict[str, str]:
"""Generate Hugo-compatible site content."""
site_data = site_brief.get("site_brief", {})
business_name = site_data.get("business_name", "Business")
tagline = site_data.get("tagline", "Business website")
# Generate config.toml
config_content = f"""baseURL = 'https://example.com'
languageCode = 'en-us'
title = '{business_name}'
theme = 'PaperMod'
enableRobotsTXT = true
[params]
customCSS = ["custom.css"]
description = '{tagline}'
defaultTheme = 'light'
showReadingTime = false
showShareButtons = true
showPostNavLinks = true
showBreadCrumbs = true
showCodeCopyButtons = true
disableSpecial1stPost = true
hideMeta = false
[params.assets]
favicon = '/favicon.ico'
[params.label]
text = '{business_name}'
[params.social]
twitter = ''
facebook = ''
[sitemap]
changefreq = 'weekly'
priority = 0.5
filename = 'sitemap.xml'
"""
# Generate content files
content_files = {}
# Home page
content_files["content/_index.md"] = f"""---
title: "{business_name}"
---
# {business_name}
_{tagline}_
Welcome to our website!
"""
# About page
content_files["content/about.md"] = """---
title: "About"
---
# About Us
Learn more about our story and what we do.
"""
# Contact page
content_files["content/contact.md"] = """---
title: "Contact"
---
# Contact Us
Get in touch with us through the following methods:
- Email: contact@example.com
- Phone: (555) 123-4567
"""
# Custom CSS
content_files["static/custom.css"] = css or """/* Custom styles for your website */
:root {
--primary-color: #2563eb;
--secondary-color: #64748b;
--background-color: #ffffff;
--text-color: #1e293b;
}
body {
font-family: 'Inter', system-ui, sans-serif;
line-height: 1.6;
color: var(--text-color);
}
/* Add your custom styles here */
"""
return {
"config.toml": config_content,
**content_files
}
# Singleton instance
website_automation_service = WebsiteAutomationService()

View File

@@ -0,0 +1,285 @@
"""Website Intake Service for generating site briefs from business information."""
from typing import Dict, Any, Optional
from loguru import logger
from services.llm_providers.main_text_generation import llm_text_gen
SITE_BRIEF_SCHEMA: Dict[str, Any] = {
"type": "object",
"properties": {
"site_brief": {
"type": "object",
"properties": {
"business_name": {"type": "string"},
"tagline": {"type": "string"},
"template_type": {"type": "string", "enum": ["blog", "profile", "shop", "dont_know"]},
"geo_scope": {"type": "string", "enum": ["global", "local", "hyper_local", "dont_know"]},
"primary_offerings": {"type": "array", "items": {"type": "string"}},
"product_assets": {
"type": "object",
"properties": {
"urls": {"type": "array", "items": {"type": "string"}},
"asset_ids": {"type": "array", "items": {"type": "string"}},
},
"required": ["urls", "asset_ids"],
},
"audience": {
"type": "object",
"properties": {
"segment": {"type": "string"},
"b2b_b2c": {"type": "string", "enum": ["B2B", "B2C", "Both", "dont_know"]},
"persona_notes": {"type": "string"},
},
"required": ["segment", "b2b_b2c", "persona_notes"],
},
"brand_voice": {
"type": "object",
"properties": {
"tone": {"type": "string"},
"adjectives": {"type": "array", "items": {"type": "string"}},
"avoid": {"type": "array", "items": {"type": "string"}},
},
"required": ["tone", "adjectives", "avoid"],
},
"contact": {
"type": "object",
"properties": {
"email": {"type": "string"},
"phone": {"type": ["string", "null"]},
"location": {"type": ["string", "null"]},
},
"required": ["email", "phone", "location"],
},
"competitor_urls": {"type": "array", "items": {"type": "string"}},
},
"required": [
"business_name",
"tagline",
"template_type",
"geo_scope",
"primary_offerings",
"audience",
"brand_voice",
"contact",
"competitor_urls",
],
},
"content_plan": {
"type": "object",
"properties": {
"required_pages": {
"type": "array",
"items": {
"type": "object",
"properties": {
"page": {
"type": "string",
"enum": ["home", "about", "services", "products", "contact", "blog", "faq"],
},
"goal": {"type": "string"},
"key_points": {"type": "array", "items": {"type": "string"}},
"cta": {"type": "string"},
},
"required": ["page", "goal", "key_points", "cta"],
},
},
"optional_sections": {"type": "array", "items": {"type": "string"}},
"min_content_items": {"type": "integer"},
},
"required": ["required_pages", "optional_sections", "min_content_items"],
},
"exa_query_map": {
"type": "object",
"properties": {
"home": {"$ref": "#/$defs/exaSection"},
"about": {"$ref": "#/$defs/exaSection"},
"services_or_products": {"$ref": "#/$defs/exaSection"},
"contact": {"$ref": "#/$defs/exaSection"},
"competitor_optional": {"$ref": "#/$defs/exaSection"},
},
"required": ["home", "about", "services_or_products", "contact", "competitor_optional"],
},
"quality_flags": {
"type": "object",
"properties": {
"confidence": {"type": "number"},
"missing_fields": {"type": "array", "items": {"type": "string"}},
"followup_questions": {"type": "array", "items": {"type": "string"}},
},
"required": ["confidence", "missing_fields", "followup_questions"],
},
},
"required": ["site_brief", "content_plan", "exa_query_map", "quality_flags"],
"$defs": {
"exaSection": {
"type": "object",
"properties": {
"queries": {"type": "array", "items": {"type": "string"}},
"summary_query": {"type": "string"},
"include_text": {"type": "array", "items": {"type": "string"}},
"search_type": {"type": "string", "enum": ["auto", "neural", "fast", "deep"]},
"category": {"type": "string"},
},
"required": ["queries", "summary_query", "include_text", "search_type", "category"],
}
},
}
class WebsiteIntakeService:
"""Generate site briefs and Exa query maps from minimal intake inputs."""
def _normalize_list(self, value: Any) -> list:
if not value:
return []
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
if isinstance(value, str):
return [item.strip() for item in value.split(",") if item.strip()]
return [str(value).strip()] if str(value).strip() else []
def _extract_product_assets(self, intake: Dict[str, Any]) -> Dict[str, list]:
urls = self._normalize_list(intake.get("product_asset_urls"))
asset_ids = self._normalize_list(intake.get("product_asset_ids"))
return {"urls": urls, "asset_ids": asset_ids}
def build_prompt(self, intake: Dict[str, Any]) -> str:
return (
"You are creating a website brief and research plan for a non-technical user. "
"Use the inputs below, keep assumptions minimal, and prefer 'dont_know' when unsure. "
"Ensure at least 5 content items across required pages.\n\n"
f"INTAKE INPUTS:\n{intake}\n\n"
"Output structured JSON that matches the schema exactly."
)
def generate_site_brief(self, intake: Dict[str, Any], user_id: Optional[str] = None) -> Dict[str, Any]:
logger.info("Generating site brief and Exa query map from intake")
try:
prompt = self.build_prompt(intake)
result = llm_text_gen(prompt=prompt, json_struct=SITE_BRIEF_SCHEMA, user_id=user_id)
if isinstance(result, str):
logger.warning("LLM returned string response; expected structured JSON")
return {"error": "invalid_response", "raw": result}
product_assets = self._extract_product_assets(intake)
if product_assets.get("urls") or product_assets.get("asset_ids"):
result.setdefault("site_brief", {})
result["site_brief"]["product_assets"] = product_assets
logger.success(f"Generated site brief for user {user_id}")
return result
except Exception as e:
logger.error(f"Failed to generate site brief: {str(e)}")
# Return a fallback site brief for development
return self._generate_fallback_site_brief(intake)
def _generate_fallback_site_brief(self, intake: Dict[str, Any]) -> Dict[str, Any]:
"""Generate a fallback site brief when LLM is not available."""
logger.info("Generating fallback site brief")
business_name = intake.get("business_name", "Your Business")
business_summary = intake.get("business_summary", "Business description")
template_type = intake.get("template_type", "blog")
fallback_brief = {
"site_brief": {
"business_name": business_name,
"tagline": f"Professional {template_type} website",
"template_type": template_type,
"geo_scope": "global",
"primary_offerings": self._normalize_list(intake.get("primary_offerings", ["Services"])),
"product_assets": self._extract_product_assets(intake),
"audience": {
"segment": intake.get("target_audience", "General audience"),
"b2b_b2c": intake.get("audience_type", "Both"),
"persona_notes": intake.get("target_audience", "General audience description")
},
"brand_voice": {
"tone": intake.get("brand_tone", "professional"),
"adjectives": self._normalize_list(intake.get("brand_adjectives", ["professional", "reliable"])),
"avoid": self._normalize_list(intake.get("avoid_terms", []))
},
"contact": {
"email": intake.get("contact_email", "contact@example.com"),
"phone": intake.get("contact_phone"),
"location": intake.get("contact_location")
},
"competitor_urls": self._normalize_list(intake.get("competitor_urls", []))
},
"content_plan": {
"required_pages": [
{
"page": "home",
"goal": "Welcome visitors and introduce the business",
"key_points": [business_name, business_summary],
"cta": "Get Started"
},
{
"page": "about",
"goal": "Share business story and values",
"key_points": ["Our story", "Our mission", "Our values"],
"cta": "Learn More"
},
{
"page": "contact",
"goal": "Enable visitors to get in touch",
"key_points": ["Contact information", "Business hours", "Location"],
"cta": "Contact Us"
}
],
"optional_sections": ["blog", "faq", "testimonials"],
"min_content_items": 5
},
"exa_query_map": {
"home": {
"queries": [f"{business_name} website", f"{business_name} services"],
"summary_query": f"What is {business_name} and what do they offer?",
"include_text": ["services", "about", "contact"],
"search_type": "auto",
"category": "business"
},
"about": {
"queries": [f"{business_name} about us", f"{business_name} story"],
"summary_query": f"Tell me about {business_name}'s history and mission",
"include_text": ["about", "story", "mission", "values"],
"search_type": "auto",
"category": "business"
},
"services_or_products": {
"queries": [f"{business_name} services", f"{business_name} products"],
"summary_query": f"What services and products does {business_name} offer?",
"include_text": ["services", "products", "offerings"],
"search_type": "auto",
"category": "business"
},
"contact": {
"queries": [f"{business_name} contact", f"{business_name} location"],
"summary_query": f"How can I contact {business_name}?",
"include_text": ["contact", "phone", "email", "address"],
"search_type": "auto",
"category": "business"
},
"competitor_optional": {
"queries": [f"{business_name} competitors", f"alternatives to {business_name}"],
"summary_query": f"Who are the main competitors of {business_name}?",
"include_text": ["competitors", "alternatives"],
"search_type": "auto",
"category": "business"
}
},
"quality_flags": {
"confidence": 0.8,
"missing_fields": [],
"followup_questions": []
}
}
return fallback_brief
# Singleton instance
website_intake_service = WebsiteIntakeService()

View File

@@ -0,0 +1,439 @@
"""Website Style Service for generating themes and CSS based on site brief."""
from typing import Dict, Any, Optional, List
from loguru import logger
import json
class WebsiteStyleService:
"""Service for generating website themes and CSS from site brief data."""
def __init__(self):
logger.info("🔄 Initializing WebsiteStyleService...")
self.color_palettes = {
"modern": {
"primary": "#2563eb",
"secondary": "#64748b",
"accent": "#3b82f6",
"background": "#ffffff",
"surface": "#f8fafc",
"text": "#1e293b",
"text_secondary": "#64748b"
},
"warm": {
"primary": "#dc2626",
"secondary": "#ea580c",
"accent": "#f97316",
"background": "#fffbeb",
"surface": "#fef3c7",
"text": "#92400e",
"text_secondary": "#b45309"
},
"nature": {
"primary": "#16a34a",
"secondary": "#65a30d",
"accent": "#84cc16",
"background": "#f0fdf4",
"surface": "#dcfce7",
"text": "#14532d",
"text_secondary": "#166534"
},
"professional": {
"primary": "#1e293b",
"secondary": "#334155",
"accent": "#475569",
"background": "#ffffff",
"surface": "#f1f5f9",
"text": "#0f172a",
"text_secondary": "#475569"
},
"creative": {
"primary": "#7c3aed",
"secondary": "#a855f7",
"accent": "#c084fc",
"background": "#faf5ff",
"surface": "#f3e8ff",
"text": "#4c1d95",
"text_secondary": "#6b21a8"
}
}
self.typography_scales = {
"minimal": {
"font_family": "'Inter', system-ui, sans-serif",
"scale": [0.75, 0.875, 1, 1.125, 1.25, 1.5, 1.875, 2.25],
"line_height": 1.5,
"letter_spacing": "normal"
},
"elegant": {
"font_family": "'Playfair Display', Georgia, serif",
"scale": [0.8, 0.9, 1, 1.1, 1.25, 1.5, 1.875, 2.25],
"line_height": 1.6,
"letter_spacing": "0.01em"
},
"modern": {
"font_family": "'Space Grotesk', system-ui, sans-serif",
"scale": [0.75, 0.875, 1, 1.125, 1.25, 1.5, 1.875, 2.5],
"line_height": 1.4,
"letter_spacing": "-0.01em"
},
"friendly": {
"font_family": "'Nunito', system-ui, sans-serif",
"scale": [0.8, 0.9, 1, 1.125, 1.25, 1.5, 1.75, 2],
"line_height": 1.6,
"letter_spacing": "0.02em"
}
}
self.spacing_scales = {
"compact": {"unit": "0.25rem", "scale": [0, 1, 2, 4, 6, 8, 12, 16, 24]},
"comfortable": {"unit": "0.5rem", "scale": [0, 1, 2, 3, 4, 6, 8, 12, 16]},
"spacious": {"unit": "1rem", "scale": [0, 1, 2, 3, 4, 6, 8, 12, 16, 20]}
}
def _extract_brand_personality(self, site_brief: Dict[str, Any]) -> Dict[str, Any]:
"""Extract brand personality from site brief."""
site_brief_data = site_brief.get("site_brief", {})
brand_voice = site_brief_data.get("brand_voice", {})
tone = brand_voice.get("tone", "professional").lower()
adjectives = brand_voice.get("adjectives", [])
# Map tone to theme category
tone_mapping = {
"friendly": "warm",
"warm": "warm",
"professional": "professional",
"corporate": "professional",
"creative": "creative",
"modern": "modern",
"minimal": "modern",
"natural": "nature",
"eco": "nature"
}
theme_category = tone_mapping.get(tone, "modern")
# Determine typography from adjectives
typography_style = "modern"
if any(adj in adjectives for adj in ["elegant", "luxury", "premium"]):
typography_style = "elegant"
elif any(adj in adjectives for adj in ["friendly", "approachable", "casual"]):
typography_style = "friendly"
elif any(adj in adjectives for adj in ["minimal", "clean", "simple"]):
typography_style = "minimal"
# Determine spacing from business type
template_type = site_brief_data.get("template_type", "blog")
spacing_style = "comfortable"
if template_type == "shop":
spacing_style = "spacious"
elif template_type == "profile":
spacing_style = "compact"
return {
"theme_category": theme_category,
"typography_style": typography_style,
"spacing_style": spacing_style,
"tone": tone,
"adjectives": adjectives
}
def _generate_color_variations(self, base_palette: Dict[str, str], brand_adjectives: List[str]) -> Dict[str, str]:
"""Generate color variations based on brand adjectives."""
colors = base_palette.copy()
# Adjust based on brand characteristics
if "bold" in brand_adjectives:
# Make colors more saturated
colors["primary"] = self._saturate_color(colors["primary"], 1.2)
colors["accent"] = self._saturate_color(colors["accent"], 1.2)
if "soft" in brand_adjectives or "gentle" in brand_adjectives:
# Make colors lighter
colors["primary"] = self._lighten_color(colors["primary"], 0.8)
colors["secondary"] = self._lighten_color(colors["secondary"], 0.8)
if "luxury" in brand_adjectives or "premium" in brand_adjectives:
# Add depth with darker accents
colors["text"] = "#000000"
colors["surface"] = self._darken_color(colors["surface"], 0.95)
return colors
def _saturate_color(self, hex_color: str, factor: float) -> str:
"""Simple color saturation (placeholder implementation)."""
# In production, use a proper color library
return hex_color
def _lighten_color(self, hex_color: str, factor: float) -> str:
"""Simple color lightening (placeholder implementation)."""
# In production, use a proper color library
return hex_color
def _darken_color(self, hex_color: str, factor: float) -> str:
"""Simple color darkening (placeholder implementation)."""
# In production, use a proper color library
return hex_color
def generate_theme_tokens(self, site_brief: Dict[str, Any], user_id: Optional[str] = None) -> Dict[str, Any]:
"""Generate design tokens from site brief."""
try:
logger.info(f"Generating theme tokens for user {user_id}")
brand_personality = self._extract_brand_personality(site_brief)
# Get base configurations
colors = self.color_palettes[brand_personality["theme_category"]]
typography = self.typography_scales[brand_personality["typography_style"]]
spacing = self.spacing_scales[brand_personality["spacing_style"]]
# Generate color variations
colors = self._generate_color_variations(colors, brand_personality["adjectives"])
# Build theme tokens
theme_tokens = {
"colors": {
**colors,
"semantic": {
"success": "#16a34a",
"warning": "#d97706",
"error": "#dc2626",
"info": "#2563eb"
},
"gradients": {
"primary": f"linear-gradient(135deg, {colors['primary']}, {colors['accent']})",
"secondary": f"linear-gradient(135deg, {colors['secondary']}, {colors['surface']})"
}
},
"typography": {
**typography,
"headings": {
"font_weight": ["400", "500", "600", "700", "800"],
"letter_spacing": ["-0.02em", "-0.01em", "0", "0.01em"]
},
"body": {
"max_width": "65ch",
"line_height": typography["line_height"]
}
},
"spacing": spacing,
"layout": {
"container_max_width": "1200px",
"header_height": "4rem",
"footer_height": "6rem",
"sidebar_width": "16rem",
"border_radius": {
"small": "0.25rem",
"medium": "0.5rem",
"large": "1rem",
"full": "9999px"
},
"shadows": {
"small": "0 1px 2px 0 rgb(0 0 0 / 0.05)",
"medium": "0 4px 6px -1px rgb(0 0 0 / 0.1)",
"large": "0 10px 15px -3px rgb(0 0 0 / 0.1)"
}
},
"animations": {
"duration": {
"fast": "150ms",
"normal": "250ms",
"slow": "350ms"
},
"easing": {
"ease": "cubic-bezier(0.4, 0, 0.2, 1)",
"ease_in": "cubic-bezier(0.4, 0, 1, 1)",
"ease_out": "cubic-bezier(0, 0, 0.2, 1)"
}
},
"brand": {
"personality": brand_personality,
"template_type": site_brief.get("site_brief", {}).get("template_type", "blog")
}
}
logger.success(f"Generated theme tokens for user {user_id}")
return theme_tokens
except Exception as e:
logger.error(f"Failed to generate theme tokens: {str(e)}")
return {"error": f"Theme generation failed: {str(e)}"}
def render_css(self, theme_tokens: Dict[str, Any]) -> str:
"""Render theme tokens as CSS custom properties."""
try:
if "error" in theme_tokens:
logger.warning("Cannot render CSS from error tokens")
return ""
logger.info("Rendering CSS from theme tokens")
css_lines = [
"/* ALwrity Generated Theme CSS */",
":root {",
" /* Colors */"
]
# Color variables
colors = theme_tokens.get("colors", {})
for key, value in colors.items():
if key == "gradients":
for grad_key, grad_value in value.items():
css_lines.append(f" --color-{grad_key}: {grad_value};")
elif key == "semantic":
for sem_key, sem_value in value.items():
css_lines.append(f" --color-{sem_key}: {sem_value};")
else:
css_lines.append(f" --color-{key}: {value};")
# Typography variables
css_lines.extend([
"",
" /* Typography */",
f" --font-family: {theme_tokens.get('typography', {}).get('font_family', 'system-ui')};",
f" --line-height: {theme_tokens.get('typography', {}).get('line_height', 1.5)};",
f" --letter-spacing: {theme_tokens.get('typography', {}).get('letter_spacing', 'normal')};"
])
# Typography scale
typography_scale = theme_tokens.get("typography", {}).get("scale", [])
for i, size in enumerate(typography_scale):
css_lines.append(f" --font-size-{i}: {size}rem;")
# Spacing variables
css_lines.extend([
"",
" /* Spacing */"
])
spacing = theme_tokens.get("spacing", {})
spacing_unit = spacing.get("unit", "1rem")
spacing_scale = spacing.get("scale", [])
for i, value in enumerate(spacing_scale):
css_lines.append(f" --spacing-{i}: {spacing_unit * value};")
# Layout variables
css_lines.extend([
"",
" /* Layout */"
])
layout = theme_tokens.get("layout", {})
css_lines.append(f" --container-max-width: {layout.get('container_max_width', '1200px')};")
css_lines.append(f" --header-height: {layout.get('header_height', '4rem')};")
css_lines.append(f" --footer-height: {layout.get('footer_height', '6rem')};")
# Border radius
border_radius = layout.get("border_radius", {})
for key, value in border_radius.items():
css_lines.append(f" --border-radius-{key}: {value};")
# Shadows
css_lines.extend([
"",
" /* Shadows */"
])
shadows = layout.get("shadows", {})
for key, value in shadows.items():
css_lines.append(f" --shadow-{key}: {value};")
# Animation variables
css_lines.extend([
"",
" /* Animations */"
])
animations = theme_tokens.get("animations", {})
duration = animations.get("duration", {})
for key, value in duration.items():
css_lines.append(f" --duration-{key}: {value};")
easing = animations.get("easing", {})
for key, value in easing.items():
css_lines.append(f" --easing-{key}: {value};")
css_lines.append("}")
# Utility classes
css_lines.extend([
"",
"/* Utility Classes */",
".text-primary { color: var(--color-primary); }",
".text-secondary { color: var(--color-secondary); }",
".bg-primary { background-color: var(--color-primary); }",
".bg-secondary { background-color: var(--color-secondary); }",
".bg-surface { background-color: var(--color-surface); }",
"",
"/* Typography Utilities */",
".font-heading { font-family: var(--font-family); font-weight: 600; }",
".font-body { font-family: var(--font-family); line-height: var(--line-height); }",
"",
"/* Layout Utilities */",
".container { max-width: var(--container-max-width); margin: 0 auto; padding: 0 var(--spacing-4); }",
".section { padding: var(--spacing-8) 0; }",
"",
"/* Component Styles */",
".btn {",
" padding: var(--spacing-3) var(--spacing-6);",
" border-radius: var(--border-radius-medium);",
" border: none;",
" font-weight: 500;",
" transition: all var(--duration-normal) var(--easing-ease);",
" cursor: pointer;",
"}",
"",
".btn-primary {",
" background-color: var(--color-primary);",
" color: white;",
"}",
"",
".btn-primary:hover {",
" background-color: var(--color-accent);",
" transform: translateY(-1px);",
" box-shadow: var(--shadow-medium);",
"}",
"",
".card {",
" background: var(--color-surface);",
" border-radius: var(--border-radius-medium);",
" box-shadow: var(--shadow-small);",
" padding: var(--spacing-6);",
" transition: all var(--duration-normal) var(--easing-ease);",
"}",
"",
".card:hover {",
" box-shadow: var(--shadow-medium);",
" transform: translateY(-2px);",
"}"
])
css = "\n".join(css_lines)
logger.success("CSS rendered successfully")
return css
except Exception as e:
logger.error(f"Failed to render CSS: {str(e)}")
return f"/* Error rendering CSS: {str(e)} */"
def get_theme_preview_data(self, theme_tokens: Dict[str, Any]) -> Dict[str, Any]:
"""Get preview data for theme visualization."""
try:
colors = theme_tokens.get("colors", {})
typography = theme_tokens.get("typography", {})
brand = theme_tokens.get("brand", {})
return {
"primary_color": colors.get("primary", "#2563eb"),
"secondary_color": colors.get("secondary", "#64748b"),
"background_color": colors.get("background", "#ffffff"),
"text_color": colors.get("text", "#1e293b"),
"font_family": typography.get("font_family", "system-ui"),
"theme_category": brand.get("personality", {}).get("theme_category", "modern"),
"template_type": brand.get("template_type", "blog"),
"preview_css": self.render_css(theme_tokens)
}
except Exception as e:
logger.error(f"Failed to generate theme preview: {str(e)}")
return {"error": f"Preview generation failed: {str(e)}"}
# Singleton instance
website_style_service = WebsiteStyleService()

View File

@@ -0,0 +1,295 @@
"""User Website Service for ALwrity website maker functionality."""
from typing import Optional, Dict, Any, List
from sqlalchemy.orm import Session
from sqlalchemy import desc
from loguru import logger
from datetime import datetime
from models.onboarding import UserWebsite
from models.user_website_request import (
UserWebsiteRequest,
UserWebsiteResponse,
WebsiteStatus,
TemplateType,
WebsiteStatusUpdate
)
from services.database import get_db
class UserWebsiteService:
"""Service for managing user website creation and deployment."""
def __init__(self):
logger.info("🔄 Initializing UserWebsiteService...")
def create_user_website(self, request: UserWebsiteRequest) -> UserWebsiteResponse:
"""Create a new user website record."""
try:
logger.info(f"Creating website record for user {request.user_id}")
# For testing, create a session directly
from services.database import get_session_for_user
db = get_session_for_user(str(request.user_id))
if not db:
logger.error(f"Could not create database session for user {request.user_id}")
raise Exception("Database session creation failed")
try:
# Check if user already has a website
existing_website = db.query(UserWebsite).filter(
UserWebsite.user_id == request.user_id
).first()
if existing_website:
logger.info(f"User {request.user_id} already has website ID {existing_website.id}, updating it")
# Update existing record
existing_website.template_type = request.template_type.value
existing_website.business_name = request.business_name
existing_website.business_description = request.business_description
existing_website.status = request.status.value
existing_website.site_brief = request.site_brief
existing_website.theme_tokens = request.theme_tokens
existing_website.custom_css = request.custom_css
existing_website.deployment_config = request.deployment_config
existing_website.updated_at = datetime.utcnow()
db.commit()
db.refresh(existing_website)
logger.success(f"Updated website record for user {request.user_id}")
return UserWebsiteResponse(**existing_website.to_dict())
# Create new website record
db_website = UserWebsite(
user_id=request.user_id,
template_type=request.template_type.value,
business_name=request.business_name,
business_description=request.business_description,
status=request.status.value,
site_brief=request.site_brief,
theme_tokens=request.theme_tokens,
custom_css=request.custom_css,
deployment_config=request.deployment_config
)
db.add(db_website)
db.commit()
db.refresh(db_website)
logger.success(f"Created website record {db_website.id} for user {request.user_id}")
return UserWebsiteResponse(**db_website.to_dict())
finally:
db.close()
except Exception as e:
logger.error(f"Failed to create website record for user {request.user_id}: {str(e)}")
raise
def get_user_website_by_user(self, user_id: int) -> Optional[UserWebsiteResponse]:
"""Get website record by user ID."""
try:
logger.debug(f"Retrieving website for user {user_id}")
# For testing, create a session directly
from services.database import get_session_for_user
db = get_session_for_user(str(user_id))
if not db:
logger.warning(f"Could not create database session for user {user_id}")
return None
try:
website = db.query(UserWebsite).filter(
UserWebsite.user_id == user_id
).first()
if website:
logger.debug(f"Found website {website.id} for user {user_id}")
return UserWebsiteResponse(**website.to_dict())
logger.debug(f"No website found for user {user_id}")
return None
finally:
db.close()
except Exception as e:
logger.error(f"Failed to get website for user {user_id}: {str(e)}")
return None
def get_user_website_by_id(self, website_id: int) -> Optional[UserWebsiteResponse]:
"""Get website record by website ID."""
db: Session = next(get_db())
try:
logger.debug(f"Retrieving website {website_id}")
website = db.query(UserWebsite).filter(
UserWebsite.id == website_id
).first()
if website:
logger.debug(f"Found website {website_id}")
return UserWebsiteResponse(**website.to_dict())
logger.debug(f"Website {website_id} not found")
return None
except Exception as e:
logger.error(f"Failed to get website {website_id}: {str(e)}")
return None
finally:
db.close()
def update_user_website_status(
self,
user_id: int,
status_update: WebsiteStatusUpdate
) -> Optional[UserWebsiteResponse]:
"""Update website status and related fields."""
db: Session = next(get_db())
try:
logger.info(f"Updating website status for user {user_id} to {status_update.status}")
website = db.query(UserWebsite).filter(
UserWebsite.user_id == user_id
).first()
if not website:
logger.warning(f"No website found for user {user_id}")
return None
# Update fields
website.status = status_update.status.value
website.updated_at = datetime.utcnow()
if status_update.github_repo_url is not None:
website.github_repo_url = status_update.github_repo_url
if status_update.netlify_site_url is not None:
website.netlify_site_url = status_update.netlify_site_url
if status_update.netlify_admin_url is not None:
website.netlify_admin_url = status_update.netlify_admin_url
if status_update.preview_url is not None:
website.preview_url = status_update.preview_url
if status_update.error_message is not None:
website.error_message = status_update.error_message
db.commit()
db.refresh(website)
logger.success(f"Updated website {website.id} status to {status_update.status}")
return UserWebsiteResponse(**website.to_dict())
except Exception as e:
db.rollback()
logger.error(f"Failed to update website status for user {user_id}: {str(e)}")
raise
finally:
db.close()
def update_user_website_content(
self,
user_id: int,
site_brief: Optional[Dict[str, Any]] = None,
theme_tokens: Optional[Dict[str, Any]] = None,
custom_css: Optional[str] = None
) -> Optional[UserWebsiteResponse]:
"""Update website content (site brief, theme, CSS)."""
db: Session = next(get_db())
try:
logger.info(f"Updating website content for user {user_id}")
website = db.query(UserWebsite).filter(
UserWebsite.user_id == user_id
).first()
if not website:
logger.warning(f"No website found for user {user_id}")
return None
if site_brief is not None:
website.site_brief = site_brief
if theme_tokens is not None:
website.theme_tokens = theme_tokens
if custom_css is not None:
website.custom_css = custom_css
website.updated_at = datetime.utcnow()
db.commit()
db.refresh(website)
logger.success(f"Updated website {website.id} content")
return UserWebsiteResponse(**website.to_dict())
except Exception as e:
db.rollback()
logger.error(f"Failed to update website content for user {user_id}: {str(e)}")
raise
finally:
db.close()
def delete_user_website(self, user_id: int) -> bool:
"""Delete user website record."""
db: Session = next(get_db())
try:
logger.info(f"Deleting website for user {user_id}")
website = db.query(UserWebsite).filter(
UserWebsite.user_id == user_id
).first()
if not website:
logger.warning(f"No website found for user {user_id}")
return False
db.delete(website)
db.commit()
logger.success(f"Deleted website {website.id} for user {user_id}")
return True
except Exception as e:
db.rollback()
logger.error(f"Failed to delete website for user {user_id}: {str(e)}")
raise
finally:
db.close()
def get_all_user_websites(self, user_id: int) -> List[UserWebsiteResponse]:
"""Get all websites for a user (for history/audit)."""
db: Session = next(get_db())
try:
logger.debug(f"Retrieving all websites for user {user_id}")
websites = db.query(UserWebsite).filter(
UserWebsite.user_id == user_id
).order_by(desc(UserWebsite.created_at)).all()
return [UserWebsiteResponse(**website.to_dict()) for website in websites]
except Exception as e:
logger.error(f"Failed to get websites for user {user_id}: {str(e)}")
return []
finally:
db.close()
def get_websites_by_status(self, status: WebsiteStatus) -> List[UserWebsiteResponse]:
"""Get all websites with a specific status (for admin/monitoring)."""
db: Session = next(get_db())
try:
logger.debug(f"Retrieving websites with status {status}")
websites = db.query(UserWebsite).filter(
UserWebsite.status == status.value
).order_by(desc(UserWebsite.created_at)).all()
return [UserWebsiteResponse(**website.to_dict()) for website in websites]
except Exception as e:
logger.error(f"Failed to get websites with status {status}: {str(e)}")
return []
finally:
db.close()
# Singleton instance
user_website_service = UserWebsiteService()

View File

@@ -37,7 +37,8 @@
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"build": "node --max_old_space_size=8192 node_modules/react-scripts/scripts/build.js",
"build:nomap": "node --max_old_space_size=8192 -e \"process.env.GENERATE_SOURCEMAP='false'; require('./node_modules/react-scripts/scripts/build');\"",
"test": "react-scripts test",
"eject": "react-scripts eject",
"analyze": "npm run build && npx source-map-explorer 'build/static/js/*.js' --html bundle-report.html",

View File

@@ -54,12 +54,12 @@ const toWorkflowError = (error: unknown, fallbackMessage: string): WorkflowError
if (error instanceof WorkflowError) return error;
const message = error instanceof Error ? error.message : fallbackMessage;
return {
return new WorkflowError({
code: 'WORKFLOW_ERROR',
message,
timestamp: new Date(),
recoverable: false,
};
});
};
const computeProgressAndNavigation = (workflow: DailyWorkflow): { progress: WorkflowProgress; navigation: NavigationState } => {