Compare commits

..

1 Commits

Author SHA1 Message Date
ي
6a182aecaf Support multi-source content asset filtering end-to-end 2026-05-18 14:36:16 +05:30
8 changed files with 163 additions and 169 deletions

View File

@@ -5,7 +5,7 @@ API endpoints for managing unified content assets across all modules.
from fastapi import APIRouter, Depends, HTTPException, Query, Body
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from typing import List, Optional, Dict, Any, Set
from pydantic import BaseModel, Field
from datetime import datetime
@@ -47,6 +47,33 @@ class AssetResponse(BaseModel):
from_attributes = True
def _parse_source_modules(source_module: Optional[List[str]]) -> Optional[List[AssetSource]]:
"""Parse source_module query values from repeated params and/or comma-separated values."""
if not source_module:
return None
parsed_values: List[AssetSource] = []
seen: Set[AssetSource] = set()
for raw_value in source_module:
for value in raw_value.split(","):
normalized = value.strip().lower()
if not normalized:
continue
try:
module = AssetSource(normalized)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source module: {value.strip()}")
if module not in seen:
seen.add(module)
parsed_values.append(module)
return parsed_values or None
class AssetListResponse(BaseModel):
"""Response model for asset list."""
assets: List[AssetResponse]
@@ -58,7 +85,7 @@ class AssetListResponse(BaseModel):
@router.get("/", response_model=AssetListResponse)
async def get_assets(
asset_type: Optional[str] = Query(None, description="Filter by asset type"),
source_module: Optional[str] = Query(None, description="Filter by source module"),
source_module: Optional[List[str]] = Query(None, description="Filter by source module(s); supports repeated params and comma-separated values"),
search: Optional[str] = Query(None, description="Search query"),
tags: Optional[str] = Query(None, description="Comma-separated tags"),
favorites_only: bool = Query(False, description="Only favorites"),
@@ -89,12 +116,7 @@ async def get_assets(
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid asset type: {asset_type}")
source_module_enum = None
if source_module:
try:
source_module_enum = AssetSource(source_module.lower())
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid source module: {source_module}")
source_modules_enum = _parse_source_modules(source_module)
tags_list = None
if tags:
@@ -126,7 +148,7 @@ async def get_assets(
assets, total = service.get_user_assets(
user_id=user_id,
asset_type=asset_type_enum,
source_module=source_module_enum,
source_modules=source_modules_enum,
search_query=search,
tags=tags_list,
favorites_only=favorites_only,
@@ -200,7 +222,7 @@ async def create_asset(
asset = service.create_asset(
user_id=user_id,
asset_type=asset_type_enum,
source_module=source_module_enum,
source_modules=source_modules_enum,
filename=asset_data.filename,
file_url=asset_data.file_url,
file_path=asset_data.file_path,

View File

@@ -107,6 +107,7 @@ class ContentAssetService:
user_id: str,
asset_type: Optional[AssetType] = None,
source_module: Optional[AssetSource] = None,
source_modules: Optional[List[AssetSource]] = None,
search_query: Optional[str] = None,
tags: Optional[List[str]] = None,
favorites_only: bool = False,
@@ -125,6 +126,7 @@ class ContentAssetService:
user_id: Clerk user ID
asset_type: Filter by asset type (optional)
source_module: Filter by source module (optional)
source_modules: Filter by multiple source modules (optional)
search_query: Search in title, description, prompt (optional)
tags: Filter by tags (optional)
favorites_only: Only return favorites (optional)
@@ -142,7 +144,9 @@ class ContentAssetService:
if asset_type:
query = query.filter(ContentAsset.asset_type == asset_type)
if source_module:
if source_modules:
query = query.filter(ContentAsset.source_module.in_(source_modules))
elif source_module:
query = query.filter(ContentAsset.source_module == source_module)
if favorites_only:

View File

@@ -99,17 +99,6 @@ class OptimizationRecommendation:
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
self.expires_at = datetime.fromtimestamp(expires).isoformat()
@dataclass
class TierPolicyConfig:
"""Structured policy for anomaly tiers and remediation controls"""
tier: int
trigger_metrics: List[str]
thresholds: Dict[str, float]
max_iterations: int
lock_criteria: Dict[str, Any]
class AgentPerformanceMonitor:
"""Main performance monitoring system for agents"""
@@ -119,32 +108,6 @@ class AgentPerformanceMonitor:
self.agent_snapshots: Dict[str, AgentPerformanceSnapshot] = {}
self.recommendations: List[OptimizationRecommendation] = []
self.performance_history: deque = deque(maxlen=1000) # Keep last 1000 data points
self.systemic_alerts: List[Dict[str, Any]] = []
# Structured tier policy config
self.tier_policy_config: Dict[int, TierPolicyConfig] = {
1: TierPolicyConfig(
tier=1,
trigger_metrics=["success_rate", "efficiency_score", "response_time"],
thresholds={"success_rate": 0.80, "efficiency_score": 0.65, "response_time": 45.0},
max_iterations=3,
lock_criteria={"min_confidence": 0.85, "consecutive_failures": 6}
),
2: TierPolicyConfig(
tier=2,
trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"],
thresholds={"success_rate": 0.70, "efficiency_score": 0.50, "response_time": 60.0, "market_impact": 0.35},
max_iterations=2,
lock_criteria={"min_confidence": 0.75, "consecutive_failures": 4}
),
3: TierPolicyConfig(
tier=3,
trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"],
thresholds={"success_rate": 0.55, "efficiency_score": 0.35, "response_time": 90.0, "market_impact": 0.25},
max_iterations=1,
lock_criteria={"min_confidence": 0.65, "consecutive_failures": 3}
)
}
# Performance thresholds and targets
self.performance_targets = {
@@ -550,54 +513,6 @@ class AgentPerformanceMonitor:
}
return priority_weights.get(priority, 0)
def _build_recommended_action_payload(self, agent_id: str, snapshot: AgentPerformanceSnapshot) -> Dict[str, Any]:
"""Build recommended action payload including tier and confidence."""
tier = 1
if (snapshot.success_rate <= self.tier_policy_config[3].thresholds["success_rate"] or
snapshot.efficiency_score <= self.tier_policy_config[3].thresholds["efficiency_score"] or
snapshot.average_response_time >= self.tier_policy_config[3].thresholds["response_time"] or
snapshot.market_impact_score <= self.tier_policy_config[3].thresholds["market_impact"]):
tier = 3
elif (snapshot.success_rate <= self.tier_policy_config[2].thresholds["success_rate"] or
snapshot.efficiency_score <= self.tier_policy_config[2].thresholds["efficiency_score"] or
snapshot.average_response_time >= self.tier_policy_config[2].thresholds["response_time"] or
snapshot.market_impact_score <= self.tier_policy_config[2].thresholds["market_impact"]):
tier = 2
confidence = round(max(0.0, min(1.0, 1.0 - abs(0.75 - self._calculate_health_score(snapshot)))) , 2)
policy = self.tier_policy_config[tier]
return {
"agent_id": agent_id,
"tier": tier,
"confidence": confidence,
"max_iterations": policy.max_iterations,
"lock_criteria": policy.lock_criteria,
"trigger_metrics": policy.trigger_metrics
}
def _route_tier3_systemic_alert(self, action_payload: Dict[str, Any], alerts: List[Dict[str, Any]]) -> None:
"""Route Tier 3 systemic anomalies to alerting subsystem with diagnostic brief."""
diagnostic_brief = {
"type": "systemic_anomaly",
"severity": "critical",
"tier": 3,
"confidence": action_payload.get("confidence", 0.0),
"agent_id": action_payload.get("agent_id"),
"timestamp": datetime.utcnow().isoformat(),
"diagnostic_brief": {
"trigger_metrics": action_payload.get("trigger_metrics", []),
"alerts": alerts,
"max_iterations": action_payload.get("max_iterations"),
"lock_criteria": action_payload.get("lock_criteria", {})
}
}
self.systemic_alerts.append(diagnostic_brief)
if len(self.systemic_alerts) > 200:
self.systemic_alerts = self.systemic_alerts[-200:]
logger.critical(f"[ALERTING_SUBSYSTEM] Tier 3 systemic anomaly routed: {json.dumps(diagnostic_brief)}")
async def get_performance_alerts(self, agent_id: str) -> List[Dict[str, Any]]:
"""Get performance alerts for an agent"""
alerts = []
@@ -659,13 +574,6 @@ class AgentPerformanceMonitor:
"timestamp": datetime.utcnow().isoformat()
})
action_payload = self._build_recommended_action_payload(agent_id, snapshot)
if action_payload["tier"] == 3:
self._route_tier3_systemic_alert(action_payload, alerts)
for alert in alerts:
alert["recommended_action"] = action_payload
return alerts
except Exception as e:

View File

@@ -84,17 +84,6 @@ class SafetyValidation:
if self.validation_timestamp is None:
self.validation_timestamp = datetime.utcnow().isoformat()
@dataclass
class SafetyArbitrationDecision:
"""Explicit allow/deny/lock decision with reasons."""
decision: str
reasons: List[str]
tier: int
confidence: float
lock_state_active: bool
class SafetyConstraintManager:
"""Manages safety constraints for agent actions"""
@@ -103,8 +92,6 @@ class SafetyConstraintManager:
self.constraints: Dict[str, SafetyConstraint] = {}
self.action_history: List[Dict[str, Any]] = []
self.violation_history: List[Dict[str, Any]] = []
self.lock_state_active: bool = False
self.lock_state_reason: Optional[str] = None
# Initialize default constraints
self._initialize_default_constraints()
@@ -176,17 +163,6 @@ class SafetyConstraintManager:
"""Validate an action against safety constraints"""
try:
logger.info(f"Validating action for user {self.user_id}: {action_data.get('action_type', 'unknown')}")
if self.lock_state_active and action_data.get("autonomous_modification", True):
reason = self.lock_state_reason or "Safety lock is active due to Tier 3 systemic anomaly"
return SafetyValidation(
is_valid=False,
risk_level=RiskLevel.CRITICAL,
violations=["Autonomous modifications blocked while lock state is active"],
recommendations=[reason],
requires_approval=True,
confidence_score=1.0
)
violations = []
recommendations = []
@@ -231,29 +207,19 @@ class SafetyConstraintManager:
# Final validation
is_valid = len(violations) == 0 and not requires_approval
confidence_score = max(0.0, min(1.0, confidence_score))
arbitration = self._arbitrate_decision(action_data, risk_level, violations, requires_approval, confidence_score)
if arbitration.decision == "lock":
self.lock_state_active = True
self.lock_state_reason = "; ".join(arbitration.reasons)
is_valid = False
requires_approval = True
recommendations.extend([f"Arbitration decision: {arbitration.decision}", *arbitration.reasons])
logger.info(f"Action validation completed for user {self.user_id}. Decision: {arbitration.decision}, Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}")
logger.info(f"Action validation completed for user {self.user_id}. Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}")
# Record in history
await self._record_validation_history(action_data, is_valid, violations)
return SafetyValidation(
is_valid=is_valid,
risk_level=risk_level,
violations=violations,
recommendations=recommendations,
requires_approval=requires_approval,
confidence_score=confidence_score
confidence_score=max(0.0, min(1.0, confidence_score))
)
except Exception as e:
@@ -269,30 +235,6 @@ class SafetyConstraintManager:
confidence_score=0.0
)
def _arbitrate_decision(self, action_data: Dict[str, Any], risk_level: RiskLevel, violations: List[str], requires_approval: bool, confidence_score: float) -> SafetyArbitrationDecision:
"""Arbitrate allow/deny/lock with explicit reasons."""
reasons: List[str] = []
tier = int(action_data.get("recommended_tier", 1))
if self.lock_state_active:
reasons.append("Existing lock state is active")
return SafetyArbitrationDecision("lock", reasons, tier, confidence_score, True)
if tier >= 3 or risk_level == RiskLevel.CRITICAL:
reasons.append("Tier 3 systemic anomaly or critical risk detected")
if violations:
reasons.extend(violations)
return SafetyArbitrationDecision("lock", reasons, 3, confidence_score, True)
if violations or requires_approval:
reasons.append("Safety policy violation or approval requirement triggered")
reasons.extend(violations)
return SafetyArbitrationDecision("deny", reasons, tier, confidence_score, False)
reasons.append("No policy violations detected")
return SafetyArbitrationDecision("allow", reasons, tier, confidence_score, False)
def _determine_action_category(self, action_type: str) -> ActionCategory:
"""Determine the category of an action"""
action_type_lower = action_type.lower()

View File

@@ -0,0 +1,31 @@
import importlib.util
from pathlib import Path
from fastapi import HTTPException
ROOT = Path(__file__).resolve().parents[3]
ROUTER_PATH = ROOT / 'backend' / 'api' / 'content_assets' / 'router.py'
MODELS_PATH = ROOT / 'backend' / 'models' / 'content_asset_models.py'
models_spec = importlib.util.spec_from_file_location('content_asset_models', MODELS_PATH)
models = importlib.util.module_from_spec(models_spec)
models_spec.loader.exec_module(models)
AssetSource = models.AssetSource
router_spec = importlib.util.spec_from_file_location('content_assets_router', ROUTER_PATH)
router = importlib.util.module_from_spec(router_spec)
router_spec.loader.exec_module(router)
def test_parse_source_modules_supports_repeated_and_csv_values():
parsed = router._parse_source_modules(["blog_writer", "youtube,podcast"])
assert parsed == [AssetSource.BLOG_WRITER, AssetSource.YOUTUBE, AssetSource.PODCAST]
def test_parse_source_modules_raises_for_invalid_values():
try:
router._parse_source_modules(["blog_writer,unknown"])
except HTTPException as exc:
assert exc.status_code == 400
assert "Invalid source module" in exc.detail
else:
raise AssertionError("Expected HTTPException for invalid source module")

View File

@@ -0,0 +1,50 @@
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[3]
SERVICE_PATH = ROOT / 'backend' / 'services' / 'content_asset_service.py'
MODELS_PATH = ROOT / 'backend' / 'models' / 'content_asset_models.py'
models_spec = importlib.util.spec_from_file_location('content_asset_models', MODELS_PATH)
models = importlib.util.module_from_spec(models_spec)
models_spec.loader.exec_module(models)
AssetSource = models.AssetSource
service_spec = importlib.util.spec_from_file_location('content_asset_service', SERVICE_PATH)
service_module = importlib.util.module_from_spec(service_spec)
service_spec.loader.exec_module(service_module)
ContentAssetService = service_module.ContentAssetService
class DummyQuery:
def __init__(self):
self.filters = []
def filter(self, expr):
self.filters.append(expr)
return self
def count(self): return 0
def order_by(self, *_args, **_kwargs): return self
def limit(self, *_args, **_kwargs): return self
def offset(self, *_args, **_kwargs): return self
def all(self): return []
class DummyDB:
def __init__(self): self.query_obj = DummyQuery()
def query(self, *_args, **_kwargs): return self.query_obj
def test_get_user_assets_accepts_multiple_source_modules_filter():
db = DummyDB()
service = ContentAssetService(db)
assets, total = service.get_user_assets(
user_id="user-1",
source_modules=[AssetSource.BLOG_WRITER, AssetSource.YOUTUBE],
)
assert assets == []
assert total == 0
assert len(db.query_obj.filters) >= 2

View File

@@ -0,0 +1,35 @@
import { renderHook, waitFor } from '@testing-library/react';
import { useContentAssets } from '../useContentAssets';
const getTokenMock = jest.fn();
jest.mock('@clerk/clerk-react', () => ({
useAuth: () => ({ getToken: getTokenMock }),
}));
describe('useContentAssets', () => {
beforeEach(() => {
getTokenMock.mockResolvedValue('test-token');
global.fetch = jest.fn().mockResolvedValue({
ok: true,
json: async () => ({ assets: [], total: 0, limit: 100, offset: 0 }),
} as Response);
});
afterEach(() => {
jest.clearAllMocks();
});
it('sends all source_module values as repeated query params', async () => {
renderHook(() =>
useContentAssets({ source_module: ['blog_writer', 'youtube'], limit: 50, offset: 0 })
);
await waitFor(() => expect(global.fetch).toHaveBeenCalled());
const calledUrl = (global.fetch as jest.Mock).mock.calls[0][0] as string;
const params = new URL(calledUrl).searchParams;
expect(params.getAll('source_module')).toEqual(['blog_writer', 'youtube']);
});
});

View File

@@ -29,7 +29,7 @@ export interface ContentAsset {
export interface AssetFilters {
asset_type?: 'text' | 'image' | 'video' | 'audio';
source_module?: string | string[]; // Support single or multiple source modules
source_module?: string | string[]; // Supports single or multiple source modules
search?: string;
tags?: string[];
favorites_only?: boolean;
@@ -146,8 +146,10 @@ export const useContentAssets = (filters: AssetFilters = {}) => {
if (currentFilters.source_module) {
// Handle both string and array cases
if (Array.isArray(currentFilters.source_module)) {
// For arrays, use the first value (backend doesn't support multiple yet)
params.append('source_module', currentFilters.source_module[0]);
// Send every selected source module as repeated query params
currentFilters.source_module.forEach((module) => {
params.append('source_module', module);
});
} else {
params.append('source_module', currentFilters.source_module);
}