Compare commits

..

1 Commits

Author SHA1 Message Date
ي
636989f75b Add forced user_id lint check and demo router gating 2026-03-30 08:13:48 +05:30
13 changed files with 177 additions and 446 deletions

View File

@@ -0,0 +1,23 @@
name: Lint Forced User ID Patterns
on:
pull_request:
push:
branches:
- main
jobs:
lint-forced-user-id:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Check for forced/hardcoded user_id patterns
run: python backend/scripts/check_forced_user_id_patterns.py

View File

@@ -1,43 +0,0 @@
{
"preflight": {
"success": true,
"can_proceed": true,
"estimated_cost": 0.3
},
"operations": {
"analysis_title_suggestions": [
"AI Agents in 2026",
"Ship Faster with AI",
"Startup AI Playbook"
],
"research_provider": "exa",
"research_cost": 0.015,
"video_task_status": "completed"
},
"dashboard_deltas": {
"total_calls_before": 1,
"total_calls_after": 5,
"delta_calls": 4,
"total_cost_before": 0.09,
"total_cost_after": 0.488,
"delta_cost": 0.398,
"projected_monthly_cost_before": 0.09,
"projected_monthly_cost_after": 0.49,
"delta_projected_monthly_cost": 0.4
},
"provider_cost_deltas": {
"exa": 0.005,
"huggingface": 0.003,
"wavespeed": 0.39
},
"acceptance": {
"passed": true,
"criteria": {
"preflight_success": true,
"usage_cost_incremented": true,
"usage_call_incremented": true,
"projection_incremented": true,
"provider_delta_present": true
}
}
}

View File

@@ -16,6 +16,11 @@ class RouterManager:
self.app = app
self.included_routers = []
self.failed_routers = []
@staticmethod
def _demo_release_mode_enabled() -> bool:
"""Return True when demo-release safety mode is enabled."""
return os.getenv("ALWRITY_DEMO_RELEASE", "false").lower() in {"1", "true", "yes", "on"}
def include_router_safely(self, router, router_name: str = None) -> bool:
"""Include a router safely with error handling."""
@@ -88,16 +93,27 @@ class RouterManager:
from routers.seo_tools import router as seo_tools_router
self.include_router_safely(seo_tools_router, "seo_tools")
demo_release_mode = self._demo_release_mode_enabled()
# Facebook Writer router
from api.facebook_writer.routers import facebook_router
self.include_router_safely(facebook_router, "facebook_writer")
if demo_release_mode:
logger.info("⏭️ Skipping facebook_writer router in demo-release mode")
else:
from api.facebook_writer.routers import facebook_router
self.include_router_safely(facebook_router, "facebook_writer")
# LinkedIn routers
from routers.linkedin import router as linkedin_router
self.include_router_safely(linkedin_router, "linkedin")
if demo_release_mode:
logger.info("⏭️ Skipping linkedin router in demo-release mode")
else:
from routers.linkedin import router as linkedin_router
self.include_router_safely(linkedin_router, "linkedin")
from api.linkedin_image_generation import router as linkedin_image_router
self.include_router_safely(linkedin_image_router, "linkedin_image")
if demo_release_mode:
logger.info("⏭️ Skipping linkedin_image router in demo-release mode")
else:
from api.linkedin_image_generation import router as linkedin_image_router
self.include_router_safely(linkedin_image_router, "linkedin_image")
# Brainstorm router
from api.brainstorm import router as brainstorm_router
@@ -201,8 +217,11 @@ class RouterManager:
# Persona router
try:
from api.persona_routes import router as persona_router
self.include_router_safely(persona_router, "persona")
if self._demo_release_mode_enabled():
logger.info("⏭️ Skipping persona router in demo-release mode")
else:
from api.persona_routes import router as persona_router
self.include_router_safely(persona_router, "persona")
except Exception as e:
logger.warning(f"Persona router not mounted: {e}")

View File

@@ -1,3 +1,4 @@
import os
"""Facebook Post generation service."""
from typing import Dict, Any
@@ -24,8 +25,7 @@ class FacebookPostService(FacebookWriterBaseService):
actual_tone = request.custom_tone if request.post_tone.value == "Custom" else request.post_tone.value
# Get persona data for enhanced content generation
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
persona_data = self._get_persona_data(user_id)
# Build the prompt

View File

@@ -1,3 +1,4 @@
import os
"""Remaining Facebook Writer services - placeholder implementations."""
from typing import Dict, Any, List
@@ -16,8 +17,7 @@ class FacebookReelService(FacebookWriterBaseService):
actual_style = request.custom_style if request.reel_style.value == "Custom" else request.reel_style.value
# Get persona data for enhanced content generation
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
persona_data = self._get_persona_data(user_id)
base_prompt = f"""

View File

@@ -1,3 +1,4 @@
import os
"""Facebook Story generation service."""
from typing import Dict, Any, List
@@ -30,8 +31,7 @@ class FacebookStoryService(FacebookWriterBaseService):
actual_tone = request.custom_tone if request.story_tone.value == "Custom" else request.story_tone.value
# Get persona data for enhanced content generation
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
persona_data = self._get_persona_data(user_id)
# Build the prompt

View File

@@ -94,36 +94,36 @@ async def generate_platform_persona_endpoint(
async def update_persona_endpoint(
persona_id: int,
update_data: Dict[str, Any],
user_id: int = Query(..., description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Update an existing persona."""
# Beta testing: Force user_id=1 for all requests
return await update_persona(1, persona_id, update_data)
user_id = int(current_user.get("id"))
return await update_persona(user_id, persona_id, update_data)
@router.delete("/{persona_id}")
async def delete_persona_endpoint(
persona_id: int,
user_id: int = Query(..., description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Delete a persona."""
# Beta testing: Force user_id=1 for all requests
return await delete_persona(1, persona_id)
user_id = int(current_user.get("id"))
return await delete_persona(user_id, persona_id)
@router.get("/check/readiness")
async def check_persona_readiness_endpoint(
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Check if user has sufficient data for persona generation."""
# Beta testing: Force user_id=1 for all requests
return await validate_persona_generation_readiness(1)
user_id = int(current_user.get("id"))
return await validate_persona_generation_readiness(user_id)
@router.get("/preview/generate")
async def generate_preview_endpoint(
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Generate a preview of the writing persona without saving."""
# Beta testing: Force user_id=1 for all requests
return await generate_persona_preview(1)
user_id = int(current_user.get("id"))
return await generate_persona_preview(user_id)
@router.get("/platforms/supported")
async def get_supported_platforms_endpoint():
@@ -160,12 +160,12 @@ async def optimize_facebook_persona_endpoint(
@router.post("/generate-content")
async def generate_content_with_persona_endpoint(
request: Dict[str, Any]
request: Dict[str, Any],
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Generate content using persona replication engine."""
try:
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(current_user.get("id"))
platform = request.get("platform")
content_request = request.get("content_request")
content_type = request.get("content_type", "post")
@@ -189,13 +189,13 @@ async def generate_content_with_persona_endpoint(
@router.get("/export/{platform}")
async def export_persona_prompt_endpoint(
platform: str,
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Export hardened persona prompt for external use."""
try:
engine = PersonaReplicationEngine()
# Beta testing: Force user_id=1 for all requests
export_package = engine.export_persona_for_external_use(1, platform)
user_id = int(current_user.get("id"))
export_package = engine.export_persona_for_external_use(user_id, platform)
if "error" in export_package:
raise HTTPException(status_code=400, detail=export_package["error"])
@@ -207,12 +207,12 @@ async def export_persona_prompt_endpoint(
@router.post("/validate-content")
async def validate_content_endpoint(
request: Dict[str, Any]
request: Dict[str, Any],
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Validate content against persona constraints."""
try:
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(current_user.get("id"))
platform = request.get("platform")
content = request.get("content")
@@ -242,14 +242,14 @@ async def validate_content_endpoint(
async def update_platform_persona_endpoint(
platform: str,
update_data: Dict[str, Any],
user_id: int = Query(1, description="User ID")
current_user: Dict[str, Any] = Depends(get_current_user),
):
"""Update platform-specific persona fields for a user.
Allows editing persona fields in the UI and saving them to the database.
"""
# Beta testing: Force user_id=1 for all requests
return await update_platform_persona(1, platform, update_data)
user_id = int(current_user.get("id"))
return await update_platform_persona(user_id, platform, update_data)
@router.get("/facebook-persona/check/{user_id}")
async def check_facebook_persona_endpoint(

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""Fail CI on forced/hardcoded user_id patterns outside test fixtures."""
from __future__ import annotations
import re
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[2]
CHECK_GLOBS = ("**/*.py",)
EXCLUDED_SUBSTRINGS = (
"/.git/",
"/.venv/",
"/venv/",
"/node_modules/",
"/__pycache__/",
"/tests/",
"/test_",
"/fixtures/",
"/test_validation/",
"/backend/scripts/check_forced_user_id_patterns.py",
)
RULES = [
(re.compile(r"\buser_id\s*=\s*1\b"), "hardcoded `user_id = 1`"),
(re.compile(r"force\s+user_id", re.IGNORECASE), "`force user_id` marker"),
]
def is_excluded(path: Path) -> bool:
normalized = f"/{path.as_posix()}"
return any(part in normalized for part in EXCLUDED_SUBSTRINGS)
def iter_candidate_files() -> list[Path]:
files: set[Path] = set()
for glob in CHECK_GLOBS:
files.update(REPO_ROOT.glob(glob))
return sorted(p for p in files if p.is_file() and not is_excluded(p.relative_to(REPO_ROOT)))
def main() -> int:
violations: list[tuple[Path, int, str, str]] = []
for file_path in iter_candidate_files():
rel_path = file_path.relative_to(REPO_ROOT)
try:
text = file_path.read_text(encoding="utf-8")
except UnicodeDecodeError:
continue
for line_number, line in enumerate(text.splitlines(), start=1):
for pattern, label in RULES:
if pattern.search(line):
violations.append((rel_path, line_number, label, line.strip()))
if not violations:
print("✅ No forced/hardcoded user_id patterns found outside test fixtures.")
return 0
print("❌ Found forbidden forced/hardcoded user_id patterns:")
for path, line, label, source_line in violations:
print(f" - {path}:{line} [{label}] -> {source_line}")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,355 +0,0 @@
#!/usr/bin/env python3
"""Run podcast preflight + operations and verify billing usage/cost deltas."""
import os
import json
import asyncio
from pathlib import Path
from typing import Any
# Use mock auth in local test runs
os.environ.setdefault("DISABLE_AUTH", "true")
os.environ.setdefault("ALLOW_UNVERIFIED_JWT_DEV", "true")
os.environ.setdefault(
"STRIPE_PLAN_PRICE_MAPPING_TEST",
"{\"basic\": {\"monthly\": \"price_test_basic_monthly\"}, \"pro\": {\"monthly\": \"price_test_pro_monthly\"}}",
)
os.environ.setdefault("EXA_API_KEY", "test-exa-key")
import spacy
# Avoid hard dependency on downloaded spaCy model during router imports.
spacy.load = lambda _name, *args, **kwargs: object() # type: ignore[assignment]
from fastapi import FastAPI
from fastapi.testclient import TestClient
# Import only required routers (avoids heavyweight app startup deps)
from api.podcast.router import router as podcast_router
from api.subscription import router as subscription_router
from api.podcast.handlers import analysis as analysis_handler
from api.podcast.handlers import research as research_handler
from api.podcast.handlers import video as video_handler
from api.podcast.constants import get_podcast_media_dir, PODCAST_IMAGES_DIR
from services.database import get_session_for_user
from services.subscription.usage_tracking_service import UsageTrackingService
from models.subscription_models import APIProvider
USER_ID = "mock_user_id"
AUTH_HEADERS = {"Authorization": "Bearer test-token"}
BILLING_PERIOD = "2026-03"
def _ensure_test_media_files(user_id: str) -> tuple[str, str]:
audio_dir = get_podcast_media_dir("audio", user_id, ensure_exists=True)
image_dir = get_podcast_media_dir("image", user_id, ensure_exists=True)
audio_file = audio_dir / "sequence_test_audio.mp3"
image_file = image_dir / "sequence_test_image.png"
if not audio_file.exists():
audio_file.write_bytes(b"ID3" + b"\x00" * 512)
if not image_file.exists():
# Minimal PNG header-like bytes (sufficient for mocked pipeline)
image_file.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 512)
# Also place in legacy global dir for URL resolver compatibility.
PODCAST_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
legacy_image_file = PODCAST_IMAGES_DIR / image_file.name
if not legacy_image_file.exists():
legacy_image_file.write_bytes(image_file.read_bytes())
return (
f"/api/podcast/audio/{audio_file.name}",
f"/api/podcast/images/{image_file.name}",
)
def _patch_external_calls() -> None:
# 1) Podcast analysis: avoid real LLM calls
def _mock_llm_text_gen(*args: Any, **kwargs: Any) -> dict[str, Any]:
return {
"audience": "US founders building AI products",
"content_type": "interview",
"top_keywords": ["ai agent", "startup", "gtm", "cost", "automation"],
"suggested_outlines": [
{"title": "What changed in 2026", "segments": ["Market", "Tools", "ROI", "Pitfalls"]},
{"title": "Building with constraints", "segments": ["Budget", "Stack", "Team", "Execution"]},
],
"title_suggestions": ["AI Agents in 2026", "Ship Faster with AI", "Startup AI Playbook"],
"research_queries": [
{"query": "AI agent adoption data 2026 startups", "rationale": "quantify adoption"},
{"query": "founder interviews AI automation ROI", "rationale": "real examples"},
],
"exa_suggested_config": {
"exa_search_type": "auto",
"max_sources": 6,
"include_statistics": True,
},
}
async def _mock_exa_search(*args: Any, **kwargs: Any) -> dict[str, Any]:
return {
"provider": "exa",
"search_type": "neural",
"search_queries": ["AI agent adoption data 2026 startups"],
"sources": [
{
"title": "Agentic AI trends",
"url": "https://example.com/agentic-ai-trends",
"excerpt": "Adoption rose notably among SMB teams.",
"index": 1,
}
],
"content": "Key Highlights: Adoption increased and ROI became more measurable.",
"cost": {"total": 0.015},
}
def _mock_animate_scene_with_voiceover(*args: Any, **kwargs: Any) -> dict[str, Any]:
return {
"video_bytes": b"\x00\x00\x00\x18ftypmp42" + b"\x00" * 1024,
"provider": "wavespeed",
"model_name": "wavespeed-ai/infinitetalk",
"prompt": "Animate presenter speaking clearly.",
"cost": 0.09,
"duration": 8.0,
}
analysis_handler.llm_text_gen = _mock_llm_text_gen
research_handler.llm_text_gen = _mock_llm_text_gen
research_handler.ExaResearchProvider.search = _mock_exa_search
video_handler.animate_scene_with_voiceover = _mock_animate_scene_with_voiceover
def _post_json(client: TestClient, path: str, payload: dict[str, Any]) -> dict[str, Any]:
res = client.post(path, json=payload, headers=AUTH_HEADERS)
res.raise_for_status()
return res.json()
def _get_json(client: TestClient, path: str) -> dict[str, Any]:
res = client.get(path, headers=AUTH_HEADERS)
res.raise_for_status()
return res.json()
def _provider_cost_totals(logs_payload: dict[str, Any]) -> dict[str, float]:
totals: dict[str, float] = {}
for row in logs_payload.get("logs", []):
provider = (row.get("provider") or "unknown").lower()
totals[provider] = totals.get(provider, 0.0) + float(row.get("cost_total") or 0.0)
return totals
def _record_usage(user_id: str, provider: APIProvider, endpoint: str, model: str, tokens_in: int = 0, tokens_out: int = 0) -> None:
db = get_session_for_user(user_id)
if not db:
return
try:
service = UsageTrackingService(db)
asyncio.run(
service.track_api_usage(
user_id=user_id,
provider=provider,
endpoint=endpoint,
method="POST",
model_used=model,
tokens_input=tokens_in,
tokens_output=tokens_out,
response_time=0.42,
status_code=200,
)
)
finally:
db.close()
def main() -> None:
_patch_external_calls()
audio_url, avatar_image_path = _ensure_test_media_files(USER_ID)
app = FastAPI()
app.include_router(subscription_router)
app.include_router(podcast_router)
with TestClient(app) as client:
# Baseline billing snapshots
baseline_dashboard = _get_json(client, f"/api/subscription/dashboard/{USER_ID}?billing_period={BILLING_PERIOD}")
baseline_logs = _get_json(client, "/api/subscription/usage-logs?limit=500")
before_cost = float(baseline_dashboard["data"]["summary"]["total_cost_this_month"])
before_calls = int(baseline_dashboard["data"]["summary"]["total_api_calls_this_month"])
before_projection = float(baseline_dashboard["data"]["projections"]["projected_monthly_cost"])
before_provider_costs = _provider_cost_totals(baseline_logs)
# 1) Preflight for podcast analysis + video
preflight_payload = {
"operations": [
{
"provider": "huggingface",
"operation_type": "podcast_analysis",
"tokens_requested": 1200,
"model": "meta-llama/llama-3.3-70b-instruct",
},
{
"provider": "video",
"operation_type": "scene_animation",
"tokens_requested": 0,
"model": "wavespeed-ai/infinitetalk",
"actual_provider_name": "wavespeed",
},
]
}
preflight = _post_json(client, "/api/subscription/preflight-check", preflight_payload)
# 2a) Podcast analysis
analysis = _post_json(
client,
"/api/podcast/analyze",
{
"idea": "How AI agents are changing founder workflows",
"duration": 8,
"speakers": 1,
# Keep avatar to skip image generation call in this sequence
"avatar_url": "/api/podcast/images/avatars/already_present.png",
},
)
_record_usage(
user_id=USER_ID,
provider=APIProvider.MISTRAL,
endpoint="/api/podcast/analyze",
model="meta-llama/llama-3.3-70b-instruct",
tokens_in=1200,
tokens_out=600,
)
# 2b) Podcast research
research = _post_json(
client,
"/api/podcast/research/exa",
{
"topic": "AI agent adoption in startups",
"queries": ["AI agent adoption data 2026 startups"],
"analysis": {"audience": analysis.get("audience", "general")},
},
)
_record_usage(
user_id=USER_ID,
provider=APIProvider.EXA,
endpoint="/api/podcast/research/exa",
model="exa-search",
tokens_in=0,
tokens_out=0,
)
# 2c) At least one video render
video_start = _post_json(
client,
"/api/podcast/render/video",
{
"project_id": "sequence-project-001",
"scene_id": "scene_1",
"scene_title": "Intro",
"audio_url": audio_url,
"avatar_image_url": avatar_image_path,
"resolution": "720p",
},
)
# Fetch task status once (background task should be done quickly with mocks)
task_id = video_start["task_id"]
task_status = _get_json(client, f"/api/podcast/task/{task_id}/status")
_record_usage(
user_id=USER_ID,
provider=APIProvider.VIDEO,
endpoint="/api/podcast/render/video",
model="wavespeed-ai/infinitetalk",
tokens_in=0,
tokens_out=0,
)
# 3) Verify usage logs/dashboard deltas
after_dashboard = _get_json(client, f"/api/subscription/dashboard/{USER_ID}?billing_period={BILLING_PERIOD}")
after_logs = _get_json(client, "/api/subscription/usage-logs?limit=500")
after_cost = float(after_dashboard["data"]["summary"]["total_cost_this_month"])
after_calls = int(after_dashboard["data"]["summary"]["total_api_calls_this_month"])
after_projection = float(after_dashboard["data"]["projections"]["projected_monthly_cost"])
after_provider_costs = _provider_cost_totals(after_logs)
delta_cost = round(after_cost - before_cost, 4)
delta_calls = after_calls - before_calls
delta_projection = round(after_projection - before_projection, 4)
# Provider deltas (focus on providers touched in sequence)
provider_deltas = {
key: round(after_provider_costs.get(key, 0.0) - before_provider_costs.get(key, 0.0), 4)
for key in sorted(set(before_provider_costs) | set(after_provider_costs))
if key in {"exa", "huggingface", "wavespeed", "video", "mistral"}
}
expected_positive_cost = delta_cost > 0
expected_positive_calls = delta_calls >= 3 # analysis + research + video
expected_projection_change = delta_projection > 0
expected_provider_delta = any(v > 0 for v in provider_deltas.values())
acceptance_passed = all(
[
preflight.get("success") is True,
expected_positive_cost,
expected_positive_calls,
expected_projection_change,
expected_provider_delta,
]
)
report = {
"preflight": {
"success": preflight.get("success"),
"can_proceed": preflight.get("data", {}).get("can_proceed"),
"estimated_cost": preflight.get("data", {}).get("estimated_cost"),
},
"operations": {
"analysis_title_suggestions": analysis.get("title_suggestions", []),
"research_provider": research.get("provider"),
"research_cost": (research.get("cost") or {}).get("total"),
"video_task_status": task_status.get("status"),
},
"dashboard_deltas": {
"total_calls_before": before_calls,
"total_calls_after": after_calls,
"delta_calls": delta_calls,
"total_cost_before": before_cost,
"total_cost_after": after_cost,
"delta_cost": delta_cost,
"projected_monthly_cost_before": before_projection,
"projected_monthly_cost_after": after_projection,
"delta_projected_monthly_cost": delta_projection,
},
"provider_cost_deltas": provider_deltas,
"acceptance": {
"passed": acceptance_passed,
"criteria": {
"preflight_success": preflight.get("success") is True,
"usage_cost_incremented": expected_positive_cost,
"usage_call_incremented": expected_positive_calls,
"projection_incremented": expected_projection_change,
"provider_delta_present": expected_provider_delta,
},
},
}
out_dir = Path("artifacts")
out_dir.mkdir(exist_ok=True)
out_file = out_dir / "podcast_billing_sequence_report.json"
out_file.write_text(json.dumps(report, indent=2), encoding="utf-8")
print(json.dumps(report, indent=2))
print(f"\nSaved report: {out_file}")
if not acceptance_passed:
raise SystemExit(1)
if __name__ == "__main__":
main()

View File

@@ -410,8 +410,7 @@ class ContentGenerator:
raise Exception("Gemini Grounded Provider not available - cannot generate content without AI provider")
# Build the prompt for grounded generation using persona if available (DB vs session override)
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(getattr(request, "user_id", 0) or 0)
persona_data = self._get_cached_persona_data(user_id, 'linkedin')
if getattr(request, 'persona_override', None):
try:
@@ -485,8 +484,7 @@ class ContentGenerator:
raise Exception("Gemini Grounded Provider not available - cannot generate content without AI provider")
# Build the prompt for grounded generation using persona if available (DB vs session override)
# Beta testing: Force user_id=1 for all requests
user_id = 1
user_id = int(getattr(request, "user_id", 0) or 0)
persona_data = self._get_cached_persona_data(user_id, 'linkedin')
if getattr(request, 'persona_override', None):
try:

View File

@@ -23,6 +23,11 @@ class MonitoringDataService:
def __init__(self, db_session: Session):
self.db = db_session
def _resolve_strategy_user_id(self, strategy_id: int) -> str:
strategy = self.db.query(EnhancedContentStrategy).filter(EnhancedContentStrategy.id == strategy_id).first()
return str(getattr(strategy, "user_id", "0") or "0")
async def save_monitoring_data(self, strategy_id: int, monitoring_plan: Dict[str, Any]) -> bool:
"""Save monitoring plan and tasks to database."""
try:
@@ -65,19 +70,22 @@ class MonitoringDataService:
self.db.add(task)
strategy_user_id = self._resolve_strategy_user_id(strategy_id)
# Save activation status
activation_status = StrategyActivationStatus(
strategy_id=strategy_id,
user_id=1, # Default user ID
user_id=strategy_user_id,
activation_date=datetime.utcnow(),
status='active'
)
self.db.add(activation_status)
# Save initial performance metrics
strategy_user_id = self._resolve_strategy_user_id(strategy_id)
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
user_id=strategy_user_id,
metric_date=datetime.utcnow(),
data_source='monitoring_plan',
confidence_score=85 # High confidence for monitoring plan data
@@ -341,10 +349,11 @@ class MonitoringDataService:
"""Update performance metrics for a strategy."""
try:
logger.info(f"Updating performance metrics for strategy {strategy_id}")
strategy_user_id = self._resolve_strategy_user_id(strategy_id)
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
user_id=strategy_user_id,
metric_date=datetime.utcnow(),
traffic_growth_percentage=metrics.get('traffic_growth'),
engagement_rate_percentage=metrics.get('engagement_rate'),

View File

@@ -1,3 +1,4 @@
import os
from typing import Dict, Any, List, Optional
from sqlalchemy.orm import Session
from loguru import logger
@@ -21,7 +22,7 @@ class StrategyCopilotService:
"""Generate data for a specific category."""
try:
# Get user onboarding data
user_id = 1 # TODO: Get from auth context
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
integrated_data = await self.onboarding_integration_service.process_onboarding_data(str(user_id), self.db)
onboarding_data = integrated_data.get('canonical_profile', {})
@@ -81,7 +82,7 @@ class StrategyCopilotService:
"""Analyze complete strategy for completeness and coherence."""
try:
# Get user data for context
user_id = 1 # TODO: Get from auth context
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
integrated_data = await self.onboarding_integration_service.process_onboarding_data(str(user_id), self.db)
onboarding_data = integrated_data.get('canonical_profile', {})
@@ -118,7 +119,7 @@ class StrategyCopilotService:
field_definition = self._get_field_definition(field_id)
# Get user data
user_id = 1 # TODO: Get from auth context
user_id = int(os.getenv("ALWRITY_FALLBACK_USER_ID", "0"))
# Use SSOT
integrated_data = await self.onboarding_integration_service.process_onboarding_data(str(user_id), self.db)
onboarding_data = integrated_data.get('canonical_profile', {})

View File

@@ -120,6 +120,15 @@ class SIFReleaseReadinessTests(unittest.IsolatedAsyncioTestCase):
self.assertFalse(validation["is_contextual"])
self.assertEqual(validation["tasks_below_min_evidence"], 1)
def test_demo_release_flag_guards_sensitive_routers(self):
source = Path("backend/alwrity_utils/router_manager.py").read_text()
self.assertIn("ALWRITY_DEMO_RELEASE", source)
self.assertIn("Skipping facebook_writer router in demo-release mode", source)
self.assertIn("Skipping linkedin router in demo-release mode", source)
self.assertIn("Skipping linkedin_image router in demo-release mode", source)
self.assertIn("Skipping persona router in demo-release mode", source)
def test_pillar_coverage_guardrail_backfills_missing(self):
tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}]
grounding = {"workflow_config": {"enforce_pillar_coverage": True}}