Compare commits

..

1 Commits

Author SHA1 Message Date
ي
761740de12 Add versioned podcast asset metadata schema and backfill script 2026-05-18 14:36:36 +05:30
7 changed files with 191 additions and 313 deletions

View File

@@ -26,6 +26,7 @@ from services.database import get_db
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
from api.story_writer.utils.auth import require_authenticated_user
from utils.asset_tracker import save_asset_to_library
from models.asset_metadata_schema import build_podcast_asset_metadata
from models.story_models import StoryAudioResult
from loguru import logger
from ..constants import get_podcast_audio_service, get_podcast_media_dir
@@ -217,11 +218,11 @@ async def upload_podcast_audio(
title=f"Uploaded Audio - {project_id}",
description="Uploaded podcast audio/voice sample",
tags=["podcast", "audio", "upload", project_id],
asset_metadata={
"project_id": project_id,
"type": "uploaded_audio",
"status": "completed",
},
asset_metadata=build_podcast_asset_metadata(
asset_role="uploaded_audio",
project_id=project_id,
origin="podcast.audio.upload",
),
)
except Exception as e:
logger.warning(f"[Podcast] Failed to save audio asset: {e}")
@@ -455,11 +456,12 @@ async def generate_podcast_audio(
provider=result.get("provider"),
model=result.get("model"),
cost=result.get("cost"),
asset_metadata={
"scene_id": request.scene_id,
"scene_title": request.scene_title,
"status": "completed",
},
asset_metadata=build_podcast_asset_metadata(
asset_role="podcast_audio",
project_id=request.project_id,
origin="podcast.audio.generate",
extras={"scene_id": request.scene_id, "scene_title": request.scene_title},
),
)
except Exception as e:
logger.warning(f"[Podcast] Failed to save audio asset: {e}")
@@ -621,13 +623,12 @@ async def combine_podcast_audio(
title=f"Combined Podcast - {request.project_id}",
description=f"Combined podcast audio from {len(request.scene_ids)} scenes",
tags=["podcast", "audio", "combined", request.project_id],
asset_metadata={
"project_id": request.project_id,
"scene_ids": request.scene_ids,
"scene_count": len(request.scene_ids),
"total_duration": total_duration,
"status": "completed",
},
asset_metadata=build_podcast_asset_metadata(
asset_role="combined_podcast_audio",
project_id=request.project_id,
origin="podcast.audio.combine",
extras={"scene_ids": request.scene_ids, "scene_count": len(request.scene_ids), "total_duration": total_duration},
),
)
except Exception as e:
logger.warning(f"[Podcast] Failed to save combined audio asset: {e}")

View File

@@ -18,6 +18,7 @@ from api.story_writer.utils.auth import require_authenticated_user
from services.llm_providers.main_image_generation import generate_image
from services.llm_providers.main_image_editing import edit_image
from utils.asset_tracker import save_asset_to_library
from models.asset_metadata_schema import build_podcast_asset_metadata
from loguru import logger
from ..constants import get_podcast_media_dir, PODCAST_AVATARS_SUBDIR
from ..presenter_personas import choose_persona_id, get_persona
@@ -111,11 +112,11 @@ async def upload_podcast_avatar(
title=f"Podcast Presenter Avatar - {project_id}",
description="Podcast presenter avatar image",
tags=["podcast", "avatar", project_id],
asset_metadata={
"project_id": project_id,
"type": "presenter_avatar",
"status": "completed",
},
asset_metadata=build_podcast_asset_metadata(
asset_role="presenter_avatar",
project_id=project_id,
origin="podcast.avatar.upload",
),
)
except Exception as e:
logger.warning(f"[Podcast] Failed to save avatar asset (non-fatal): {e}")
@@ -223,12 +224,12 @@ async def make_avatar_presentable(
tags=["podcast", "avatar", "presenter", "transformed", project_id],
provider=result.provider,
model=result.model,
asset_metadata={
"project_id": project_id,
"type": "transformed_presenter",
"original_avatar_url": avatar_url,
"status": "completed",
},
asset_metadata=build_podcast_asset_metadata(
asset_role="transformed_presenter",
project_id=project_id,
origin="podcast.avatar.make_presentable",
extras={"original_avatar_url": avatar_url},
),
)
except Exception as e:
logger.warning(f"[Podcast] Failed to save transformed avatar asset: {e}")
@@ -404,14 +405,12 @@ async def generate_podcast_presenters(
tags=["podcast", "avatar", "presenter", project_id],
provider=result.provider,
model=result.model,
asset_metadata={
"project_id": project_id,
"speaker_number": i + 1,
"type": "generated_presenter",
"status": "completed",
"persona_id": selected_persona_id,
"seed": seed,
},
asset_metadata=build_podcast_asset_metadata(
asset_role="generated_presenter",
project_id=project_id,
origin="podcast.avatar.generate",
extras={"speaker_number": i + 1, "persona_id": selected_persona_id, "seed": seed},
),
)
except Exception as e:
logger.warning(f"[Podcast] Failed to save presenter asset: {e}")

View File

@@ -16,6 +16,7 @@ from middleware.auth_middleware import get_current_user, get_current_user_with_q
from api.story_writer.utils.auth import require_authenticated_user
from services.llm_providers.main_image_generation import generate_image, generate_character_image
from utils.asset_tracker import save_asset_to_library
from models.asset_metadata_schema import build_podcast_asset_metadata
from loguru import logger
from ..constants import get_podcast_media_dir
from ..models import PodcastImageRequest, PodcastImageResponse
@@ -417,11 +418,12 @@ async def generate_podcast_scene_image(
tags=["podcast", "scene", request.scene_id],
provider=result.provider,
model=result.model,
asset_metadata={
"scene_id": request.scene_id,
"scene_title": request.scene_title,
"status": "completed",
},
asset_metadata=build_podcast_asset_metadata(
asset_role="podcast_scene_image",
project_id=request.project_id,
origin="podcast.images.generate",
extras={"scene_id": request.scene_id, "scene_title": request.scene_title},
),
)
except Exception as e:
logger.warning(f"[Podcast] Failed to save image asset: {e}")

View File

@@ -0,0 +1,76 @@
"""Shared schema/builders for content asset metadata."""
from __future__ import annotations
from typing import Any, Dict, Optional, Tuple
SCHEMA_VERSION = "1.0"
PODCAST_FEATURE = "podcast_maker"
REQUIRED_KEYS = (
"schema_version",
"feature",
"asset_role",
"project_id",
"status",
"origin",
)
def build_asset_metadata(
*,
feature: str,
asset_role: str,
project_id: Optional[str],
status: str,
origin: str,
extras: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Build normalized, versioned asset metadata payload."""
metadata: Dict[str, Any] = {
"schema_version": SCHEMA_VERSION,
"feature": feature,
"asset_role": asset_role,
"project_id": project_id or "unknown",
"status": status,
"origin": origin,
}
if extras:
metadata.update({k: v for k, v in extras.items() if v is not None})
return metadata
def build_podcast_asset_metadata(
*,
asset_role: str,
project_id: Optional[str],
status: str = "completed",
origin: str,
extras: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Podcast-specific metadata builder."""
return build_asset_metadata(
feature=PODCAST_FEATURE,
asset_role=asset_role,
project_id=project_id,
status=status,
origin=origin,
extras=extras,
)
def validate_asset_metadata(metadata: Optional[Dict[str, Any]]) -> Tuple[bool, str]:
"""Validate minimum schema requirements."""
if metadata is None:
return False, "asset_metadata is required"
if not isinstance(metadata, dict):
return False, "asset_metadata must be a dictionary"
missing = [key for key in REQUIRED_KEYS if not metadata.get(key)]
if missing:
return False, f"asset_metadata missing required keys: {', '.join(missing)}"
if str(metadata.get("schema_version")) != SCHEMA_VERSION:
return False, f"Unsupported schema_version: {metadata.get('schema_version')}"
return True, "ok"

View File

@@ -0,0 +1,63 @@
"""Backfill recent podcast assets with normalized metadata schema."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any, Dict
from sqlalchemy import desc
from services.database import SessionLocal
from models.content_asset_models import ContentAsset, AssetSource
from models.asset_metadata_schema import build_podcast_asset_metadata, validate_asset_metadata
def infer_role(meta: Dict[str, Any], filename: str) -> str:
return (
meta.get("asset_role")
or meta.get("type")
or ("podcast_audio" if filename.lower().endswith((".mp3", ".wav", ".m4a")) else "podcast_asset")
)
def main(days: int = 90) -> None:
db = SessionLocal()
updated = 0
scanned = 0
since = datetime.utcnow() - timedelta(days=days)
try:
assets = (
db.query(ContentAsset)
.filter(ContentAsset.source_module == AssetSource.PODCAST_MAKER)
.filter(ContentAsset.created_at >= since)
.order_by(desc(ContentAsset.created_at))
.all()
)
for asset in assets:
scanned += 1
meta = asset.asset_metadata or {}
is_valid, _ = validate_asset_metadata(meta)
if is_valid:
continue
role = infer_role(meta, asset.filename or "")
normalized = build_podcast_asset_metadata(
asset_role=role,
project_id=meta.get("project_id"),
status=meta.get("status", "completed"),
origin=meta.get("origin", "migration.backfill_podcast_asset_metadata"),
extras=meta,
)
asset.asset_metadata = normalized
db.add(asset)
updated += 1
db.commit()
print(f"Scanned={scanned} Updated={updated} Since={since.isoformat()}")
finally:
db.close()
if __name__ == "__main__":
main()

View File

@@ -1,271 +0,0 @@
"""Self-healing executor for social post engagement recovery.
Implements:
- Per-post evaluation windows and cooldown timers
- Stagnation trigger evaluation with tiered action selection
- Action idempotency keys for edit/comment/thread operations
- Duplicate and over-frequency suppression within cooldown boundaries
- Outcome persistence and safe retry policy for transient failures
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta, timezone
from enum import Enum
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class ActionType(str, Enum):
EDIT = "edit"
COMMENT = "comment"
THREAD = "thread"
class ActionTier(str, Enum):
TIER_1 = "tier_1" # low-intensity nudge (comment)
TIER_2 = "tier_2" # medium-intensity enhancement (edit)
TIER_3 = "tier_3" # high-intensity amplification (thread)
SAFE_TRANSIENT_ERROR_CODES = {
"timeout",
"rate_limit",
"service_unavailable",
"network_error",
}
@dataclass
class EvaluationConfig:
per_post_window_minutes: int = 90
min_samples_required: int = 3
cooldown_by_action_seconds: Dict[ActionType, int] = field(
default_factory=lambda: {
ActionType.COMMENT: 30 * 60,
ActionType.EDIT: 2 * 60 * 60,
ActionType.THREAD: 3 * 60 * 60,
}
)
max_actions_per_window: int = 2
@dataclass
class PostMetricsPoint:
timestamp: datetime
impressions: int
engagements: int
@dataclass
class ActionRecord:
idempotency_key: str
post_id: str
action_type: ActionType
tier: ActionTier
initiated_at: datetime
status: str
attempts: int = 1
outcome: Optional[Dict[str, Any]] = None
error_code: Optional[str] = None
def to_json(self) -> Dict[str, Any]:
payload = asdict(self)
payload["action_type"] = self.action_type.value
payload["tier"] = self.tier.value
payload["initiated_at"] = self.initiated_at.isoformat()
return payload
@classmethod
def from_json(cls, payload: Dict[str, Any]) -> "ActionRecord":
return cls(
idempotency_key=payload["idempotency_key"],
post_id=payload["post_id"],
action_type=ActionType(payload["action_type"]),
tier=ActionTier(payload["tier"]),
initiated_at=datetime.fromisoformat(payload["initiated_at"]),
status=payload["status"],
attempts=payload.get("attempts", 1),
outcome=payload.get("outcome"),
error_code=payload.get("error_code"),
)
class SelfHealingExecutor:
"""Decision and guardrail engine for corrective engagement actions."""
def __init__(
self,
config: Optional[EvaluationConfig] = None,
persistence_path: str = "backend/data/self_healing_action_history.json",
) -> None:
self.config = config or EvaluationConfig()
self.persistence_path = Path(persistence_path)
self._history: List[ActionRecord] = self._load_history()
def evaluate_and_plan(
self,
post_id: str,
metrics: List[PostMetricsPoint],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Evaluate stagnation for a post and plan a single best next action."""
now = now or datetime.now(timezone.utc)
window_metrics = self._filter_window(metrics, now)
if len(window_metrics) < self.config.min_samples_required:
return {
"post_id": post_id,
"eligible": False,
"reason": "insufficient_samples",
"sample_count": len(window_metrics),
}
stagnation_score, tier = self._evaluate_stagnation(window_metrics)
action_type = self._choose_action_type(tier)
idempotency_key = self.generate_idempotency_key(post_id, action_type, tier)
if self._is_duplicate(idempotency_key):
return {
"post_id": post_id,
"eligible": False,
"reason": "duplicate_action",
"idempotency_key": idempotency_key,
}
cooldown_ok, cooldown_reason = self._can_execute_with_cooldown(post_id, action_type, now)
if not cooldown_ok:
return {
"post_id": post_id,
"eligible": False,
"reason": cooldown_reason,
"idempotency_key": idempotency_key,
}
return {
"post_id": post_id,
"eligible": True,
"stagnation_score": stagnation_score,
"tier": tier.value,
"action_type": action_type.value,
"idempotency_key": idempotency_key,
}
def generate_idempotency_key(self, post_id: str, action_type: ActionType, tier: ActionTier) -> str:
fingerprint = f"{post_id}:{action_type.value}:{tier.value}".encode("utf-8")
digest = hashlib.sha256(fingerprint).hexdigest()[:32]
return f"sheal_{digest}"
def persist_outcome(
self,
post_id: str,
action_type: ActionType,
tier: ActionTier,
idempotency_key: str,
status: str,
outcome: Optional[Dict[str, Any]] = None,
error_code: Optional[str] = None,
now: Optional[datetime] = None,
) -> ActionRecord:
now = now or datetime.now(timezone.utc)
existing = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if existing:
existing.status = status
existing.outcome = outcome
existing.error_code = error_code
existing.attempts += 1
existing.initiated_at = now
record = existing
else:
record = ActionRecord(
idempotency_key=idempotency_key,
post_id=post_id,
action_type=action_type,
tier=tier,
initiated_at=now,
status=status,
outcome=outcome,
error_code=error_code,
)
self._history.append(record)
self._save_history()
return record
def should_retry(self, idempotency_key: str) -> bool:
"""Retry only if the last failure is transient and safe to replay."""
rec = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if not rec or rec.status != "failed":
return False
if rec.error_code not in SAFE_TRANSIENT_ERROR_CODES:
return False
return rec.action_type in {ActionType.COMMENT, ActionType.EDIT, ActionType.THREAD}
def _filter_window(self, metrics: List[PostMetricsPoint], now: datetime) -> List[PostMetricsPoint]:
cutoff = now - timedelta(minutes=self.config.per_post_window_minutes)
return [m for m in metrics if m.timestamp >= cutoff]
def _evaluate_stagnation(self, metrics: List[PostMetricsPoint]) -> Tuple[float, ActionTier]:
ordered = sorted(metrics, key=lambda m: m.timestamp)
first, last = ordered[0], ordered[-1]
imp_delta = max(0, last.impressions - first.impressions)
eng_delta = max(0, last.engagements - first.engagements)
eng_rate = eng_delta / imp_delta if imp_delta > 0 else 0.0
stagnation_score = 1.0 - min(1.0, eng_rate * 20)
if stagnation_score >= 0.8:
return stagnation_score, ActionTier.TIER_3
if stagnation_score >= 0.55:
return stagnation_score, ActionTier.TIER_2
return stagnation_score, ActionTier.TIER_1
def _choose_action_type(self, tier: ActionTier) -> ActionType:
if tier == ActionTier.TIER_1:
return ActionType.COMMENT
if tier == ActionTier.TIER_2:
return ActionType.EDIT
return ActionType.THREAD
def _is_duplicate(self, idempotency_key: str) -> bool:
return any(h.idempotency_key == idempotency_key and h.status in {"success", "running"} for h in self._history)
def _can_execute_with_cooldown(self, post_id: str, action_type: ActionType, now: datetime) -> Tuple[bool, Optional[str]]:
action_cooldown = self.config.cooldown_by_action_seconds[action_type]
same_post = [h for h in self._history if h.post_id == post_id]
recent_in_window = [
h for h in same_post
if h.initiated_at >= now - timedelta(minutes=self.config.per_post_window_minutes)
]
if len(recent_in_window) >= self.config.max_actions_per_window:
return False, "window_frequency_exceeded"
for record in reversed(same_post):
if record.action_type != action_type:
continue
if (now - record.initiated_at).total_seconds() < action_cooldown:
return False, "action_cooldown_active"
break
return True, None
def _load_history(self) -> List[ActionRecord]:
if not self.persistence_path.exists():
return []
try:
payload = json.loads(self.persistence_path.read_text(encoding="utf-8"))
return [ActionRecord.from_json(item) for item in payload]
except (json.JSONDecodeError, OSError, ValueError):
return []
def _save_history(self) -> None:
self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
payload = [item.to_json() for item in self._history]
self.persistence_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

View File

@@ -11,6 +11,8 @@ import logging
import re
from urllib.parse import urlparse
from models.asset_metadata_schema import validate_asset_metadata
logger = logging.getLogger(__name__)
# Maximum file size (100MB)
@@ -140,6 +142,12 @@ def save_asset_to_library(
if len(title) > 200:
title = title[:197] + '...'
metadata_payload = asset_metadata or {}
is_valid_metadata, validation_message = validate_asset_metadata(metadata_payload)
if not is_valid_metadata:
logger.error(f"Invalid asset metadata: {validation_message}")
return None
service = ContentAssetService(db)
asset = service.create_asset(
user_id=user_id,
@@ -154,7 +162,7 @@ def save_asset_to_library(
description=description,
prompt=prompt,
tags=tags or [],
asset_metadata=asset_metadata or {},
asset_metadata=metadata_payload,
provider=provider,
model=model,
cost=cost,