Compare commits

..

1 Commits

Author SHA1 Message Date
ي
6a182aecaf Support multi-source content asset filtering end-to-end 2026-05-18 14:36:16 +05:30
7 changed files with 158 additions and 285 deletions

View File

@@ -5,7 +5,7 @@ API endpoints for managing unified content assets across all modules.
from fastapi import APIRouter, Depends, HTTPException, Query, Body
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Set
from pydantic import BaseModel, Field
from datetime import datetime
@@ -47,6 +47,33 @@ class AssetResponse(BaseModel):
from_attributes = True
def _parse_source_modules(source_module: Optional[List[str]]) -> Optional[List[AssetSource]]:
"""Parse source_module query values from repeated params and/or comma-separated values."""
if not source_module:
return None
parsed_values: List[AssetSource] = []
seen: Set[AssetSource] = set()
for raw_value in source_module:
for value in raw_value.split(","):
normalized = value.strip().lower()
if not normalized:
continue
try:
module = AssetSource(normalized)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source module: {value.strip()}")
if module not in seen:
seen.add(module)
parsed_values.append(module)
return parsed_values or None
class AssetListResponse(BaseModel):
"""Response model for asset list."""
assets: List[AssetResponse]
@@ -58,7 +85,7 @@ class AssetListResponse(BaseModel):
@router.get("/", response_model=AssetListResponse)
async def get_assets(
asset_type: Optional[str] = Query(None, description="Filter by asset type"),
source_module: Optional[str] = Query(None, description="Filter by source module"),
source_module: Optional[List[str]] = Query(None, description="Filter by source module(s); supports repeated params and comma-separated values"),
search: Optional[str] = Query(None, description="Search query"),
tags: Optional[str] = Query(None, description="Comma-separated tags"),
favorites_only: bool = Query(False, description="Only favorites"),
@@ -89,12 +116,7 @@ async def get_assets(
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid asset type: {asset_type}")
source_module_enum = None
if source_module:
try:
source_module_enum = AssetSource(source_module.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source module: {source_module}")
source_modules_enum = _parse_source_modules(source_module)
tags_list = None
if tags:
@@ -126,7 +148,7 @@ async def get_assets(
assets, total = service.get_user_assets(
user_id=user_id,
asset_type=asset_type_enum,
source_module=source_module_enum,
source_modules=source_modules_enum,
search_query=search,
tags=tags_list,
favorites_only=favorites_only,
@@ -200,7 +222,7 @@ async def create_asset(
asset = service.create_asset(
user_id=user_id,
asset_type=asset_type_enum,
source_module=source_module_enum,
source_modules=source_modules_enum,
filename=asset_data.filename,
file_url=asset_data.file_url,
file_path=asset_data.file_path,

View File

@@ -107,6 +107,7 @@ class ContentAssetService:
user_id: str,
asset_type: Optional[AssetType] = None,
source_module: Optional[AssetSource] = None,
source_modules: Optional[List[AssetSource]] = None,
search_query: Optional[str] = None,
tags: Optional[List[str]] = None,
favorites_only: bool = False,
@@ -125,6 +126,7 @@ class ContentAssetService:
user_id: Clerk user ID
asset_type: Filter by asset type (optional)
source_module: Filter by source module (optional)
source_modules: Filter by multiple source modules (optional)
search_query: Search in title, description, prompt (optional)
tags: Filter by tags (optional)
favorites_only: Only return favorites (optional)
@@ -142,7 +144,9 @@ class ContentAssetService:
if asset_type:
query = query.filter(ContentAsset.asset_type == asset_type)
if source_module:
if source_modules:
query = query.filter(ContentAsset.source_module.in_(source_modules))
elif source_module:
query = query.filter(ContentAsset.source_module == source_module)
if favorites_only:

View File

@@ -1,271 +0,0 @@
"""Self-healing executor for social post engagement recovery.
Implements:
- Per-post evaluation windows and cooldown timers
- Stagnation trigger evaluation with tiered action selection
- Action idempotency keys for edit/comment/thread operations
- Duplicate and over-frequency suppression within cooldown boundaries
- Outcome persistence and safe retry policy for transient failures
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta, timezone
from enum import Enum
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class ActionType(str, Enum):
EDIT = "edit"
COMMENT = "comment"
THREAD = "thread"
class ActionTier(str, Enum):
TIER_1 = "tier_1" # low-intensity nudge (comment)
TIER_2 = "tier_2" # medium-intensity enhancement (edit)
TIER_3 = "tier_3" # high-intensity amplification (thread)
SAFE_TRANSIENT_ERROR_CODES = {
"timeout",
"rate_limit",
"service_unavailable",
"network_error",
}
@dataclass
class EvaluationConfig:
per_post_window_minutes: int = 90
min_samples_required: int = 3
cooldown_by_action_seconds: Dict[ActionType, int] = field(
default_factory=lambda: {
ActionType.COMMENT: 30 * 60,
ActionType.EDIT: 2 * 60 * 60,
ActionType.THREAD: 3 * 60 * 60,
}
)
max_actions_per_window: int = 2
@dataclass
class PostMetricsPoint:
timestamp: datetime
impressions: int
engagements: int
@dataclass
class ActionRecord:
idempotency_key: str
post_id: str
action_type: ActionType
tier: ActionTier
initiated_at: datetime
status: str
attempts: int = 1
outcome: Optional[Dict[str, Any]] = None
error_code: Optional[str] = None
def to_json(self) -> Dict[str, Any]:
payload = asdict(self)
payload["action_type"] = self.action_type.value
payload["tier"] = self.tier.value
payload["initiated_at"] = self.initiated_at.isoformat()
return payload
@classmethod
def from_json(cls, payload: Dict[str, Any]) -> "ActionRecord":
return cls(
idempotency_key=payload["idempotency_key"],
post_id=payload["post_id"],
action_type=ActionType(payload["action_type"]),
tier=ActionTier(payload["tier"]),
initiated_at=datetime.fromisoformat(payload["initiated_at"]),
status=payload["status"],
attempts=payload.get("attempts", 1),
outcome=payload.get("outcome"),
error_code=payload.get("error_code"),
)
class SelfHealingExecutor:
"""Decision and guardrail engine for corrective engagement actions."""
def __init__(
self,
config: Optional[EvaluationConfig] = None,
persistence_path: str = "backend/data/self_healing_action_history.json",
) -> None:
self.config = config or EvaluationConfig()
self.persistence_path = Path(persistence_path)
self._history: List[ActionRecord] = self._load_history()
def evaluate_and_plan(
self,
post_id: str,
metrics: List[PostMetricsPoint],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Evaluate stagnation for a post and plan a single best next action."""
now = now or datetime.now(timezone.utc)
window_metrics = self._filter_window(metrics, now)
if len(window_metrics) < self.config.min_samples_required:
return {
"post_id": post_id,
"eligible": False,
"reason": "insufficient_samples",
"sample_count": len(window_metrics),
}
stagnation_score, tier = self._evaluate_stagnation(window_metrics)
action_type = self._choose_action_type(tier)
idempotency_key = self.generate_idempotency_key(post_id, action_type, tier)
if self._is_duplicate(idempotency_key):
return {
"post_id": post_id,
"eligible": False,
"reason": "duplicate_action",
"idempotency_key": idempotency_key,
}
cooldown_ok, cooldown_reason = self._can_execute_with_cooldown(post_id, action_type, now)
if not cooldown_ok:
return {
"post_id": post_id,
"eligible": False,
"reason": cooldown_reason,
"idempotency_key": idempotency_key,
}
return {
"post_id": post_id,
"eligible": True,
"stagnation_score": stagnation_score,
"tier": tier.value,
"action_type": action_type.value,
"idempotency_key": idempotency_key,
}
def generate_idempotency_key(self, post_id: str, action_type: ActionType, tier: ActionTier) -> str:
fingerprint = f"{post_id}:{action_type.value}:{tier.value}".encode("utf-8")
digest = hashlib.sha256(fingerprint).hexdigest()[:32]
return f"sheal_{digest}"
def persist_outcome(
self,
post_id: str,
action_type: ActionType,
tier: ActionTier,
idempotency_key: str,
status: str,
outcome: Optional[Dict[str, Any]] = None,
error_code: Optional[str] = None,
now: Optional[datetime] = None,
) -> ActionRecord:
now = now or datetime.now(timezone.utc)
existing = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if existing:
existing.status = status
existing.outcome = outcome
existing.error_code = error_code
existing.attempts += 1
existing.initiated_at = now
record = existing
else:
record = ActionRecord(
idempotency_key=idempotency_key,
post_id=post_id,
action_type=action_type,
tier=tier,
initiated_at=now,
status=status,
outcome=outcome,
error_code=error_code,
)
self._history.append(record)
self._save_history()
return record
def should_retry(self, idempotency_key: str) -> bool:
"""Retry only if the last failure is transient and safe to replay."""
rec = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if not rec or rec.status != "failed":
return False
if rec.error_code not in SAFE_TRANSIENT_ERROR_CODES:
return False
return rec.action_type in {ActionType.COMMENT, ActionType.EDIT, ActionType.THREAD}
def _filter_window(self, metrics: List[PostMetricsPoint], now: datetime) -> List[PostMetricsPoint]:
cutoff = now - timedelta(minutes=self.config.per_post_window_minutes)
return [m for m in metrics if m.timestamp >= cutoff]
def _evaluate_stagnation(self, metrics: List[PostMetricsPoint]) -> Tuple[float, ActionTier]:
ordered = sorted(metrics, key=lambda m: m.timestamp)
first, last = ordered[0], ordered[-1]
imp_delta = max(0, last.impressions - first.impressions)
eng_delta = max(0, last.engagements - first.engagements)
eng_rate = eng_delta / imp_delta if imp_delta > 0 else 0.0
stagnation_score = 1.0 - min(1.0, eng_rate * 20)
if stagnation_score >= 0.8:
return stagnation_score, ActionTier.TIER_3
if stagnation_score >= 0.55:
return stagnation_score, ActionTier.TIER_2
return stagnation_score, ActionTier.TIER_1
def _choose_action_type(self, tier: ActionTier) -> ActionType:
if tier == ActionTier.TIER_1:
return ActionType.COMMENT
if tier == ActionTier.TIER_2:
return ActionType.EDIT
return ActionType.THREAD
def _is_duplicate(self, idempotency_key: str) -> bool:
return any(h.idempotency_key == idempotency_key and h.status in {"success", "running"} for h in self._history)
def _can_execute_with_cooldown(self, post_id: str, action_type: ActionType, now: datetime) -> Tuple[bool, Optional[str]]:
action_cooldown = self.config.cooldown_by_action_seconds[action_type]
same_post = [h for h in self._history if h.post_id == post_id]
recent_in_window = [
h for h in same_post
if h.initiated_at >= now - timedelta(minutes=self.config.per_post_window_minutes)
]
if len(recent_in_window) >= self.config.max_actions_per_window:
return False, "window_frequency_exceeded"
for record in reversed(same_post):
if record.action_type != action_type:
continue
if (now - record.initiated_at).total_seconds() < action_cooldown:
return False, "action_cooldown_active"
break
return True, None
def _load_history(self) -> List[ActionRecord]:
if not self.persistence_path.exists():
return []
try:
payload = json.loads(self.persistence_path.read_text(encoding="utf-8"))
return [ActionRecord.from_json(item) for item in payload]
except (json.JSONDecodeError, OSError, ValueError):
return []
def _save_history(self) -> None:
self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
payload = [item.to_json() for item in self._history]
self.persistence_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

View File

@@ -0,0 +1,31 @@
import importlib.util
from pathlib import Path
from fastapi import HTTPException
ROOT = Path(__file__).resolve().parents[3]
ROUTER_PATH = ROOT / 'backend' / 'api' / 'content_assets' / 'router.py'
MODELS_PATH = ROOT / 'backend' / 'models' / 'content_asset_models.py'
models_spec = importlib.util.spec_from_file_location('content_asset_models', MODELS_PATH)
models = importlib.util.module_from_spec(models_spec)
models_spec.loader.exec_module(models)
AssetSource = models.AssetSource
router_spec = importlib.util.spec_from_file_location('content_assets_router', ROUTER_PATH)
router = importlib.util.module_from_spec(router_spec)
router_spec.loader.exec_module(router)
def test_parse_source_modules_supports_repeated_and_csv_values():
parsed = router._parse_source_modules(["blog_writer", "youtube,podcast"])
assert parsed == [AssetSource.BLOG_WRITER, AssetSource.YOUTUBE, AssetSource.PODCAST]
def test_parse_source_modules_raises_for_invalid_values():
try:
router._parse_source_modules(["blog_writer,unknown"])
except HTTPException as exc:
assert exc.status_code == 400
assert "Invalid source module" in exc.detail
else:
raise AssertionError("Expected HTTPException for invalid source module")

View File

@@ -0,0 +1,50 @@
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[3]
SERVICE_PATH = ROOT / 'backend' / 'services' / 'content_asset_service.py'
MODELS_PATH = ROOT / 'backend' / 'models' / 'content_asset_models.py'
models_spec = importlib.util.spec_from_file_location('content_asset_models', MODELS_PATH)
models = importlib.util.module_from_spec(models_spec)
models_spec.loader.exec_module(models)
AssetSource = models.AssetSource
service_spec = importlib.util.spec_from_file_location('content_asset_service', SERVICE_PATH)
service_module = importlib.util.module_from_spec(service_spec)
service_spec.loader.exec_module(service_module)
ContentAssetService = service_module.ContentAssetService
class DummyQuery:
def __init__(self):
self.filters = []
def filter(self, expr):
self.filters.append(expr)
return self
def count(self): return 0
def order_by(self, *_args, **_kwargs): return self
def limit(self, *_args, **_kwargs): return self
def offset(self, *_args, **_kwargs): return self
def all(self): return []
class DummyDB:
def __init__(self): self.query_obj = DummyQuery()
def query(self, *_args, **_kwargs): return self.query_obj
def test_get_user_assets_accepts_multiple_source_modules_filter():
db = DummyDB()
service = ContentAssetService(db)
assets, total = service.get_user_assets(
user_id="user-1",
source_modules=[AssetSource.BLOG_WRITER, AssetSource.YOUTUBE],
)
assert assets == []
assert total == 0
assert len(db.query_obj.filters) >= 2

View File

@@ -0,0 +1,35 @@
import { renderHook, waitFor } from '@testing-library/react';
import { useContentAssets } from '../useContentAssets';
const getTokenMock = jest.fn();
jest.mock('@clerk/clerk-react', () => ({
useAuth: () => ({ getToken: getTokenMock }),
}));
describe('useContentAssets', () => {
beforeEach(() => {
getTokenMock.mockResolvedValue('test-token');
global.fetch = jest.fn().mockResolvedValue({
ok: true,
json: async () => ({ assets: [], total: 0, limit: 100, offset: 0 }),
} as Response);
});
afterEach(() => {
jest.clearAllMocks();
});
it('sends all source_module values as repeated query params', async () => {
renderHook(() =>
useContentAssets({ source_module: ['blog_writer', 'youtube'], limit: 50, offset: 0 })
);
await waitFor(() => expect(global.fetch).toHaveBeenCalled());
const calledUrl = (global.fetch as jest.Mock).mock.calls[0][0] as string;
const params = new URL(calledUrl).searchParams;
expect(params.getAll('source_module')).toEqual(['blog_writer', 'youtube']);
});
});

View File

@@ -29,7 +29,7 @@ export interface ContentAsset {
export interface AssetFilters {
asset_type?: 'text' | 'image' | 'video' | 'audio';
source_module?: string | string[]; // Support single or multiple source modules
source_module?: string | string[]; // Supports single or multiple source modules
search?: string;
tags?: string[];
favorites_only?: boolean;
@@ -146,8 +146,10 @@ export const useContentAssets = (filters: AssetFilters = {}) => {
if (currentFilters.source_module) {
// Handle both string and array cases
if (Array.isArray(currentFilters.source_module)) {
// For arrays, use the first value (backend doesn't support multiple yet)
params.append('source_module', currentFilters.source_module[0]);
// Send every selected source module as repeated query params
currentFilters.source_module.forEach((module) => {
params.append('source_module', module);
});
} else {
params.append('source_module', currentFilters.source_module);
}