Update Render build configuration: fix deps, force py3.11, add build script

This commit is contained in:
ajaysi
2026-03-04 09:17:35 +05:30
parent 460e1f398d
commit 45fb9636e2
16 changed files with 1387 additions and 2629 deletions

View File

@@ -5,6 +5,7 @@ Provides REST API access to agent orchestration functionality
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from starlette.concurrency import run_in_threadpool
from typing import Dict, List, Any, Optional
import asyncio
import os
@@ -19,7 +20,7 @@ from services.intelligence.agents.agent_orchestrator import (
from services.intelligence.agents.core_agent_framework import AgentAction
from services.intelligence.agents.market_signal_detector import MarketSignal
from services.intelligence.agents.performance_monitor import PerformanceMetric, AgentStatus
from services.database import get_db
from services.database import get_db, get_session_for_user
from services.agent_activity_service import AgentActivityService
from services.agent_activity_serializers import (
DETAIL_TIER_DEBUG,
@@ -76,6 +77,7 @@ def _build_huddle_snapshot(
since_alert_id: int = 0,
since_approval_id: int = 0,
limit: int = 50,
detail_tier: str = DETAIL_TIER_SUMMARY,
) -> Dict[str, Any]:
runs_query = db.query(AgentRun).filter(AgentRun.user_id == user_id)
events_query = db.query(AgentEvent).filter(AgentEvent.user_id == user_id)
@@ -102,10 +104,10 @@ def _build_huddle_snapshot(
approvals_sorted = list(reversed(approvals))
return {
"runs": [_serialize_run(r) for r in runs_sorted],
"events": [_serialize_event(e) for e in events_sorted],
"alerts": [_serialize_alert(a) for a in alerts_sorted],
"approvals": [_serialize_approval(a) for a in approvals_sorted],
"runs": [serialize_run(r, detail_tier) for r in runs_sorted],
"events": [serialize_event(e, detail_tier) for e in events_sorted],
"alerts": [serialize_alert(a, detail_tier) for a in alerts_sorted],
"approvals": [serialize_approval(a, detail_tier) for a in approvals_sorted],
"cursor": {
"run_id": max([since_run_id] + [r.id for r in runs_sorted]),
"event_id": max([since_event_id] + [e.id for e in events_sorted]),
@@ -113,35 +115,6 @@ def _build_huddle_snapshot(
"approval_id": max([since_approval_id] + [a.id for a in approvals_sorted]),
},
}
=======
def _can_access_advanced_activity(current_user: Dict[str, Any]) -> bool:
    """Decide whether a user may see the detailed (debug) activity tier.

    Access is granted when any of the following holds:
    - the ``DISABLE_AUTH`` environment variable equals "true" (case-insensitive),
    - the effective role is "admin" or "internal",
    - a detailed-activity feature flag is present and truthy.

    The role in ``public_metadata`` (when that value is a dict) takes
    precedence over the top-level ``role``. Feature flags are read from the
    top-level ``feature_flags`` first, then from ``public_metadata`` under
    ``feature_flags`` or ``features``; both list and dict shapes are accepted.
    """
    accepted_flags = {"agent_activity_detailed", "agents_activity_detailed"}

    public_meta = current_user.get("public_metadata")
    meta_is_dict = isinstance(public_meta, dict)

    effective_role = str(current_user.get("role") or "").lower().strip()
    if meta_is_dict:
        # Metadata role wins; fall back to the already-normalized top-level role.
        effective_role = str(public_meta.get("role") or effective_role).lower().strip()

    flags = current_user.get("feature_flags")
    if meta_is_dict and not flags:
        flags = public_meta.get("feature_flags") or public_meta.get("features")

    if isinstance(flags, list):
        flag_enabled = any(str(item).strip().lower() in accepted_flags for item in flags)
    elif isinstance(flags, dict):
        flag_enabled = bool(
            flags.get("agent_activity_detailed") or flags.get("agents_activity_detailed")
        )
    else:
        flag_enabled = False

    auth_disabled = os.getenv("DISABLE_AUTH", "false").lower() == "true"
    if auth_disabled:
        return True
    return flag_enabled or effective_role in {"admin", "internal"}
def _resolve_detail_tier(requested_tier: str, current_user: Dict[str, Any]) -> str:
    """Normalize the requested detail tier and downgrade it when not permitted.

    The debug tier is only returned when ``_can_access_advanced_activity``
    approves the user; otherwise the summary tier is returned instead. Any
    other normalized tier is passed through unchanged.
    """
    normalized = normalize_detail_tier(requested_tier)
    wants_debug = normalized == DETAIL_TIER_DEBUG
    if wants_debug and not _can_access_advanced_activity(current_user):
        return DETAIL_TIER_SUMMARY
    return normalized
>>>>>>> pr-370
@router.get("/team")
async def get_agent_team_endpoint(
@@ -708,11 +681,13 @@ async def get_agent_huddle_feed_endpoint(
since_alert_id: int = 0,
since_approval_id: int = 0,
limit: int = 50,
detail_tier: str = DETAIL_TIER_SUMMARY,
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
try:
user_id = str(current_user.get("id"))
resolved_tier = _resolve_detail_tier(detail_tier, current_user)
payload = _build_huddle_snapshot(
db=db,
user_id=user_id,
@@ -721,6 +696,7 @@ async def get_agent_huddle_feed_endpoint(
since_alert_id=max(0, int(since_alert_id)),
since_approval_id=max(0, int(since_approval_id)),
limit=max(1, min(int(limit), 200)),
detail_tier=resolved_tier,
)
return {
"success": True,
@@ -735,16 +711,39 @@ async def get_agent_huddle_feed_endpoint(
@router.get("/huddle/stream")
async def stream_agent_huddle_endpoint(
detail_tier: str = DETAIL_TIER_SUMMARY,
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
):
user_id = str(current_user.get("id"))
resolved_tier = _resolve_detail_tier(detail_tier, current_user)
# Snapshot helper intended to run inside a threadpool.
# It opens and closes its own short-lived DB session on every call.
def _fetch_snapshot_safe(user_id: str, limit: int, **kwargs):
    """Build a huddle snapshot for ``user_id`` using a dedicated session.

    Extra keyword arguments are forwarded to ``_build_huddle_snapshot``.
    When no session can be obtained, an empty snapshot with an empty cursor
    is returned. The session is always closed before returning.
    """
    user_session = get_session_for_user(user_id)
    if not user_session:
        # Not expected for a valid user_id; degrade to an empty payload.
        empty_snapshot = {
            section: [] for section in ("runs", "events", "alerts", "approvals")
        }
        empty_snapshot["cursor"] = {}
        return empty_snapshot
    try:
        return _build_huddle_snapshot(
            db=user_session, user_id=user_id, limit=limit, **kwargs
        )
    finally:
        user_session.close()
async def event_generator():
cursor = {"run_id": 0, "event_id": 0, "alert_id": 0, "approval_id": 0}
run_signatures: Dict[int, str] = {}
initial_snapshot = _build_huddle_snapshot(db=db, user_id=user_id, limit=50)
initial_snapshot = await run_in_threadpool(
_fetch_snapshot_safe,
user_id=user_id,
limit=50,
detail_tier=resolved_tier
)
cursor.update(initial_snapshot.get("cursor") or {})
for run in initial_snapshot.get("runs", []):
run_signatures[int(run.get("id") or 0)] = json.dumps(
@@ -761,23 +760,36 @@ async def stream_agent_huddle_endpoint(
while True:
try:
delta = _build_huddle_snapshot(
db=db,
# Use threadpool for delta snapshot with fresh session
delta = await run_in_threadpool(
_fetch_snapshot_safe,
user_id=user_id,
since_run_id=int(cursor.get("run_id", 0)),
since_event_id=int(cursor.get("event_id", 0)),
since_alert_id=int(cursor.get("alert_id", 0)),
since_approval_id=int(cursor.get("approval_id", 0)),
limit=50,
detail_tier=resolved_tier,
)
recent_runs = (
db.query(AgentRun)
.filter(AgentRun.user_id == user_id)
.order_by(AgentRun.id.desc())
.limit(100)
.all()
)
# Threadpool helper: loads the latest runs through its own session.
def _fetch_recent_runs_safe():
    """Return up to 100 of the user's most recent ``AgentRun`` rows.

    Rows are ordered by descending id. ``user_id`` comes from the enclosing
    scope. Returns an empty list when no session is available; the session
    is closed before returning.
    """
    run_session = get_session_for_user(user_id)
    if not run_session:
        return []
    try:
        newest_first = (
            run_session.query(AgentRun)
            .filter(AgentRun.user_id == user_id)
            .order_by(AgentRun.id.desc())
        )
        return newest_first.limit(100).all()
    finally:
        run_session.close()
recent_runs = await run_in_threadpool(_fetch_recent_runs_safe)
lifecycle_updates: List[Dict[str, Any]] = []
for run in recent_runs:
signature = json.dumps(
@@ -791,7 +803,7 @@ async def stream_agent_huddle_endpoint(
)
previous = run_signatures.get(run.id)
if previous != signature:
lifecycle_updates.append(_serialize_run(run))
lifecycle_updates.append(serialize_run(run, resolved_tier))
run_signatures[run.id] = signature
if len(run_signatures) > 300:

9
backend/render-build.sh Normal file
View File

@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Build script: installs Python dependencies, then pre-downloads the spaCy
# and NLTK data so it is fetched at build time.
# Abort on the first failing command, on use of an unset variable, or when
# any stage of a pipeline fails.
set -euo pipefail
# Upgrade the packaging toolchain before installing the project requirements.
python -m pip install --upgrade pip setuptools wheel
# Extra retries and a longer timeout for the requirements install.
python -m pip install --retries 10 --timeout 120 -r requirements.txt
# Download required NLTK and spaCy models during build phase
python -m spacy download en_core_web_sm
python -m nltk.downloader punkt_tab stopwords averaged_perceptron_tagger

View File

@@ -61,7 +61,6 @@ nltk>=3.8.0
# Image and audio processing for Stability AI
Pillow>=10.0.0
huggingface_hub>=1.1.4
scikit-learn>=1.3.0
# Text-to-Speech (TTS) dependencies
gtts>=2.4.0

1
backend/runtime.txt Normal file
View File

@@ -0,0 +1 @@
python-3.11.9

View File

@@ -0,0 +1,25 @@
"""
SIF Specialized Agents Package.
Exports all specialized agents for easier import.
"""
from .base import SIFBaseAgent
from .strategy_architect import StrategyArchitectAgent
from .content_guardian import ContentGuardianAgent
from .link_graph import LinkGraphAgent
from .citation_expert import CitationExpert
from .content_strategy import ContentStrategyAgent
from .competitor_response import CompetitorResponseAgent
from .seo_optimization import SEOOptimizationAgent
from .social_amplification import SocialAmplificationAgent
__all__ = [
"SIFBaseAgent",
"StrategyArchitectAgent",
"ContentGuardianAgent",
"LinkGraphAgent",
"CitationExpert",
"ContentStrategyAgent",
"CompetitorResponseAgent",
"SEOOptimizationAgent",
"SocialAmplificationAgent"
]

View File

@@ -0,0 +1,78 @@
"""
Base class for SIF specialized agents.
"""
import traceback
import json
import asyncio
import re
from collections import Counter
from typing import List, Dict, Any, Optional
from datetime import datetime
from loguru import logger
from services.intelligence.txtai_service import TxtaiIntelligenceService
from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, AgentAction, TaskProposal
from services.intelligence.sif_agents import SharedLLMWrapper, LocalLLMWrapper
try:
# Try importing from pipeline first (standard location)
from txtai.pipeline import Agent, LLM
TXTAI_AVAILABLE = True
except ImportError:
try:
# Fallback to top-level import
from txtai import Agent, LLM
TXTAI_AVAILABLE = True
except ImportError:
TXTAI_AVAILABLE = False
Agent = None
LLM = None
logger.warning("txtai not available, using fallback implementation")
class SIFBaseAgent(BaseALwrityAgent):
    """Common base for SIF specialized agents.

    Each instance holds a ``SharedLLMWrapper`` (``shared_llm``), a reference
    to the intelligence service (``intelligence``) and, unless an ``llm`` is
    supplied by the caller, a ``LocalLLMWrapper`` built from ``model_name``.
    """

    def __init__(
        self,
        intelligence_service: TxtaiIntelligenceService,
        user_id: str,
        agent_type: str = "sif_agent",
        model_name: str = "Qwen/Qwen2.5-3B-Instruct",
        llm: Any = None,
        **kwargs,
    ):
        """Set up the shared/local LLMs and delegate to the framework base.

        Args:
            intelligence_service: Service stored on ``self.intelligence``.
            user_id: Passed to ``SharedLLMWrapper`` and to the base class.
            agent_type: Agent type identifier forwarded to the base class.
            model_name: Model used for the default ``LocalLLMWrapper``.
            llm: Optional pre-built LLM; when given, no local LLM is created.
            **kwargs: Forwarded unchanged to the base class.

        Raises:
            RuntimeError: If ``llm`` is None and txtai could not be imported.
        """
        # Hybrid LLM Strategy:
        # 1. Shared LLM for external/high-quality generation
        self.shared_llm = SharedLLMWrapper(user_id)

        # 2. Local LLM for internal agent work (default for SIF agents)
        if llm is None:
            if not TXTAI_AVAILABLE:
                raise RuntimeError("txtai is required for SIF specialized agents but is not available")

            # Explicitly force task='language-generation' (txtai internal name) which maps to 'text-generation'
            # Using 'text-generation' directly fails because txtai mapping.get() defaults to 'text2text-generation'
            # The former per-model check assigned this same value in every
            # case, so the task is simply a constant.
            task_to_use = "language-generation"

            logger.info(f"[{self.__class__.__name__}] Initializing LocalLLMWrapper with model={model_name}, task={task_to_use}")
            llm = LocalLLMWrapper(model_name, task=task_to_use)

        self.intelligence = intelligence_service
        super().__init__(user_id, agent_type, model_name, llm, **kwargs)

    def _log_agent_operation(self, operation: str, **kwargs):
        """Standardized logging for agent operations.

        Logs ``operation`` at info level and, when keyword arguments are
        given, logs them at debug level.
        """
        logger.info(f"[{self.__class__.__name__}] {operation}")
        if kwargs:
            logger.debug(f"[{self.__class__.__name__}] Parameters: {kwargs}")

    def _create_txtai_agent(self):
        """
        SIF agents use the intelligence service directly, but we can expose
        capabilities via a standard agent interface if needed.

        Returns:
            A txtai ``Agent`` with no tools, wrapping the underlying LLM.

        Raises:
            RuntimeError: If txtai's ``Agent`` is not available.
            Exception: Any error raised while constructing the ``Agent`` is
                logged and re-raised.
        """
        if not TXTAI_AVAILABLE or Agent is None:
            logger.warning(f"[{self.__class__.__name__}] txtai Agent not available (TXTAI_AVAILABLE={TXTAI_AVAILABLE}, Agent={Agent})")
            raise RuntimeError(f"[{self.__class__.__name__}] txtai Agent not available")

        # Return a simple agent that can use the LLM
        try:
            # Peel off up to three nested wrappers exposing an ``llm`` attribute.
            _llm_for_agent = self.llm
            for _ in range(3):
                _llm_for_agent = getattr(_llm_for_agent, "llm", _llm_for_agent)
            return Agent(llm=_llm_for_agent, tools=[])
        except Exception as e:
            logger.error(f"Failed to create txtai Agent for {self.__class__.__name__}: {e}")
            # Fail fast: re-raise instead of returning None. A bare ``raise``
            # keeps the original traceback intact.
            raise

View File

@@ -0,0 +1,44 @@
"""
Citation Expert Agent implementation.
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from loguru import logger
from .base import SIFBaseAgent
from services.intelligence.agents.core_agent_framework import TaskProposal
from services.intelligence.txtai_service import TxtaiIntelligenceService
class CitationExpert(SIFBaseAgent):
    """Fact-checking and source-management agent."""

    def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, **kwargs):
        """Register the agent under the ``citation_expert`` type."""
        super().__init__(
            intelligence_service,
            user_id,
            agent_type="citation_expert",
            **kwargs,
        )

    async def verify_citations(self, content: str) -> Dict[str, Any]:
        """Return the citation verification report for ``content``.

        Placeholder: no extraction is performed yet, so every bucket is an
        empty list. An LLM-based claim extractor checked against the
        knowledge base could fill these in later.
        """
        report_sections = ("verified_claims", "unverified_claims", "missing_citations")
        return {section: [] for section in report_sections}

    async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
        """Return the fact-checking task proposals (currently one fixed task)."""
        # 1. Fact Check High-Value Content
        fact_check = TaskProposal(
            title="Verify Sources for 'AI Trends 2025'",
            description="Double-check statistical claims in your latest draft.",
            pillar_id="create",
            priority="medium",
            estimated_time=20,
            source_agent="CitationExpert",
            reasoning="Ensures credibility and trust.",
            action_type="navigate",
            action_url="/content-planning-dashboard",
        )
        return [fact_check]

View File

@@ -0,0 +1,98 @@
"""
Competitor Response Agent implementation.
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from loguru import logger
from .base import SIFBaseAgent, TXTAI_AVAILABLE, Agent
from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, TaskProposal
try:
from services.intelligence.sif_integration import SIFIntegrationService
SIF_AVAILABLE = True
except ImportError:
SIF_AVAILABLE = False
class CompetitorResponseAgent(BaseALwrityAgent):
    """
    Monitors competitors and produces counter-strategy task proposals.
    """

    def __init__(self, user_id: str, shared_llm_name: str, llm: Any = None, **kwargs):
        """Initialise as a ``competitor_analyst`` and attach the SIF service.

        ``sif_service`` stays ``None`` when the SIF integration module could
        not be imported or its construction fails (a warning is logged).
        """
        super().__init__(user_id, "competitor_analyst", shared_llm_name, llm, **kwargs)
        self.sif_service = None
        if not SIF_AVAILABLE:
            return
        try:
            self.sif_service = SIFIntegrationService(user_id)
        except Exception as exc:
            logger.warning(f"Failed to initialize SIF service for CompetitorResponseAgent: {exc}")

    def _create_txtai_agent(self):
        """Build the txtai Agent for competitor analysis.

        Returns ``None`` when txtai's ``Agent`` is unavailable.
        """
        if not TXTAI_AVAILABLE or Agent is None:
            return None

        # Unwrap one level of LLM wrapper when present.
        underlying_llm = getattr(self.llm, "llm", self.llm)
        tool_specs = (
            ("competitor_monitor", "Monitors competitor content and changes", self._competitor_monitor_tool),
            ("threat_analyzer", "Analyzes competitive threats", self._threat_analyzer_tool),
        )
        agent_tools = [
            {"name": tool_name, "description": tool_desc, "target": tool_fn}
            for tool_name, tool_desc, tool_fn in tool_specs
        ]
        # No 'system' argument is passed: it is unsupported here. Instructions
        # are expected at invocation time or via the orchestrator context.
        return Agent(tools=agent_tools, llm=underlying_llm, max_iterations=5)

    # Tool Implementations

    def _competitor_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Competitor monitoring tool (stub).

        Args:
            context: Dictionary containing 'competitor_url' (optional) to filter monitoring targets.

        Returns:
            A fixed status payload with an empty list of changes.
        """
        # Stub implementation
        return {"status": "monitored", "changes": []}

    def _threat_analyzer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Threat analysis tool (stub).

        Args:
            context: Dictionary containing analysis parameters like 'focus_area' or 'timeframe'.

        Returns:
            A fixed low-threat assessment.
        """
        # Stub implementation
        return {"threat_assessment": "Low", "level": "low"}

    async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
        """
        Return task proposals derived from competitive intel (one fixed task).
        """
        # 1. Competitor Gap Fill
        gap_fill = TaskProposal(
            title="Cover 'AI Agent Frameworks'",
            description="Competitor X just published a guide on this. Create a better version.",
            pillar_id="create",
            priority="high",
            estimated_time=60,
            source_agent="CompetitorResponseAgent",
            reasoning="High-value topic gaining traction.",
            action_type="navigate",
            action_url="/content-planning-dashboard",
        )
        return [gap_fill]

View File

@@ -0,0 +1,66 @@
"""
Content Guardian Agent implementation.
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from loguru import logger
from .base import SIFBaseAgent, TXTAI_AVAILABLE, Agent
from services.intelligence.agents.core_agent_framework import TaskProposal
from services.intelligence.txtai_service import TxtaiIntelligenceService
class ContentGuardianAgent(SIFBaseAgent):
    """Agent for monitoring brand consistency and quality."""

    def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, **kwargs):
        """Register the agent under the ``content_guardian`` type.

        Args:
            intelligence_service: Forwarded to ``SIFBaseAgent``.
            user_id: Owner of this agent instance.
            **kwargs: Forwarded to the superclass, which handles 'task' and
                other framework arguments.
        """
        super().__init__(intelligence_service, user_id, agent_type="content_guardian", **kwargs)

    def _create_txtai_agent(self):
        """Create a specialized txtai Agent for content review.

        This is deliberately a plain (non-async) method: the implementation
        it overrides on ``SIFBaseAgent`` is synchronous, so an ``async def``
        here would hand callers an un-awaited coroutine instead of an Agent.

        Returns:
            A txtai ``Agent`` exposing the brand voice checker tool, or
            ``None`` when txtai's ``Agent`` is unavailable.

        Raises:
            Exception: Any error raised while constructing the ``Agent`` is
                logged and re-raised.
        """
        if not TXTAI_AVAILABLE or Agent is None:
            return None

        try:
            # Unwrap one level of LLM wrapper when present.
            _llm_for_agent = getattr(self.llm, "llm", self.llm)
            return Agent(
                tools=[
                    {
                        "name": "brand_voice_checker",
                        "description": "Checks content against brand voice guidelines",
                        "target": self._check_brand_voice,
                    }
                ],
                llm=_llm_for_agent,
                max_iterations=3,
            )
        except Exception as e:
            logger.error(f"Failed to create txtai agent for ContentGuardian: {e}")
            # Bare ``raise`` preserves the original traceback.
            raise

    def _check_brand_voice(self, content: str) -> Dict[str, Any]:
        """Tool to check brand voice consistency.

        Placeholder: returns a fixed result regardless of ``content``. The
        intended implementation would use semantic search to compare against
        brand guidelines.
        """
        return {
            "consistent": True,
            "score": 0.95,
            "notes": "Content aligns with professional/authoritative tone.",
        }

    async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
        """Propose quality assurance tasks (currently one fixed task)."""
        proposals = []

        # 1. Content Freshness Audit
        proposals.append(TaskProposal(
            title="Audit Old Content",
            description="Review top performing posts from >6 months ago for updates.",
            pillar_id="create",
            priority="low",
            estimated_time=30,
            source_agent="ContentGuardianAgent",
            reasoning="Maintains content relevance and authority.",
            action_type="navigate",
            action_url="/content-planning-dashboard",
        ))

        return proposals

View File

@@ -0,0 +1,308 @@
"""
Content Strategy Agent implementation.
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from loguru import logger
from .base import SIFBaseAgent, TXTAI_AVAILABLE, Agent
from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, TaskProposal
from services.seo_tools.content_strategy_service import ContentStrategyService
from services.analytics import PlatformAnalyticsService
try:
from services.intelligence.sif_integration import SIFIntegrationService
SIF_AVAILABLE = True
except ImportError:
SIF_AVAILABLE = False
class ContentStrategyAgent(BaseALwrityAgent):
    """
    Agent responsible for content strategy, gap analysis, and optimization.

    Most tool methods in this class are stubs that return fixed placeholder
    data; ``_cs_fetch_gsc_analytics`` is the only one that calls a service.
    """
    # NOTE(review): several tools call self._log_agent_operation, which in this
    # package is defined on SIFBaseAgent - confirm BaseALwrityAgent provides it too.

    def __init__(self, user_id: str, shared_llm_name: str, llm: Any = None, **kwargs):
        """Initialise as a ``content_strategist`` and attach helper services.

        ``sif_service`` stays ``None`` when the SIF integration module could
        not be imported or its construction fails (a warning is logged).
        """
        super().__init__(user_id, "content_strategist", shared_llm_name, llm, **kwargs)
        self.sif_service = None
        self.content_strategy_service = ContentStrategyService()
        if SIF_AVAILABLE:
            try:
                self.sif_service = SIFIntegrationService(user_id)
            except Exception as e:
                logger.warning(f"Failed to initialize SIF service for ContentStrategyAgent: {e}")

    def _create_txtai_agent(self):
        """Create a specialized txtai Agent for content strategy with tools.

        Returns:
            A txtai ``Agent`` wired to this class's tool methods, or ``None``
            when txtai's ``Agent`` is unavailable.
        """
        if not TXTAI_AVAILABLE or Agent is None:
            return None

        # Unwrap tracking wrapper for txtai Agent if present
        _llm_for_agent = getattr(self.llm, "llm", self.llm)

        # (name, description, target) for every tool exposed to the agent.
        tool_specs = [
            (
                "content_analyzer",
                "Analyzes content performance using SIF insights and GSC data",
                self._content_analyzer_tool_sync,
            ),
            (
                "semantic_gap_detector",
                "Identifies semantic gaps between current content and high-performing topics",
                self._semantic_gap_detector_tool_sync,
            ),
            (
                "content_optimizer",
                "Optimizes content for target keywords and user intent",
                self._content_optimizer_tool_sync,
            ),
            (
                "performance_tracker",
                "Tracks content performance over time",
                self._performance_tracker_tool_sync,
            ),
            (
                "sitemap_analyzer",
                "Analyzes website structure and publishing velocity via sitemap",
                self._sitemap_analyzer_tool_sync,
            ),
            (
                "gsc_low_ctr_queries",
                "Returns low-CTR queries with evidence from cached GSC metrics",
                self._cs_gsc_low_ctr_queries_tool_sync,
            ),
            (
                "gsc_striking_distance_queries",
                # Was "~820": the range separator had been lost.
                "Returns striking-distance queries (positions ~8-20) with evidence",
                self._cs_gsc_striking_distance_tool_sync,
            ),
            (
                "gsc_declining_queries",
                "Returns period-over-period declining queries with evidence",
                self._cs_gsc_declining_queries_tool_sync,
            ),
            (
                "gsc_low_ctr_pages",
                "Returns low-CTR pages with top contributing queries",
                self._cs_gsc_low_ctr_pages_tool_sync,
            ),
            (
                "gsc_cannibalization_candidates",
                "Returns query→multiple-pages cannibalization candidates with target recommendation",
                self._cs_gsc_cannibalization_candidates_tool_sync,
            ),
            (
                "default_content_gsc_plan",
                "Runs a default first-pass plan using GSC signals (titles/meta, consolidation, refreshes)",
                self._default_content_gsc_plan_tool_sync,
            ),
        ]

        # No 'system' argument is passed (unsupported for MultiStepAgent).
        # Instructions should be provided during invocation or via the
        # orchestrator context.
        return Agent(
            tools=[
                {"name": name, "description": description, "target": target}
                for name, description, target in tool_specs
            ],
            llm=_llm_for_agent,
            max_iterations=8,
        )

    # Tool Implementations

    def _sitemap_analyzer_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyzes sitemap structure and publishing velocity.

        Args:
            context: Input parameters for analysis. Example keys:
                - sitemap_url: Optional URL to sitemap.xml
                - include_lastmod: Whether to include last modification dates

        Returns:
            A dictionary with summary metrics (e.g., pages, last_mod).
        """
        # Stub implementation
        return {"status": "analyzed", "pages": 0}

    async def _cs_fetch_gsc_analytics(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, Any]:
        """Fetch GSC analytics for this agent's user.

        Args:
            start_date: Optional start of the reporting window, passed through.
            end_date: Optional end of the reporting window, passed through.

        Returns:
            A dictionary with the GSC ``metrics`` and ``date_range``.

        Raises:
            RuntimeError: If no GSC entry is returned or its status is not
                "success".
        """
        svc = PlatformAnalyticsService()
        data = await svc.get_comprehensive_analytics(self.user_id, platforms=["gsc"], start_date=start_date, end_date=end_date)
        gsc = data.get("gsc")
        if not gsc or gsc.status != "success":
            err = getattr(gsc, "error_message", None) if gsc else "No data"
            raise RuntimeError(f"GSC analytics unavailable: {err}")
        return {"metrics": gsc.metrics, "date_range": gsc.date_range}

    def _cs_gsc_low_ctr_queries_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fetches low-CTR queries from Google Search Console signals.

        Args:
            context: Input parameters. Example keys:
                - date_range: Optional date range
                - limit: Max number of queries to return

        Returns:
            A dictionary containing items and source.
        """
        self._log_agent_operation("Fetching Low CTR Queries (Stub)", context=context)
        return {"items": [], "source": "stub"}

    def _cs_gsc_striking_distance_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Returns striking-distance queries (positions ~8-20).

        Args:
            context: Input parameters. Example keys:
                - position_range: Range to consider striking distance
                - limit: Max number of queries

        Returns:
            A dictionary containing items and source.
        """
        self._log_agent_operation("Fetching Striking Distance Queries (Stub)", context=context)
        return {"items": [], "source": "stub"}

    def _cs_gsc_declining_queries_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Returns period-over-period declining queries.

        Args:
            context: Input parameters. Example keys:
                - compare_range: Time windows to compare
                - limit: Max number of queries

        Returns:
            A dictionary containing items and source.
        """
        self._log_agent_operation("Fetching Declining Queries (Stub)", context=context)
        return {"items": [], "source": "stub"}

    def _cs_gsc_low_ctr_pages_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Returns low-CTR pages with top contributing queries.

        Args:
            context: Input parameters. Example keys:
                - date_range: Optional date range
                - limit: Max number of pages

        Returns:
            A dictionary containing items and source.
        """
        self._log_agent_operation("Fetching Low CTR Pages (Stub)", context=context)
        return {"items": [], "source": "stub"}

    def _cs_gsc_cannibalization_candidates_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Returns query→multiple-pages cannibalization candidates with target recommendation.

        Args:
            context: Input parameters. Example keys:
                - limit: Max number of candidates

        Returns:
            A dictionary containing items and source.
        """
        self._log_agent_operation("Fetching Cannibalization Candidates (Stub)", context=context)
        return {"items": [], "source": "stub"}

    def _default_content_gsc_plan_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generates a default first-pass plan using GSC signals (titles/meta, consolidation, refreshes).

        Args:
            context: Input parameters. Example keys:
                - target_url: Page to optimize
                - date_range: Optional date range for signals

        Returns:
            A dictionary describing plan_name and actions.
        """
        self._log_agent_operation("Generating Default GSC Plan (Stub)", context=context)
        return {"plan_name": "Stub Plan", "actions": []}

    def _content_analyzer_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyzes content performance using SIF insights and Google Search Console data.

        Args:
            context: Input parameters. Example keys:
                - target_url: Page to analyze
                - date_range: Optional date range
                - include_competitors: Whether to include competitor comparison

        Returns:
            A dictionary containing content_analysis summary, sif_insights, gsc_performance,
            identified_gaps, strategic_recommendations, and timestamp.
        """
        # Placeholder payload; only the timestamp varies between calls.
        return {
            "content_analysis": "Completed via SIF + GSC Integration",
            "sif_insights": {},
            "gsc_performance": {"clicks": 100},
            "identified_gaps": [],
            "strategic_recommendations": [],
            "timestamp": datetime.utcnow().isoformat(),
        }

    def _content_optimizer_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generates specific diffs/rewrites using LLM-based rewriting and semantic analysis.

        Args:
            context: Input parameters. Example keys:
                - target_url: Page to optimize
                - optimization_goal: e.g., 'increase CTR', 'clarify intent'

        Returns:
            A dictionary containing optimized_content text or diff instructions.
        """
        return {"optimized_content": "Optimized text"}

    def _semantic_gap_detector_tool_sync(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Detects semantic gaps in current coverage versus target topics.

        Args:
            context: Input parameters. Example keys:
                - topics: Optional list of topics to compare against

        Returns:
            A list of gap objects with relevance scores.
        """
        self._log_agent_operation("Detecting gaps", context=context)
        return [{"gap": "advanced techniques", "relevance": 0.9}]

    def _performance_tracker_tool_sync(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Tracks performance metrics over time.

        Args:
            context: Input parameters. Example keys:
                - date_range: Optional date range
                - metrics: Optional list of metrics to track

        Returns:
            A dictionary containing views/engagement summary.
        """
        self._log_agent_operation("Tracking performance", context=context)
        return {"views": 100, "engagement": 0.05}

    async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
        """
        Propose strategic tasks based on content analysis (currently one fixed task).
        """
        proposals = []

        # 1. Content Refresh
        proposals.append(TaskProposal(
            title="Refresh 'SEO Basics'",
            description="Update your SEO basics guide with 2024 trends.",
            pillar_id="create",
            priority="high",
            estimated_time=45,
            source_agent="ContentStrategyAgent",
            reasoning="Declining traffic and outdated references.",
            action_type="navigate",
            action_url="/content-planning-dashboard",
        ))

        return proposals

View File

@@ -0,0 +1,59 @@
"""
Link Graph Agent implementation.
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from loguru import logger
from .base import SIFBaseAgent
from services.intelligence.agents.core_agent_framework import TaskProposal
from services.intelligence.txtai_service import TxtaiIntelligenceService
class LinkGraphAgent(SIFBaseAgent):
    """Internal-linking and content-graph optimization agent."""

    def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, **kwargs):
        """Register the agent under the ``link_graph_expert`` type."""
        super().__init__(
            intelligence_service,
            user_id,
            agent_type="link_graph_expert",
            **kwargs,
        )

    async def analyze_graph(self) -> Dict[str, Any]:
        """Summarise the knowledge-graph structure of the content.

        Returns an empty dict when the intelligence service is not
        initialized or when graph construction raises (the error is logged).
        Otherwise returns placeholder metrics: the constructed graph is not
        inspected yet, so counts are zero and both lists are empty.
        """
        if not self.intelligence.is_initialized():
            return {}

        try:
            # Construct a graph from semantic relationships. The result is
            # not consumed until orphan/hub detection is implemented.
            await self.intelligence.construct_graph()
        except Exception as exc:
            logger.error(f"[{self.__class__.__name__}] Graph analysis failed: {exc}")
            return {}

        isolated_nodes = []  # orphaned content (detection not implemented)
        central_nodes = []  # pillar pages (detection not implemented)
        return {
            "node_count": 0,
            "edge_count": 0,
            "orphaned_content": isolated_nodes,
            "content_hubs": central_nodes,
        }

    async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
        """Return the internal-linking task proposals (currently one fixed task)."""
        # 1. Internal Link Opportunity
        linking_review = TaskProposal(
            title="Internal Linking Review",
            description="Add internal links to your new post 'Content Strategy 101'.",
            pillar_id="create",
            priority="medium",
            estimated_time=15,
            source_agent="LinkGraphAgent",
            reasoning="Improves SEO and user navigation.",
            action_type="navigate",
            action_url="/content-planning-dashboard",
        )
        return [linking_review]

View File

@@ -0,0 +1,128 @@
"""
SEO Optimization Agent implementation.
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from loguru import logger
from .base import SIFBaseAgent, TXTAI_AVAILABLE, Agent
from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, TaskProposal
try:
from services.intelligence.sif_integration import SIFIntegrationService
SIF_AVAILABLE = True
except ImportError:
SIF_AVAILABLE = False
class SEOOptimizationAgent(BaseALwrityAgent):
"""
Agent responsible for technical SEO, keyword strategy, and performance optimization.
"""
def __init__(self, user_id: str, shared_llm_name: str, llm: Any = None, **kwargs):
super().__init__(user_id, "seo_specialist", shared_llm_name, llm, **kwargs)
self.sif_service = None
if SIF_AVAILABLE:
try:
self.sif_service = SIFIntegrationService(user_id)
except Exception as e:
logger.warning(f"Failed to initialize SIF service for SEOOptimizationAgent: {e}")
def _create_txtai_agent(self):
"""Create a specialized txtai Agent for SEO optimization."""
if not TXTAI_AVAILABLE or Agent is None:
return None
_llm_for_agent = getattr(self.llm, "llm", self.llm)
return Agent(
tools=[
{
"name": "seo_auditor",
"description": "Performs comprehensive SEO audits",
"target": self._seo_auditor_tool
},
{
"name": "keyword_researcher",
"description": "Researches high-potential keywords",
"target": self._keyword_researcher_tool
},
{
"name": "on_page_optimizer",
"description": "Optimizes on-page elements",
"target": self._on_page_optimizer_tool
},
{
"name": "technical_fixer",
"description": "Fixes technical SEO issues",
"target": self._technical_fixer_tool
}
],
llm=_llm_for_agent,
max_iterations=15,
# Removed unsupported 'system' argument
# Instruction will be provided via orchestrator context or initial prompt
# Instruction should be provided during invocation or via orchestrator context
)
# Tool Implementations
def _seo_auditor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
SEO audit tool that retrieves existing SEO data via SIF.
Args:
context: Dictionary containing 'website_url' to audit.
"""
# Stub implementation
return {"health": "good", "issues": []}
def _keyword_researcher_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Keyword research tool.
Args:
context: Dictionary containing 'seed_keywords' or 'topic'.
"""
# Stub implementation
return {"keywords": []}
def _on_page_optimizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
On-page optimization tool.
Args:
context: Dictionary containing 'url' and 'target_keyword'.
"""
# Stub implementation
return {"optimized": True}
def _technical_fixer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Technical SEO fixer tool.
Args:
context: Dictionary containing 'issue_id' to fix.
"""
# Stub implementation
return {"fixed": True}
async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
    """
    Propose SEO-focused tasks for the day.

    Args:
        context: Planning context supplied by the caller. The current
            implementation does not read it.

    Returns:
        A list holding a single, hard-coded "Fix Broken Links" proposal.
    """
    # 1. Quick SEO win
    broken_links_task = TaskProposal(
        title="Fix Broken Links",
        description="3 internal links on 'About Us' page are broken.",
        pillar_id="distribute",
        priority="high",
        estimated_time=10,
        source_agent="SEOOptimizationAgent",
        reasoning="Easy technical win.",
        action_type="navigate",
        action_url="/content-planning-dashboard",
    )
    return [broken_links_task]

View File

@@ -0,0 +1,140 @@
"""
Social Amplification Agent implementation.
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from loguru import logger
from .base import SIFBaseAgent, TXTAI_AVAILABLE, Agent
from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, TaskProposal
try:
from services.intelligence.sif_integration import SIFIntegrationService
SIF_AVAILABLE = True
except ImportError:
SIF_AVAILABLE = False
class SocialAmplificationAgent(BaseALwrityAgent):
"""
Agent responsible for social media monitoring, content adaptation, and distribution.
"""
def __init__(self, user_id: str, shared_llm_name: str, llm: Any = None, **kwargs):
    """
    Set up the social agent and optionally attach the SIF integration service.

    Args:
        user_id: Identifier of the user this agent works for.
        shared_llm_name: Name of the shared LLM, forwarded to the base class.
        llm: Optional LLM object, forwarded to the base class.
        **kwargs: Extra options forwarded unchanged to the base class.
    """
    super().__init__(user_id, "social_media_manager", shared_llm_name, llm, **kwargs)

    # The SIF service is optional: the attribute always exists, but stays None
    # when the integration could not be imported or fails to start.
    self.sif_service = None
    if not SIF_AVAILABLE:
        return

    try:
        self.sif_service = SIFIntegrationService(user_id)
    except Exception as exc:
        # Best effort only; the agent keeps working without SIF.
        logger.warning(f"Failed to initialize SIF service for SocialAmplificationAgent: {exc}")
def _create_txtai_agent(self):
    """
    Build the txtai Agent that exposes this agent's social media tools.

    Returns:
        A txtai ``Agent`` wired to the four social tool methods, or ``None``
        when txtai is unavailable.
    """
    if not TXTAI_AVAILABLE or Agent is None:
        return None

    # Use the wrapped ``.llm`` attribute when self.llm exposes one,
    # otherwise pass self.llm through as-is.
    underlying_llm = getattr(self.llm, "llm", self.llm)

    # (name, description, callable) for every tool offered to the agent.
    tool_specs = (
        ("social_monitor", "Monitors social trends and conversations", self._social_monitor_tool),
        ("content_adapter", "Adapts long-form content for social platforms", self._content_adapter_tool),
        ("engagement_optimizer", "Optimizes posts for engagement (hashtags, timing)", self._engagement_optimizer_tool),
        ("distribution_manager", "Manages posting schedule", self._distribution_manager_tool),
    )
    tools = [
        {"name": name, "description": description, "target": target}
        for name, description, target in tool_specs
    ]

    # No 'system' argument is passed (it is unsupported); instructions are
    # supplied at invocation time or through the orchestrator context.
    return Agent(tools=tools, llm=underlying_llm, max_iterations=10)
# Tool Implementations
def _social_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Social monitoring tool using SIF.
Args:
context: Dictionary containing monitoring criteria like 'topics' or 'platforms'.
"""
# Stub implementation
return {
"trends": ["AI in marketing", "Content automation"],
"source": "stub",
"timestamp": datetime.utcnow().isoformat()
}
def _content_adapter_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Adapts content for specific platforms.
Args:
context: Dictionary containing 'content' and 'platform' (e.g., 'linkedin', 'twitter').
"""
# Stub implementation
return {"adapted_content": "Social post"}
def _engagement_optimizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Optimizes content for engagement (hashtags, timing, hook).
Args:
context: Dictionary containing 'content' to optimize.
"""
# Stub implementation
return {
"optimization_suggestions": ["Use questions"],
"estimated_engagement_score": 8.5,
"timestamp": datetime.utcnow().isoformat()
}
def _distribution_manager_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Manages distribution (scheduling/posting).
Args:
context: Dictionary containing 'post_content' and 'schedule_time'.
"""
# Stub implementation
return {
"distribution_plan": [],
"status": "scheduled",
"timestamp": datetime.utcnow().isoformat()
}
async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
    """
    Propose social media tasks for the day.

    Args:
        context: Planning context supplied by the caller. The current
            implementation does not read it.

    Returns:
        A list holding a single, hard-coded LinkedIn thread proposal.
    """
    # 1. Social post creation.
    # The description previously said "5-tweet thread", which contradicted the
    # LinkedIn title shown to the user; both now name the same platform.
    linkedin_thread_task = TaskProposal(
        title="Create LinkedIn Thread",
        description="Summarize your latest blog post into a 5-post LinkedIn thread.",
        pillar_id="distribute",
        priority="medium",
        estimated_time=20,
        source_agent="SocialAmplificationAgent",
        reasoning="Repurpose existing content.",
        action_type="navigate",
        action_url="/content-planning-dashboard",
    )
    return [linkedin_thread_task]

View File

@@ -0,0 +1,354 @@
"""
Strategy Architect Agent implementation.
"""
import traceback
import re
from typing import List, Dict, Any, Optional
from datetime import datetime
from collections import Counter
from loguru import logger
from services.intelligence.agents.specialized.base import SIFBaseAgent
from services.intelligence.agents.core_agent_framework import TaskProposal
from services.intelligence.txtai_service import TxtaiIntelligenceService
class StrategyArchitectAgent(SIFBaseAgent):
"""Agent for discovering content pillars and identifying strategic gaps."""
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, **kwargs):
    """
    Initialise the agent with the "strategy_architect" agent type.

    Args:
        intelligence_service: txtai-backed intelligence service, forwarded to
            the base class.
        user_id: Identifier of the user this agent works for.
        **kwargs: Extra options forwarded unchanged to the base class.
    """
    super().__init__(
        intelligence_service,
        user_id,
        agent_type="strategy_architect",
        **kwargs,
    )
async def discover_pillars(self) -> List[Dict[str, Any]]:
    """
    Identify content pillars by clustering the indexed content.

    Returns:
        One dictionary per cluster with 'pillar_id', 'indices', 'size' and
        'confidence'. Returns an empty list when the intelligence service is
        not initialized, no clusters are found, or any error occurs (errors
        are logged, never raised).
    """
    self._log_agent_operation("Discovering content pillars")
    try:
        # Clustering needs an initialized intelligence service.
        if not self.intelligence.is_initialized():
            logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
            return []

        clusters = await self.intelligence.cluster(min_score=0.6)
        if not clusters:
            logger.warning(f"[{self.__class__.__name__}] No clusters found")
            return []

        # Wrap each cluster in a pillar record with basic metadata.
        pillars = []
        for position, members in enumerate(clusters):
            entry = {
                "pillar_id": f"pillar_{position}",
                "indices": members,
                "size": len(members),
                "confidence": self._calculate_cluster_confidence(members),
            }
            pillars.append(entry)
            logger.debug(f"[{self.__class__.__name__}] Created pillar {entry['pillar_id']} with {entry['size']} items")

        logger.info(f"[{self.__class__.__name__}] Discovered {len(pillars)} content pillars")
        return pillars
    except Exception as exc:
        logger.error(f"[{self.__class__.__name__}] Failed to discover pillars: {exc}")
        logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
        return []
def _calculate_cluster_confidence(self, cluster_indices: List[int]) -> float:
"""Calculate confidence score for a cluster based on its size and coherence."""
# Simple confidence based on cluster size - larger clusters are more reliable
return min(1.0, len(cluster_indices) / 10.0)
async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]:
    """
    Propose PLAN pillar tasks based on the discovered content pillars.

    Args:
        context: Planning context supplied by the caller. The current
            implementation does not read it.

    Returns:
        Zero or one pillar-health proposal (none found, or fewer than three),
        followed by a generic "Review Strategic Goals" proposal that is always
        included.
    """
    # Fields shared by every proposal this agent emits.
    shared_fields = {
        "pillar_id": "plan",
        "source_agent": "StrategyArchitectAgent",
        "action_type": "navigate",
        "action_url": "/content-planning-dashboard",
    }
    proposals = []

    # 1. Pillar health check. Failures here must not block the fallback task.
    try:
        pillars = await self.discover_pillars()
        if not pillars:
            proposals.append(TaskProposal(
                title="Establish Content Pillars",
                description="Your content strategy lacks defined pillars. Let's analyze your niche to find core topics.",
                priority="high",
                estimated_time=15,
                reasoning="No content pillars detected via SIF clustering.",
                **shared_fields,
            ))
        else:
            pillar_count = len(pillars)
            if pillar_count < 3:
                proposals.append(TaskProposal(
                    title="Expand Content Pillars",
                    description=f"You only have {pillar_count} active pillars. Consider diversifying your strategy.",
                    priority="medium",
                    estimated_time=20,
                    reasoning=f"Low pillar diversity ({pillar_count} detected).",
                    **shared_fields,
                ))
    except Exception as exc:
        logger.warning(f"[{self.__class__.__name__}] Error checking pillars for proposals: {exc}")

    # 2. Strategy review (generic fallback, always proposed).
    proposals.append(TaskProposal(
        title="Review Strategic Goals",
        description="Ensure your content output aligns with your quarterly business goals.",
        priority="low",
        estimated_time=10,
        reasoning="Routine strategy maintenance.",
        **shared_fields,
    ))
    return proposals
async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]:
    """
    Compare user content with competitor content to find under-covered topics.

    Args:
        competitor_indices: Optional filter for competitor documents. Each
            entry is matched against document ids as a string; integer entries
            are additionally treated as positions in the fetched document
            list. ``None`` or an empty list means "use every competitor
            document".

    Returns:
        Up to 12 gap records, strongest first. Each record holds 'topic',
        'priority' ("high"/"medium"/"low"), 'reason', 'confidence',
        'severity_score', 'coverage_delta', 'topic_density' and 'evidence'.
        Returns an empty list when there are no documents, when either the
        user or competitor side is empty, or when any error occurs (errors
        are logged, never raised).
    """
    # Normalise once, before anything can fail: len(None) used to raise here,
    # outside the try block, even though a falsy value is otherwise accepted.
    requested_indices = list(competitor_indices or [])
    self._log_agent_operation("Finding semantic content gaps", competitor_count=len(requested_indices))
    try:
        documents = await self._fetch_index_documents()
        if not documents:
            logger.info(f"[{self.__class__.__name__}] No indexed documents available for gap detection")
            return []

        competitor_docs, user_docs = [], []

        # Build the set of acceptable competitor ids (None means "no filter").
        allowed_competitor_ids = {str(idx) for idx in requested_indices} if requested_indices else None
        if allowed_competitor_ids:
            for idx in requested_indices:
                # Integer entries may also be positions into `documents`.
                if isinstance(idx, int) and 0 <= idx < len(documents):
                    allowed_competitor_ids.add(str(documents[idx].get("id", "")))

        # Split documents by inferred role; documents with unknown role are ignored.
        for doc in documents:
            metadata = doc.get("metadata", {})
            role = self._infer_document_role(metadata)
            if role == "competitor":
                if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids:
                    continue
                competitor_docs.append(doc)
            elif role == "user":
                user_docs.append(doc)

        if not competitor_docs or not user_docs:
            logger.info(
                f"[{self.__class__.__name__}] Insufficient split for gap analysis: "
                f"user_docs={len(user_docs)}, competitor_docs={len(competitor_docs)}"
            )
            return []

        competitor_topics = self._extract_topic_density(competitor_docs)
        user_topics = self._extract_topic_density(user_docs)
        competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs)
        user_topic_docs = self._map_topic_to_doc_titles(user_docs)

        gaps = []
        for topic, competitor_density in competitor_topics.items():
            user_density = user_topics.get(topic, 0.0)
            coverage_delta = competitor_density - user_density
            # Ignore topics where competitors are not meaningfully ahead.
            if coverage_delta <= 0.08:
                continue

            competitor_support = len(competitor_topic_docs.get(topic, []))
            user_support = len(user_topic_docs.get(topic, []))

            # Confidence blends the density gap with how many competitor
            # documents back the topic (saturating at 4); both scores are
            # clamped to [0, 1].
            confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35)))
            severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3)))
            priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low"

            gaps.append({
                "topic": topic,
                "priority": priority,
                "reason": (
                    f"Competitors mention '{topic}' substantially more often "
                    f"(density {competitor_density:.2f} vs {user_density:.2f})."
                ),
                "confidence": round(confidence, 3),
                "severity_score": round(severity_score, 3),
                "coverage_delta": round(coverage_delta, 4),
                "topic_density": {
                    "competitor": round(competitor_density, 4),
                    "user": round(user_density, 4),
                    "gap": round(coverage_delta, 4)
                },
                "evidence": {
                    "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic),
                    "user_sample_titles": self._sample_titles_for_topic(user_docs, topic),
                    "competitor_supporting_docs": competitor_support,
                    "user_supporting_docs": user_support,
                    "competitor_doc_count": len(competitor_docs),
                    "user_doc_count": len(user_docs)
                }
            })

        # Strongest gaps first: severity, then confidence, then raw density gap.
        gaps.sort(
            key=lambda item: (
                item.get("severity_score", 0),
                item.get("confidence", 0),
                item.get("topic_density", {}).get("gap", 0)
            ),
            reverse=True
        )
        return gaps[:12]
    except Exception as e:
        logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}")
        logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
        return []
async def _fetch_index_documents(self) -> List[Dict[str, Any]]:
"""Fetch indexed documents and normalize metadata from txtai result objects."""
if not self.intelligence.is_initialized() or not self.intelligence.embeddings:
return []
embeddings = self.intelligence.embeddings
limit = 0
if hasattr(embeddings, "count"):
try:
limit = int(embeddings.count())
except Exception:
limit = 0
documents = []
candidate_queries = []
if limit > 0:
candidate_queries.extend([
f"select id, text, object from txtai limit {limit}",
f"select id, text, tags from txtai limit {limit}"
])
candidate_queries.extend(["marketing", "content", "seo", "strategy", "social media"])
seen_ids = set()
for query in candidate_queries:
try:
query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50)
rows = embeddings.search(query, limit=query_limit)
except Exception:
continue
for row in rows or []:
doc_id = str(row.get("id", ""))
dedupe_key = doc_id or str(hash(f"{row.get('text','')}::{row.get('score',0)}"))
if dedupe_key in seen_ids:
continue
seen_ids.add(dedupe_key)
documents.append({
"id": doc_id,
"text": row.get("text", "") or "",
"metadata": self._normalize_metadata(row)
})
if limit > 0 and len(documents) >= limit:
break
return documents
def _normalize_metadata(self, row: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize metadata payloads from txtai search rows."""
for key in ("object", "tags", "metadata", "meta"):
payload = row.get(key)
if isinstance(payload, dict):
return payload
if isinstance(payload, str):
try:
import json
parsed = json.loads(payload)
if isinstance(parsed, dict):
return parsed
except Exception:
continue
return {}
def _extract_topic_density(self, documents: List[Dict[str, Any]]) -> Dict[str, float]:
"""Extract topic density from document metadata and titles."""
topic_counter: Counter = Counter()
for doc in documents:
for topic in self._extract_topics_from_document(doc):
topic_counter[topic] += 1
total_docs = max(1, len(documents))
return {
topic: count / total_docs
for topic, count in topic_counter.items()
if count >= 2
}
def _infer_document_role(self, metadata: Dict[str, Any]) -> str:
"""Infer whether a document belongs to user content or competitor content."""
signals = [
metadata.get("type", ""),
metadata.get("doc_type", ""),
metadata.get("content_type", ""),
metadata.get("source", ""),
metadata.get("origin", "")
]
signal_blob = " ".join(str(item).lower() for item in signals if item)
if any(token in signal_blob for token in ("competitor", "rival", "market_peer")):
return "competitor"
if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")):
return "user"
return "unknown"
def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]:
"""Extract normalized topic labels from metadata and lightweight text fields."""
metadata = doc.get("metadata", {})
candidates: List[str] = []
for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:160]
if title:
candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()))
stopwords = {
"with", "from", "that", "this", "your", "about", "into", "using", "guide", "best",
"tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025"
}
normalized = {
item.strip().lower()
for item in candidates
if item
and len(item.strip()) >= 4
and not item.strip().isdigit()
and item.strip().lower() not in stopwords
}
return sorted(normalized)
def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]:
"""Map each topic to a list of document titles that support it."""
mapping: Dict[str, List[str]] = {}
for doc in documents:
metadata = doc.get("metadata", {})
title = str(metadata.get("title") or doc.get("text", "")[:100] or "Untitled")
for topic in self._extract_topics_from_document(doc):
mapping.setdefault(topic, []).append(title)
return mapping
def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]:
"""Return sample titles for a topic."""
import json
samples = []
topic_lower = topic.lower()
for doc in documents:
metadata = doc.get("metadata", {})
title = metadata.get("title") or doc.get("text", "")[:100]
if not title:
continue
haystack = f"{title} {json.dumps(metadata, default=str)}".lower()
if topic_lower in haystack:
samples.append(str(title))
if len(samples) >= limit:
break
return samples

File diff suppressed because it is too large Load Diff

View File

@@ -618,20 +618,24 @@ def _extract_dominant_colors(img: Image.Image, num_colors: int = 5) -> List[Tupl
List of RGB tuples
"""
# Resize image for faster processing
img_small = img.resize((150, 150))
# Convert to numpy array
img_array = np.array(img_small)
pixels = img_array.reshape(-1, 3)
# Use k-means clustering to find dominant colors
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=num_colors, random_state=42, n_init=10)
kmeans.fit(pixels)
colors = kmeans.cluster_centers_.astype(int)
return [tuple(color) for color in colors]
img_small = img.resize((150, 150)).convert("RGBA")
try:
paletted = img_small.convert("P", palette=Image.ADAPTIVE, colors=max(1, num_colors))
palette = paletted.getpalette() or []
color_counts = paletted.getcolors() or []
color_counts.sort(key=lambda x: x[0], reverse=True)
colors: List[Tuple[int, int, int]] = []
for _, idx in color_counts[:num_colors]:
base = int(idx) * 3
if base + 2 < len(palette):
colors.append((palette[base], palette[base + 1], palette[base + 2]))
return colors
except Exception:
return []
def _assess_image_quality(img: Image.Image) -> Dict[str, Any]:
@@ -855,4 +859,4 @@ def estimate_processing_time(
if complexity and complexity.get("complexity_score", 0) > 80:
adjusted_time *= 1.5
return round(adjusted_time, 1)
return round(adjusted_time, 1)