Compare commits

..

1 Commits

Author SHA1 Message Date
ي
636989f75b Add forced user_id lint check and demo router gating 2026-03-30 08:13:48 +05:30
12 changed files with 356 additions and 220 deletions

View File

@@ -0,0 +1,23 @@
name: Lint Forced User ID Patterns
on:
pull_request:
push:
branches:
- main
jobs:
lint-forced-user-id:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Check for forced/hardcoded user_id patterns
run: python backend/scripts/check_forced_user_id_patterns.py

View File

@@ -16,6 +16,11 @@ class RouterManager:
self.app = app
self.included_routers = []
self.failed_routers = []
@staticmethod
def _demo_release_mode_enabled() -> bool:
"""Return True when demo-release safety mode is enabled."""
return os.getenv("ALWRITY_DEMO_RELEASE", "false").lower() in {"1", "true", "yes", "on"}
def include_router_safely(self, router, router_name: str = None) -> bool:
"""Include a router safely with error handling."""
@@ -88,16 +93,27 @@ class RouterManager:
from routers.seo_tools import router as seo_tools_router
self.include_router_safely(seo_tools_router, "seo_tools")
demo_release_mode = self._demo_release_mode_enabled()
# Facebook Writer router
from api.facebook_writer.routers import facebook_router
self.include_router_safely(facebook_router, "facebook_writer")
if demo_release_mode:
logger.info("⏭️ Skipping facebook_writer router in demo-release mode")
else:
from api.facebook_writer.routers import facebook_router
self.include_router_safely(facebook_router, "facebook_writer")
# LinkedIn routers
from routers.linkedin import router as linkedin_router
self.include_router_safely(linkedin_router, "linkedin")
if demo_release_mode:
logger.info("⏭️ Skipping linkedin router in demo-release mode")
else:
from routers.linkedin import router as linkedin_router
self.include_router_safely(linkedin_router, "linkedin")
from api.linkedin_image_generation import router as linkedin_image_router
self.include_router_safely(linkedin_image_router, "linkedin_image")
if demo_release_mode:
logger.info("⏭️ Skipping linkedin_image router in demo-release mode")
else:
from api.linkedin_image_generation import router as linkedin_image_router
self.include_router_safely(linkedin_image_router, "linkedin_image")
# Brainstorm router
from api.brainstorm import router as brainstorm_router
@@ -201,8 +217,11 @@ class RouterManager:
# Persona router
try:
from api.persona_routes import router as persona_router
self.include_router_safely(persona_router, "persona")
if self._demo_release_mode_enabled():
logger.info("⏭️ Skipping persona router in demo-release mode")
else:
from api.persona_routes import router as persona_router
self.include_router_safely(persona_router, "persona")
except Exception as e:
logger.warning(f"Persona router not mounted: {e}")

View File

@@ -1,3 +1,4 @@
import os
"""Facebook Post generation service."""
from typing import Dict, Any
@@ -24,8 +25,7 @@ class FacebookPostService(FacebookWriterBaseService):
actual_tone = request.custom_tone if request.post_tone.value == "Custom" else request.post_tone.value
# Get persona data for enhanced content generation
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
persona_data = self._get_persona_data(user_id)
# Build the prompt

View File

@@ -1,3 +1,4 @@
import os
"""Remaining Facebook Writer services - placeholder implementations."""
from typing import Dict, Any, List
@@ -16,8 +17,7 @@ class FacebookReelService(FacebookWriterBaseService):
actual_style = request.custom_style if request.reel_style.value == "Custom" else request.reel_style.value
# Get persona data for enhanced content generation
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
persona_data = self._get_persona_data(user_id)
base_prompt = f"""

View File

@@ -1,3 +1,4 @@
import os
"""Facebook Story generation service."""
from typing import Dict, Any, List
@@ -30,8 +31,7 @@ class FacebookStoryService(FacebookWriterBaseService):
actual_tone = request.custom_tone if request.story_tone.value == "Custom" else request.story_tone.value
# Get persona data for enhanced content generation
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
persona_data = self._get_persona_data(user_id)
# Build the prompt

View File

@@ -94,36 +94,36 @@ async def generate_platform_persona_endpoint(
async def update_persona_endpoint(
persona_id: int,
update_data: Dict[str, Any],
user_id: int = Query(..., description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Update an existing persona."""
# Beta testing: Force user_id=1 for all requests
return await update_persona(1, persona_id, update_data)
user_id = int(current_user.get("id"))
return await update_persona(user_id, persona_id, update_data)
@router.delete("/{persona_id}")
async def delete_persona_endpoint(
persona_id: int,
user_id: int = Query(..., description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Delete a persona."""
# Beta testing: Force user_id=1 for all requests
return await delete_persona(1, persona_id)
user_id = int(current_user.get("id"))
return await delete_persona(user_id, persona_id)
@router.get("/check/readiness")
async def check_persona_readiness_endpoint(
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Check if user has sufficient data for persona generation."""
# Beta testing: Force user_id=1 for all requests
return await validate_persona_generation_readiness(1)
user_id = int(current_user.get("id"))
return await validate_persona_generation_readiness(user_id)
@router.get("/preview/generate")
async def generate_preview_endpoint(
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Generate a preview of the writing persona without saving."""
# Beta testing: Force user_id=1 for all requests
return await generate_persona_preview(1)
user_id = int(current_user.get("id"))
return await generate_persona_preview(user_id)
@router.get("/platforms/supported")
async def get_supported_platforms_endpoint():
@@ -160,12 +160,12 @@ async def optimize_facebook_persona_endpoint(
@router.post("/generate-content")
async def generate_content_with_persona_endpoint(
request: Dict[str, Any]
request: Dict[str, Any],
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Generate content using persona replication engine."""
try:
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(current_user.get("id"))
platform = request.get("platform")
content_request = request.get("content_request")
content_type = request.get("content_type", "post")
@@ -189,13 +189,13 @@ async def generate_content_with_persona_endpoint(
@router.get("/export/{platform}")
async def export_persona_prompt_endpoint(
platform: str,
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Export hardened persona prompt for external use."""
try:
engine = PersonaReplicationEngine()
# Beta testing: Force user_id=1 for all requests
export_package = engine.export_persona_for_external_use(1, platform)
user_id = int(current_user.get("id"))
export_package = engine.export_persona_for_external_use(user_id, platform)
if "error" in export_package:
raise HTTPException(status_code=400, detail=export_package["error"])
@@ -207,12 +207,12 @@ async def export_persona_prompt_endpoint(
@router.post("/validate-content")
async def validate_content_endpoint(
request: Dict[str, Any]
request: Dict[str, Any],
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Validate content against persona constraints."""
try:
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(current_user.get("id"))
platform = request.get("platform")
content = request.get("content")
@@ -242,14 +242,14 @@ async def validate_content_endpoint(
async def update_platform_persona_endpoint(
platform: str,
update_data: Dict[str, Any],
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Update platform-specific persona fields for a user.
Allows editing persona fields in the UI and saving them to the database.
"""
# Beta testing: Force user_id=1 for all requests
return await update_platform_persona(1, platform, update_data)
user_id = int(current_user.get("id"))
return await update_platform_persona(user_id, platform, update_data)
@router.get("/facebook-persona/check/{user_id}")
async def check_facebook_persona_endpoint(

View File

@@ -48,8 +48,6 @@ load_dotenv(backend_dir / '.env') # backend/.env
load_dotenv(project_root / '.env') # root .env (fallback)
load_dotenv() # CWD .env (fallback)
PODCAST_ONLY_DEMO_MODE = os.getenv("PODCAST_ONLY_DEMO_MODE", "false").lower() in {"1", "true", "yes", "on"}
# Set up clean logging for end users
from logging_config import setup_clean_logging
setup_clean_logging()
@@ -112,37 +110,36 @@ from services.startup_health import (
# Import OAuth token monitoring routes
from api.oauth_token_monitoring_routes import router as oauth_token_monitoring_router
if not PODCAST_ONLY_DEMO_MODE:
# Import SEO Dashboard endpoints only when non-demo features are enabled
from api.seo_dashboard import (
get_seo_dashboard_data,
get_seo_health_score,
get_seo_metrics,
get_platform_status,
get_ai_insights,
seo_dashboard_health_check,
analyze_seo_comprehensive,
analyze_seo_full,
get_seo_metrics_detailed,
get_analysis_summary,
batch_analyze_urls,
SEOAnalysisRequest,
get_seo_dashboard_overview,
get_gsc_raw_data,
get_bing_raw_data,
get_competitive_insights,
get_deep_competitor_analysis,
run_strategic_insights,
get_strategic_insights_history,
refresh_analytics_data,
analyze_urls_ai,
AnalyzeURLsRequest,
get_analyzed_pages,
get_semantic_health,
get_semantic_cache_stats,
get_sif_indexing_health,
get_onboarding_task_health,
)
# Import SEO Dashboard endpoints
from api.seo_dashboard import (
get_seo_dashboard_data,
get_seo_health_score,
get_seo_metrics,
get_platform_status,
get_ai_insights,
seo_dashboard_health_check,
analyze_seo_comprehensive,
analyze_seo_full,
get_seo_metrics_detailed,
get_analysis_summary,
batch_analyze_urls,
SEOAnalysisRequest,
get_seo_dashboard_overview,
get_gsc_raw_data,
get_bing_raw_data,
get_competitive_insights,
get_deep_competitor_analysis,
run_strategic_insights,
get_strategic_insights_history,
refresh_analytics_data,
analyze_urls_ai,
AnalyzeURLsRequest,
get_analyzed_pages,
get_semantic_health,
get_semantic_cache_stats,
get_sif_indexing_health,
get_onboarding_task_health,
)
# Initialize FastAPI app
@@ -262,184 +259,194 @@ async def onboarding_status():
return onboarding_manager.get_onboarding_status()
# Include routers using modular utilities
if not PODCAST_ONLY_DEMO_MODE:
router_manager.include_core_routers()
router_manager.include_optional_routers()
else:
logger.info("PODCAST_ONLY_DEMO_MODE enabled: including only podcast and subscription feature routers.")
app.include_router(subscription_router)
router_manager.include_core_routers()
router_manager.include_optional_routers()
# Include assets serving router (must be mounted to serve generated images)
app.include_router(assets_serving_router)
if not PODCAST_ONLY_DEMO_MODE:
# SEO Dashboard endpoints
@app.get("/api/seo-dashboard/data")
async def seo_dashboard_data():
"""Get complete SEO dashboard data."""
return await get_seo_dashboard_data()
# SEO Dashboard endpoints
@app.get("/api/seo-dashboard/data")
async def seo_dashboard_data():
"""Get complete SEO dashboard data."""
return await get_seo_dashboard_data()
@app.get("/api/seo-dashboard/health-score")
async def seo_health_score():
"""Get SEO health score."""
return await get_seo_health_score()
@app.get("/api/seo-dashboard/health-score")
async def seo_health_score():
"""Get SEO health score."""
return await get_seo_health_score()
@app.get("/api/seo-dashboard/metrics")
async def seo_metrics():
"""Get SEO metrics."""
return await get_seo_metrics()
@app.get("/api/seo-dashboard/metrics")
async def seo_metrics():
"""Get SEO metrics."""
return await get_seo_metrics()
@app.get("/api/seo-dashboard/platforms")
async def seo_platforms(current_user: dict = Depends(get_current_user)):
"""Get platform status."""
return await get_platform_status(current_user)
@app.get("/api/seo-dashboard/platforms")
async def seo_platforms(current_user: dict = Depends(get_current_user)):
"""Get platform status."""
return await get_platform_status(current_user)
@app.get("/api/seo-dashboard/insights")
async def seo_insights():
"""Get AI insights."""
return await get_ai_insights()
@app.get("/api/seo-dashboard/insights")
async def seo_insights():
"""Get AI insights."""
return await get_ai_insights()
# New SEO Dashboard endpoints with real data
@app.get("/api/seo-dashboard/overview")
async def seo_dashboard_overview_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get comprehensive SEO dashboard overview with real GSC/Bing data."""
return await get_seo_dashboard_overview(current_user, site_url)
# New SEO Dashboard endpoints with real data
@app.get("/api/seo-dashboard/overview")
async def seo_dashboard_overview_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get comprehensive SEO dashboard overview with real GSC/Bing data."""
return await get_seo_dashboard_overview(current_user, site_url)
@app.get("/api/seo-dashboard/gsc/raw")
async def gsc_raw_data_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get raw GSC data for the specified site."""
return await get_gsc_raw_data(current_user, site_url)
@app.get("/api/seo-dashboard/gsc/raw")
async def gsc_raw_data_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get raw GSC data for the specified site."""
return await get_gsc_raw_data(current_user, site_url)
@app.get("/api/seo-dashboard/bing/raw")
async def bing_raw_data_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get raw Bing data for the specified site."""
return await get_bing_raw_data(current_user, site_url)
@app.get("/api/seo-dashboard/bing/raw")
async def bing_raw_data_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get raw Bing data for the specified site."""
return await get_bing_raw_data(current_user, site_url)
@app.get("/api/seo-dashboard/competitive-insights")
async def competitive_insights_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get competitive insights from onboarding step 3 data."""
return await get_competitive_insights(current_user, site_url)
@app.get("/api/seo-dashboard/competitive-insights")
async def competitive_insights_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get competitive insights from onboarding step 3 data."""
return await get_competitive_insights(current_user, site_url)
@app.get("/api/seo-dashboard/deep-competitor-analysis")
async def deep_competitor_analysis_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get deep competitor analysis results (auto-scheduled post-onboarding)."""
return await get_deep_competitor_analysis(current_user, site_url)
@app.get("/api/seo-dashboard/deep-competitor-analysis")
async def deep_competitor_analysis_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get deep competitor analysis results (auto-scheduled post-onboarding)."""
return await get_deep_competitor_analysis(current_user, site_url)
@app.post("/api/seo-dashboard/strategic-insights/run")
async def run_strategic_insights_endpoint(current_user: dict = Depends(get_current_user)):
"""Run AI-powered strategic insights analysis manually."""
return await run_strategic_insights(current_user)
@app.post("/api/seo-dashboard/strategic-insights/run")
async def run_strategic_insights_endpoint(current_user: dict = Depends(get_current_user)):
"""Run AI-powered strategic insights analysis manually."""
return await run_strategic_insights(current_user)
@app.get("/api/seo-dashboard/strategic-insights/history")
async def get_strategic_insights_history_endpoint(current_user: dict = Depends(get_current_user)):
"""Fetch the history of strategic insights for the user."""
return await get_strategic_insights_history(current_user)
@app.get("/api/seo-dashboard/strategic-insights/history")
async def get_strategic_insights_history_endpoint(current_user: dict = Depends(get_current_user)):
"""Fetch the history of strategic insights for the user."""
return await get_strategic_insights_history(current_user)
@app.post("/api/seo-dashboard/refresh")
async def refresh_analytics_data_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Refresh analytics data by invalidating cache and fetching fresh data."""
return await refresh_analytics_data(current_user, site_url)
@app.post("/api/seo-dashboard/refresh")
async def refresh_analytics_data_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Refresh analytics data by invalidating cache and fetching fresh data."""
return await refresh_analytics_data(current_user, site_url)
@app.get("/api/seo-dashboard/onboarding-task-health")
async def onboarding_task_health_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get consolidated health for onboarding-scheduled SEO tasks."""
return await get_onboarding_task_health(current_user, site_url)
@app.get("/api/seo-dashboard/health")
async def seo_dashboard_health():
"""Health check for SEO dashboard."""
return await seo_dashboard_health_check()
# Phase 2B: Semantic health monitoring endpoint (24-hour polling)
@app.get("/api/seo-dashboard/semantic-health")
async def semantic_health_endpoint(current_user: dict = Depends(get_current_user)):
"""Get real-time semantic health metrics for content and competitors."""
return await get_semantic_health(current_user)
@app.get("/api/seo-dashboard/onboarding-task-health")
async def onboarding_task_health_endpoint(current_user: dict = Depends(get_current_user), site_url: str = None):
"""Get consolidated health for onboarding-scheduled SEO tasks."""
return await get_onboarding_task_health(current_user, site_url)
@app.get("/api/seo-dashboard/cache-stats")
async def semantic_cache_stats_endpoint(current_user: dict = Depends(get_current_user)):
"""Get semantic cache performance statistics."""
return await get_semantic_cache_stats(current_user)
@app.get("/api/seo-dashboard/health")
async def seo_dashboard_health():
"""Health check for SEO dashboard."""
return await seo_dashboard_health_check()
@app.get("/api/seo-dashboard/sif-health")
async def sif_indexing_health_endpoint(current_user: dict = Depends(get_current_user)):
"""Get SIF indexing health summary for the current user."""
return await get_sif_indexing_health(current_user)
# Phase 2B: Semantic health monitoring endpoint (24-hour polling)
@app.get("/api/seo-dashboard/semantic-health")
async def semantic_health_endpoint(current_user: dict = Depends(get_current_user)):
"""
Get real-time semantic health metrics for content and competitors.
This endpoint provides Phase 2B semantic intelligence monitoring data.
Returns semantic health score, status, and recommendations.
Data is cached and updated every 24 hours via scheduler.
"""
return await get_semantic_health(current_user)
# Comprehensive SEO Analysis endpoints
@app.post("/api/seo-dashboard/analyze-comprehensive")
async def analyze_seo_comprehensive_endpoint(request: SEOAnalysisRequest):
"""Analyze a URL for comprehensive SEO performance."""
return await analyze_seo_comprehensive(request)
@app.post("/api/seo-dashboard/analyze-full")
async def analyze_seo_full_endpoint(request: SEOAnalysisRequest):
"""Analyze a URL for comprehensive SEO performance."""
return await analyze_seo_full(request)
@app.get("/api/seo-dashboard/cache-stats")
async def semantic_cache_stats_endpoint(current_user: dict = Depends(get_current_user)):
"""
Get semantic cache performance statistics.
Returns hit rate, memory usage, and eviction counts.
"""
return await get_semantic_cache_stats(current_user)
@app.get("/api/seo-dashboard/metrics-detailed")
async def seo_metrics_detailed(url: str):
"""Get detailed SEO metrics for a URL."""
return await get_seo_metrics_detailed(url)
@app.get("/api/seo-dashboard/analysis-summary")
async def seo_analysis_summary(url: str):
"""Get a quick summary of SEO analysis for a URL."""
return await get_analysis_summary(url)
@app.get("/api/seo-dashboard/sif-health")
async def sif_indexing_health_endpoint(current_user: dict = Depends(get_current_user)):
"""
Get SIF indexing health summary for the current user.
Used by the Semantic Indexing Status widget on the dashboard.
"""
return await get_sif_indexing_health(current_user)
@app.post("/api/seo-dashboard/batch-analyze")
async def batch_analyze_urls_endpoint(urls: list[str]):
"""Analyze multiple URLs in batch."""
return await batch_analyze_urls(urls)
# Comprehensive SEO Analysis endpoints
@app.post("/api/seo-dashboard/analyze-comprehensive")
async def analyze_seo_comprehensive_endpoint(request: SEOAnalysisRequest):
"""Analyze a URL for comprehensive SEO performance."""
return await analyze_seo_comprehensive(request)
@app.post("/api/seo-dashboard/analyze-urls-ai")
async def analyze_urls_ai_endpoint(request: AnalyzeURLsRequest, current_user: dict = Depends(get_current_user)):
"""Run AI-powered SEO analysis on selected URLs."""
return await analyze_urls_ai(request, current_user)
@app.post("/api/seo-dashboard/analyze-full")
async def analyze_seo_full_endpoint(request: SEOAnalysisRequest):
"""Analyze a URL for comprehensive SEO performance."""
return await analyze_seo_full(request)
# Include platform analytics router
from routers.platform_analytics import router as platform_analytics_router
app.include_router(platform_analytics_router)
# Include Bing Analytics Storage router to expose storage-backed endpoints
from routers.bing_analytics_storage import router as bing_analytics_storage_router
app.include_router(bing_analytics_storage_router)
app.include_router(images_router)
app.include_router(image_studio_router)
app.include_router(product_marketing_router)
app.include_router(campaign_creator_router)
@app.get("/api/seo-dashboard/metrics-detailed")
async def seo_metrics_detailed(url: str):
"""Get detailed SEO metrics for a URL."""
return await get_seo_metrics_detailed(url)
# Include content assets router
from api.content_assets.router import router as content_assets_router
app.include_router(content_assets_router)
@app.get("/api/seo-dashboard/analysis-summary")
async def seo_analysis_summary(url: str):
"""Get a quick summary of SEO analysis for a URL."""
return await get_analysis_summary(url)
@app.post("/api/seo-dashboard/batch-analyze")
async def batch_analyze_urls_endpoint(urls: list[str]):
"""Analyze multiple URLs in batch."""
return await batch_analyze_urls(urls)
@app.post("/api/seo-dashboard/analyze-urls-ai")
async def analyze_urls_ai_endpoint(request: AnalyzeURLsRequest, current_user: dict = Depends(get_current_user)):
"""Run AI-powered SEO analysis on selected URLs."""
return await analyze_urls_ai(request, current_user)
# Include platform analytics router
from routers.platform_analytics import router as platform_analytics_router
app.include_router(platform_analytics_router)
# Include Bing Analytics Storage router to expose storage-backed endpoints
from routers.bing_analytics_storage import router as bing_analytics_storage_router
app.include_router(bing_analytics_storage_router)
app.include_router(images_router)
app.include_router(image_studio_router)
app.include_router(product_marketing_router)
app.include_router(campaign_creator_router)
# Include content assets router
from api.content_assets.router import router as content_assets_router
app.include_router(content_assets_router)
# Include Podcast Maker router
from api.podcast.router import router as podcast_router
app.include_router(podcast_router)
if not PODCAST_ONLY_DEMO_MODE:
# Include YouTube Creator Studio router
from api.youtube.router import router as youtube_router
app.include_router(youtube_router, prefix="/api")
# Include YouTube Creator Studio router
from api.youtube.router import router as youtube_router
app.include_router(youtube_router, prefix="/api")
# Include research configuration router
app.include_router(research_config_router, prefix="/api/research", tags=["research"])
# Include research configuration router
app.include_router(research_config_router, prefix="/api/research", tags=["research"])
# Include Research Engine router (standalone AI research module)
from api.research.router import router as research_engine_router
app.include_router(research_engine_router, tags=["Research Engine"])
# Include Research Engine router (standalone AI research module)
from api.research.router import router as research_engine_router
app.include_router(research_engine_router, tags=["Research Engine"])
# Scheduler dashboard routes
from api.scheduler_dashboard import router as scheduler_dashboard_router
app.include_router(scheduler_dashboard_router)
app.include_router(oauth_token_monitoring_router)
# Scheduler dashboard routes
from api.scheduler_dashboard import router as scheduler_dashboard_router
app.include_router(scheduler_dashboard_router)
app.include_router(oauth_token_monitoring_router)
# Autonomous Agents API routes (Phase 3A)
from api.agents_api import router as agents_router
app.include_router(agents_router)
# Autonomous Agents API routes (Phase 3A)
from api.agents_api import router as agents_router
app.include_router(agents_router)
# Today workflow routes
from api.today_workflow import router as today_workflow_router
app.include_router(today_workflow_router)
# Today workflow routes
from api.today_workflow import router as today_workflow_router
app.include_router(today_workflow_router)
# Setup frontend serving using modular utilities
frontend_serving.setup_frontend_serving()

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""Fail CI on forced/hardcoded user_id patterns outside test fixtures."""
from __future__ import annotations
import re
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[2]
CHECK_GLOBS = ("**/*.py",)
EXCLUDED_SUBSTRINGS = (
"/.git/",
"/.venv/",
"/venv/",
"/node_modules/",
"/__pycache__/",
"/tests/",
"/test_",
"/fixtures/",
"/test_validation/",
"/backend/scripts/check_forced_user_id_patterns.py",
)
RULES = [
(re.compile(r"\buser_id\s*=\s*1\b"), "hardcoded `user_id = 1`"),
(re.compile(r"force\s+user_id", re.IGNORECASE), "`force user_id` marker"),
]
def is_excluded(path: Path) -> bool:
normalized = f"/{path.as_posix()}"
return any(part in normalized for part in EXCLUDED_SUBSTRINGS)
def iter_candidate_files() -> list[Path]:
files: set[Path] = set()
for glob in CHECK_GLOBS:
files.update(REPO_ROOT.glob(glob))
return sorted(p for p in files if p.is_file() and not is_excluded(p.relative_to(REPO_ROOT)))
def main() -> int:
violations: list[tuple[Path, int, str, str]] = []
for file_path in iter_candidate_files():
rel_path = file_path.relative_to(REPO_ROOT)
try:
text = file_path.read_text(encoding="utf-8")
except UnicodeDecodeError:
continue
for line_number, line in enumerate(text.splitlines(), start=1):
for pattern, label in RULES:
if pattern.search(line):
violations.append((rel_path, line_number, label, line.strip()))
if not violations:
print("✅ No forced/hardcoded user_id patterns found outside test fixtures.")
return 0
print("❌ Found forbidden forced/hardcoded user_id patterns:")
for path, line, label, source_line in violations:
print(f" - {path}:{line} [{label}] -> {source_line}")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -410,8 +410,7 @@ class ContentGenerator:
raise Exception("Gemini Grounded Provider not available - cannot generate content without AI provider")
# Build the prompt for grounded generation using persona if available (DB vs session override)
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(getattr(request, "user_id", 0) or 0)
persona_data = self._get_cached_persona_data(user_id, 'linkedin')
if getattr(request, 'persona_override', None):
try:
@@ -485,8 +484,7 @@ class ContentGenerator:
raise Exception("Gemini Grounded Provider not available - cannot generate content without AI provider")
# Build the prompt for grounded generation using persona if available (DB vs session override)
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(getattr(request, "user_id", 0) or 0)
persona_data = self._get_cached_persona_data(user_id, 'linkedin')
if getattr(request, 'persona_override', None):
try:

View File

@@ -23,6 +23,11 @@ class MonitoringDataService:
def __init__(self, db_session: Session):
self.db = db_session
def _resolve_strategy_user_id(self, strategy_id: int) -> str:
strategy = self.db.query(EnhancedContentStrategy).filter(EnhancedContentStrategy.id == strategy_id).first()
return str(getattr(strategy, "user_id", "0") or "0")
async def save_monitoring_data(self, strategy_id: int, monitoring_plan: Dict[str, Any]) -> bool:
"""Save monitoring plan and tasks to database."""
try:
@@ -65,19 +70,22 @@ class MonitoringDataService:
self.db.add(task)
strategy_user_id = self._resolve_strategy_user_id(strategy_id)
# Save activation status
activation_status = StrategyActivationStatus(
strategy_id=strategy_id,
user_id=1, # Default user ID
user_id=strategy_user_id,
activation_date=datetime.utcnow(),
status='active'
)
self.db.add(activation_status)
# Save initial performance metrics
strategy_user_id = self._resolve_strategy_user_id(strategy_id)
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
user_id=strategy_user_id,
metric_date=datetime.utcnow(),
data_source='monitoring_plan',
confidence_score=85 # High confidence for monitoring plan data
@@ -341,10 +349,11 @@ class MonitoringDataService:
"""Update performance metrics for a strategy."""
try:
logger.info(f"Updating performance metrics for strategy {strategy_id}")
strategy_user_id = self._resolve_strategy_user_id(strategy_id)
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
user_id=strategy_user_id,
metric_date=datetime.utcnow(),
traffic_growth_percentage=metrics.get('traffic_growth'),
engagement_rate_percentage=metrics.get('engagement_rate'),

View File

@@ -1,3 +1,4 @@
import os
from typing import Dict, Any, List, Optional
from sqlalchemy.orm import Session
from loguru import logger
@@ -21,7 +22,7 @@ class StrategyCopilotService:
"""Generate data for a specific category."""
try:
# Get user onboarding data
user_id = 1 # TODO: Get from auth context
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
integrated_data = await self.onboarding_integration_service.process_onboarding_data(str(user_id), self.db)
onboarding_data = integrated_data.get('canonical_profile', {})
@@ -81,7 +82,7 @@ class StrategyCopilotService:
"""Analyze complete strategy for completeness and coherence."""
try:
# Get user data for context
user_id = 1 # TODO: Get from auth context
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
integrated_data = await self.onboarding_integration_service.process_onboarding_data(str(user_id), self.db)
onboarding_data = integrated_data.get('canonical_profile', {})
@@ -118,7 +119,7 @@ class StrategyCopilotService:
field_definition = self._get_field_definition(field_id)
# Get user data
user_id = 1 # TODO: Get from auth context
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
# Use SSOT
integrated_data = await self.onboarding_integration_service.process_onboarding_data(str(user_id), self.db)
onboarding_data = integrated_data.get('canonical_profile', {})

View File

@@ -120,6 +120,15 @@ class SIFReleaseReadinessTests(unittest.IsolatedAsyncioTestCase):
self.assertFalse(validation["is_contextual"])
self.assertEqual(validation["tasks_below_min_evidence"], 1)
def test_demo_release_flag_guards_sensitive_routers(self):
source = Path("backend/alwrity_utils/router_manager.py").read_text()
self.assertIn("ALWRITY_DEMO_RELEASE", source)
self.assertIn("Skipping facebook_writer router in demo-release mode", source)
self.assertIn("Skipping linkedin router in demo-release mode", source)
self.assertIn("Skipping linkedin_image router in demo-release mode", source)
self.assertIn("Skipping persona router in demo-release mode", source)
def test_pillar_coverage_guardrail_backfills_missing(self):
tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}]
grounding = {"workflow_config": {"enforce_pillar_coverage": True}}