Fix: Agent orchestrator initialization, singleton LLM loading, and dashboard activity logging

This commit is contained in:
ajaysi
2026-03-03 17:22:50 +05:30
parent a527ab3c76
commit 6c7871bedd
6 changed files with 314 additions and 2848 deletions

View File

@@ -13,12 +13,17 @@ from abc import ABC, abstractmethod
# txtai imports for native agent framework
try:
from txtai import Agent, LLM
TXTAI_AVAILABLE = Agent.__module__ != "txtai.agent.placeholder"
from txtai.pipeline import Agent, LLM
TXTAI_AVAILABLE = True
except ImportError:
TXTAI_AVAILABLE = False
# Fallback implementation for development
logging.warning("txtai not available, using fallback implementation")
try:
from txtai import Agent, LLM
TXTAI_AVAILABLE = True
except ImportError:
TXTAI_AVAILABLE = False
Agent = None
LLM = None
logging.warning("txtai not available")
# Optional MLflow integration
try:
@@ -121,8 +126,10 @@ class BaseALwrityAgent(ABC):
if TXTAI_AVAILABLE:
try:
if not self.llm:
# Hardening: Explicitly set task to avoid 'text2text-generation' default failures
self.llm = LLM(model_name, task="text-generation")
# Allow txtai to auto-detect the correct task for the model
# But force language-generation (txtai mapping for text-generation) for known models
task_hint = "language-generation" if any(x in model_name for x in ["Qwen", "Instruct", "GPT", "Llama"]) else None
self.llm = LLM(path=model_name, task=task_hint) if task_hint else LLM(path=model_name)
self.txtai_agent = self._create_txtai_agent()
logger.info(f"Initialized txtai agent for {agent_type} - {self.agent_id}")
@@ -777,59 +784,8 @@ class StrategyOrchestratorAgent(BaseALwrityAgent):
if not TXTAI_AVAILABLE:
return None
return Agent(
llm=self.llm,
tools=[
{
"name": "market_signal_detector",
"description": "Detects market changes and competitor activities",
"target": self._market_signal_detector_tool
},
{
"name": "google_trends_fetcher",
"description": "Fetches Google Trends data and embeds it into SIF for retrieval",
"target": self._google_trends_fetcher_tool
},
{
"name": "agent_coordinator",
"description": "Coordinates actions between multiple agents",
"target": self._agent_coordinator_tool
},
{
"name": "performance_analyzer",
"description": "Analyzes marketing performance metrics",
"target": self._performance_analyzer_tool
},
{
"name": "strategy_synthesizer",
"description": "Synthesizes unified strategies from multiple inputs",
"target": self._strategy_synthesizer_tool
},
{
"name": "task_delegator",
"description": "Delegates specific tasks to specialized agents (content, competitor, seo, social)",
"target": self._delegate_task_tool
}
],
max_iterations=15,
system=self.get_effective_system_prompt(f"""You are the Marketing Strategy Orchestrator for ALwrity user {self.user_id}.
Your role is to coordinate all marketing agents, analyze market signals,
and synthesize unified strategies.
Key Responsibility: DELEGATE tasks to specialized agents.
- Content Strategy Agent: For content analysis, gaps, and optimization.
- Competitor Response Agent: For monitoring and counter-strategies.
- SEO Optimization Agent: For technical SEO and keywords.
- Social Amplification Agent: For social trends and distribution.
Use the 'task_delegator' tool to assign work to these agents.
Do not just plan; EXECUTE by delegating.
Always prioritize user goals and maintain safety constraints.
Coordinate multi-agent responses to market changes effectively."""
)
)
llm = getattr(self.llm, "llm", self.llm)
return Agent(llm=llm, tools=[], max_iterations=15)
async def _market_signal_detector_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Tool for detecting market signals"""

View File

@@ -47,7 +47,7 @@ logger = get_service_logger(__name__)
class AgentTeamConfiguration:
"""Configuration for the complete agent team"""
user_id: str
shared_llm: str = "Qwen/Qwen3-4B-Instruct-2507"
shared_llm: str = "Qwen/Qwen2.5-3B-Instruct" # Updated to a stable model known for text-generation
max_iterations: int = 15
enable_safety: bool = True
enable_performance_monitoring: bool = True
@@ -70,6 +70,9 @@ class ALwrityAgentOrchestrator:
self.performance_monitor: Optional[AgentPerformanceMonitor] = None
self.safety_framework: Optional[Dict[str, Any]] = None
# Initialize execution history for tracking agent activities
self.execution_history: List[Dict[str, Any]] = []
# Initialize components
self._initialize_components()
@@ -80,8 +83,14 @@ class ALwrityAgentOrchestrator:
try:
# Initialize shared LLM
if TXTAI_AVAILABLE:
# Hardening: Explicitly set task to avoid 'text2text-generation' default failures
self.llm = LLM(self.config.shared_llm, task="text-generation")
try:
# Allow auto-detection of task
self.llm = LLM(self.config.shared_llm)
except Exception as e:
logger.error(
f"Failed to initialize shared LLM '{self.config.shared_llm}': {e}"
)
raise
else:
self.llm = None
@@ -117,6 +126,36 @@ class ALwrityAgentOrchestrator:
status = onboarding_service.get_onboarding_status(self.user_id)
if not status.get("is_completed", False):
logger.info(f"Skipping agent initialization for user {self.user_id} - Onboarding incomplete")
self.execution_history.append({
"user_id": self.user_id,
"timestamp": datetime.utcnow().isoformat(),
"agent_id": "system",
"action": "system_check",
"status": "pending",
"details": "Agent initialization paused. Waiting for user onboarding completion.",
"agent_name": "System Orchestrator",
"agent_type": "orchestrator"
})
# Persist this check to DB so it survives refreshes
try:
from services.database import get_session_for_user
from services.agent_activity_service import AgentActivityService
db = get_session_for_user(self.user_id)
if db:
try:
activity_service = AgentActivityService(db, self.user_id)
run = activity_service.start_run(agent_type="system_orchestrator", prompt="System Check")
activity_service.finish_run(
run_id=run.id,
success=True,
result_summary="Agent initialization paused. Waiting for user onboarding completion."
)
finally:
db.close()
except Exception:
pass
return
except Exception as e:
logger.warning(f"Could not check onboarding status for {self.user_id}: {e}")
@@ -141,10 +180,14 @@ class ALwrityAgentOrchestrator:
except Exception:
pass
# Track successful initializations
initialized_agents = []
# Content Strategy Agent
if enabled_by_key.get("content_strategist", True):
self.content_agent = ContentStrategyAgent(self.user_id, self.config.shared_llm, llm=self.llm)
self.agents['content'] = self.content_agent
initialized_agents.append("Content Strategist")
# Strategy Architect Agent
if enabled_by_key.get("strategy_architect", True):
@@ -152,43 +195,37 @@ class ALwrityAgentOrchestrator:
intel_service = TxtaiIntelligenceService(self.user_id)
self.strategy_agent = StrategyArchitectAgent(intel_service, self.user_id)
self.agents['strategy'] = self.strategy_agent
initialized_agents.append("Strategy Architect")
# Competitor Response Agent
if enabled_by_key.get("competitor_analyst", True):
self.competitor_agent = CompetitorResponseAgent(self.user_id, self.config.shared_llm, llm=self.llm)
self.agents['competitor'] = self.competitor_agent
initialized_agents.append("Competitor Analyst")
# SEO Optimization Agent
if enabled_by_key.get("seo_specialist", True):
self.seo_agent = SEOOptimizationAgent(self.user_id, self.config.shared_llm, llm=self.llm)
self.agents['seo'] = self.seo_agent
initialized_agents.append("SEO Specialist")
# Social Amplification Agent
if enabled_by_key.get("social_media_manager", True):
self.social_agent = SocialAmplificationAgent(self.user_id, self.config.shared_llm, llm=self.llm)
self.agents['social'] = self.social_agent
# Strategy Architect Agent
if enabled_by_key.get("strategy_architect", True):
from services.intelligence.txtai_service import TxtaiIntelligenceService
intel_service = TxtaiIntelligenceService(self.user_id)
self.strategy_agent = StrategyArchitectAgent(intel_service, self.user_id)
self.agents['strategy'] = self.strategy_agent
initialized_agents.append("Social Media Manager")
# Trend Surfer Agent
if enabled_by_key.get("trend_surfer", True):
# TrendSurferAgent needs TxtaiIntelligenceService, which we might need to get from SIF or initialize
# For now, we assume SIF integration is handled elsewhere or we pass a mock/stub if needed
# But wait, TrendSurferAgent constructor is (intelligence_service, user_id)
# We need to get the intelligence service here.
# Since AgentOrchestrator doesn't hold TxtaiIntelligenceService directly (SIFIntegrationService does),
# this is tricky.
# However, SIFIntegrationService initializes AgentOrchestrator.
# Let's import TxtaiIntelligenceService and initialize it here for the agent
from services.intelligence.txtai_service import TxtaiIntelligenceService
intel_service = TxtaiIntelligenceService(self.user_id)
self.trend_surfer_agent = TrendSurferAgent(intel_service, self.user_id)
self.agents['trend'] = self.trend_surfer_agent
# TrendSurferAgent needs TxtaiIntelligenceService
try:
from services.intelligence.txtai_service import TxtaiIntelligenceService
intel_service = TxtaiIntelligenceService(self.user_id)
self.trend_surfer_agent = TrendSurferAgent(intel_service, self.user_id)
self.agents['trend'] = self.trend_surfer_agent
initialized_agents.append("Trend Surfer")
except Exception as e:
logger.error(f"Failed to initialize TrendSurferAgent: {e}")
# Content Guardian Agent
if enabled_by_key.get("content_guardian", True):
@@ -206,12 +243,52 @@ class ALwrityAgentOrchestrator:
sif_service=None # SIF service is optional/circular dependency handling
)
self.agents['guardian'] = self.content_guardian_agent
initialized_agents.append("Content Guardian")
logger.info(f"Initialized ContentGuardianAgent for user {self.user_id}")
except Exception as e:
logger.error(f"Failed to initialize ContentGuardianAgent: {e}")
logger.info(f"Created {len(self.agents)} specialized agents for user {self.user_id}")
# Log initialization activity
if initialized_agents:
# 1. Add to in-memory history
self.execution_history.append({
"user_id": self.user_id,
"timestamp": datetime.utcnow().isoformat(),
"agent_id": "system",
"action": "agent_initialization",
"status": "completed",
"details": f"Successfully initialized agent team: {', '.join(initialized_agents)}",
"agent_name": "System Orchestrator",
"agent_type": "orchestrator"
})
# 2. Add to persistent database history (so dashboard sees it on refresh)
try:
from services.database import get_session_for_user
from services.agent_activity_service import AgentActivityService
db = get_session_for_user(self.user_id)
if db:
try:
activity_service = AgentActivityService(db, self.user_id)
# Create a run record
run = activity_service.start_run(
agent_type="system_orchestrator",
prompt="Initialize Autonomous Agent Team"
)
# Immediately finish it
activity_service.finish_run(
run_id=run.id,
success=True,
result_summary=f"Successfully initialized agent team: {', '.join(initialized_agents)}"
)
finally:
db.close()
except Exception as e:
logger.error(f"Failed to log initialization activity to DB: {e}")
except Exception as e:
logger.error(f"Error creating specialized agents for user {self.user_id}: {e}")
raise e
@@ -258,6 +335,11 @@ class ALwrityAgentOrchestrator:
"4. Provide specific action recommendations.\n\n"
"Return a comprehensive strategy with specific actions, priorities, and expected outcomes."
)
# Ensure orchestrator is initialized
if not self.orchestrator_agent:
self._create_orchestrator_agent()
orchestrator_prompt = self.orchestrator_agent.build_task_prompt(instruction=instruction, task_context=context)
result = await self.orchestrator_agent.run(orchestrator_prompt)
@@ -322,6 +404,7 @@ class ALwrityAgentOrchestrator:
"timestamp": datetime.utcnow().isoformat(),
"agent_statuses": agent_statuses,
"performance_summary": performance_summary,
"recent_activity": self.get_execution_history(limit=5),
"market_signals_active": self.config.enable_market_signals,
"safety_enabled": self.config.enable_safety,
"performance_monitoring_enabled": self.config.enable_performance_monitoring
@@ -342,6 +425,10 @@ class ALwrityAgentOrchestrator:
# Helper methods
def get_execution_history(self, limit: int = 100) -> List[Dict[str, Any]]:
"""Get execution history for this orchestrator"""
return self.execution_history[-limit:]
async def _prepare_orchestrator_context(self, market_context: Dict[str, Any]) -> Dict[str, Any]:
"""Prepare comprehensive context for orchestrator"""
context = {
@@ -382,7 +469,13 @@ class AgentOrchestrationService:
self.orchestrators[user_id] = ALwrityAgentOrchestrator(config)
logger.info(f"Created new orchestrator for user: {user_id}")
return self.orchestrators[user_id]
# Ensure initialization happened, if not try again (e.g. if onboarding was just completed)
orchestrator = self.orchestrators[user_id]
if not orchestrator.agents and not orchestrator.execution_history:
logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.")
orchestrator._create_specialized_agents()
return orchestrator
async def execute_marketing_strategy(self, user_id: str, market_context: Dict[str, Any]) -> Dict[str, Any]:
"""Execute marketing strategy for a user"""

View File

@@ -13,12 +13,19 @@ from abc import ABC, abstractmethod
# txtai imports for native agent framework
try:
from txtai import Agent, LLM
TXTAI_AVAILABLE = Agent.__module__ != "txtai.agent.placeholder"
from txtai.pipeline import Agent, LLM
TXTAI_AVAILABLE = True
except ImportError:
TXTAI_AVAILABLE = False
# Fallback implementation for development
logging.warning("txtai not available, using fallback implementation")
try:
from txtai import Agent, LLM
TXTAI_AVAILABLE = True
except ImportError:
TXTAI_AVAILABLE = False
Agent = None
LLM = None
logging.warning("txtai not available")
_core_llm_cache = {}
# Optional MLflow integration
try:
@@ -162,12 +169,15 @@ class BaseALwrityAgent(ABC):
_prompt_context_cache: Dict[str, Dict[str, Any]] = {}
_profile_cache: Dict[str, Dict[str, Any]] = {}
def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None, enable_tracing: bool = True):
def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None, enable_tracing: bool = True, **kwargs):
self.user_id = user_id
self.agent_type = agent_type
self.model_name = model_name
self.agent_id = f"{agent_type}_{user_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
self.enable_tracing = enable_tracing
self.kwargs = kwargs
# Optional task hint; do not enforce
self.llm_task = str(self.kwargs.get("task") or "").strip()
self.performance = AgentPerformance(
agent_id=self.agent_id,
total_actions=0,
@@ -191,26 +201,90 @@ class BaseALwrityAgent(ABC):
self._agent_profile = self._load_agent_profile_overrides()
self._prompt_context = self._load_prompt_context()
# Note: We cannot await async methods in __init__.
# If _create_txtai_agent is async, it must be called explicitly after initialization
# or we must use a factory method.
# For now, we will handle sync initialization here, and specialized agents
# must handle their async initialization separately or via a sync wrapper.
if TXTAI_AVAILABLE:
try:
if not self.llm:
# Create new LLM if not provided
# Hardening: Explicitly set task to avoid 'text2text-generation' default failures
raw_llm = LLM(model_name, task="text-generation")
# Wrap it
try:
# Use global cache for core agent LLMs too
cache_key = f"{model_name}:language-generation"
if cache_key in _core_llm_cache:
raw_llm = _core_llm_cache[cache_key]
else:
# Use language-generation for txtai internal mapping
raw_llm = LLM(path=model_name, task="language-generation")
_core_llm_cache[cache_key] = raw_llm
except Exception as e:
logger.error(f"Failed to initialize LLM for {agent_type}: {e}")
raise
self.llm = TrackingLLMWrapper(raw_llm, self.user_id, self.model_name)
try:
self.txtai_agent = self._create_txtai_agent()
logger.info(f"Initialized txtai agent for {agent_type} - {self.agent_id}")
# _create_txtai_agent might be async or sync
# CRITICAL FIX: We cannot await here. If it's async, we must run it in a loop or warn.
# Given specialized agents define it as async, we need a sync wrapper or run_until_complete.
if asyncio.iscoroutinefunction(self._create_txtai_agent):
try:
# Check if we are in a running loop
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
# We are already in a loop (e.g. server), we can't block.
# This is a design flaw in initializing async agents in __init__.
# We will defer initialization or use a sync wrapper if possible.
logger.warning(f"Cannot await async _create_txtai_agent for {agent_type} in __init__ within running loop. Initializing via create_task (agent may not be ready immediately).")
# Create a task to initialize it
async def async_init():
try:
self.txtai_agent = await self._create_txtai_agent()
logger.info(f"Async initialized txtai agent for {agent_type} - {self.agent_id}")
except Exception as e:
logger.error(f"Async initialization failed for {agent_type}: {e}")
loop.create_task(async_init())
# Temporarily set to None or a placeholder, but we can't set it to the result yet
self.txtai_agent = None
else:
# No running loop, we can run_until_complete
if not loop:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.txtai_agent = loop.run_until_complete(self._create_txtai_agent())
except Exception as e:
logger.error(f"Failed to handle async initialization for {agent_type}: {e}")
# Try fallback to sync run if possible
try:
self.txtai_agent = asyncio.run(self._create_txtai_agent())
except Exception as e2:
logger.error(f"Even asyncio.run failed: {e2}")
raise e
else:
self.txtai_agent = self._create_txtai_agent()
if self.txtai_agent:
logger.info(f"Initialized txtai agent for {agent_type} - {self.agent_id}")
else:
raise RuntimeError(f"txtai agent creation returned None for {agent_type}")
except Exception as inner_e:
logger.warning(f"Could not initialize specific txtai agent for {agent_type}: {inner_e}")
self.txtai_agent = self._create_fallback_agent()
logger.error(f"Could not initialize specific txtai agent for {agent_type}: {inner_e}")
# Fail fast: Re-raise exception
raise inner_e
except Exception as e:
logger.error(f"Failed to initialize txtai agent for {agent_type}: {e}")
self.txtai_agent = self._create_fallback_agent()
# Fail fast: Re-raise exception
raise e
else:
self.txtai_agent = self._create_fallback_agent()
raise RuntimeError(f"txtai not available for {agent_type}")
# Initialize safety framework
self.safety_framework = get_safety_framework(user_id)
@@ -253,6 +327,10 @@ class BaseALwrityAgent(ABC):
return "strategy_orchestrator"
return value
def _resolve_llm_task(self, requested_task: Optional[str]) -> str:
# No enforcement; return provided value or empty string
return str(requested_task or "").strip()
def _load_agent_profile_overrides(self) -> Dict[str, Any]:
cache_key = f"{self.user_id}:{self.agent_key}"
cached = BaseALwrityAgent._profile_cache.get(cache_key)
@@ -880,8 +958,8 @@ class BaseALwrityAgent(ABC):
class StrategyOrchestratorAgent(BaseALwrityAgent):
"""Central orchestrator agent that coordinates all marketing agents"""
def __init__(self, user_id: str, market_detector: Any = None, performance_monitor: Any = None, llm: Any = None):
super().__init__(user_id, "StrategyOrchestrator", llm=llm)
def __init__(self, user_id: str, market_detector: Any = None, performance_monitor: Any = None, llm: Any = None, **kwargs):
super().__init__(user_id, "StrategyOrchestrator", llm=llm, **kwargs)
self.market_detector = market_detector
self.performance_monitor = performance_monitor
self.sub_agents = {}
@@ -896,66 +974,10 @@ class StrategyOrchestratorAgent(BaseALwrityAgent):
if not TXTAI_AVAILABLE:
return None
return Agent(
llm=self.llm,
tools=[
{
"name": "market_signal_detector",
"description": "Detects market changes and competitor activities",
"target": self._market_signal_detector_tool
},
{
"name": "google_trends_fetcher",
"description": "Fetches Google Trends data and embeds it into SIF for retrieval",
"target": self._google_trends_fetcher_tool
},
{
"name": "agent_coordinator",
"description": "Coordinates actions between multiple agents",
"target": self._agent_coordinator_tool
},
{
"name": "performance_analyzer",
"description": "Analyzes marketing performance metrics",
"target": self._performance_analyzer_tool
},
{
"name": "strategy_synthesizer",
"description": "Synthesizes unified strategies from multiple inputs",
"target": self._strategy_synthesizer_tool
},
{
"name": "task_delegator",
"description": "Delegates specific tasks to specialized agents (content, competitor, seo, social)",
"target": self._delegate_task_tool
},
{
"name": "kickoff_gsc_first_pass",
"description": "Kicks off first-pass execution by invoking SEO/Content default GSC plans",
"target": self._kickoff_gsc_first_pass_tool
}
],
max_iterations=15,
system=self.get_effective_system_prompt(f"""You are the Marketing Strategy Orchestrator for ALwrity user {self.user_id}.
Your role is to coordinate all marketing agents, analyze market signals,
and synthesize unified strategies.
Key Responsibility: DELEGATE tasks to specialized agents.
- Content Strategy Agent: For content analysis, gaps, and optimization.
- Competitor Response Agent: For monitoring and counter-strategies.
- SEO Optimization Agent: For technical SEO and keywords.
- Social Amplification Agent: For social trends and distribution.
Use the 'task_delegator' tool to assign work to these agents.
Do not just plan; EXECUTE by delegating.
Always prioritize user goals and maintain safety constraints.
Coordinate multi-agent responses to market changes effectively.
First, call 'kickoff_gsc_first_pass' to ground the plan on live GSC signals."""
)
)
_llm_for_agent = self.llm
for _ in range(3):
_llm_for_agent = getattr(_llm_for_agent, "llm", _llm_for_agent)
return Agent(llm=_llm_for_agent, tools=[], max_iterations=15)
async def _market_signal_detector_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Tool for detecting market signals"""

File diff suppressed because it is too large Load Diff

View File

@@ -18,8 +18,8 @@ class TrendSurferAgent(SIFBaseAgent):
"Surfs" the trends detected by MarketSignalDetector to propose timely content.
"""
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str):
super().__init__(intelligence_service, user_id, agent_type="trend_surfer")
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, **kwargs):
super().__init__(intelligence_service, user_id, agent_type="trend_surfer", **kwargs)
self.user_id = user_id
self.signal_detector = MarketSignalDetector(user_id)
self.trends_service = GoogleTrendsService()

View File

@@ -25,8 +25,9 @@ except ImportError:
class SharedLLMWrapper:
"""Wraps the shared ALwrity LLM service to look like a txtai LLM."""
def __init__(self, user_id: str):
def __init__(self, user_id: str, task: Optional[str] = None):
self.user_id = user_id
self.task = task
def generate(self, prompt: str, **kwargs) -> str:
"""Generate text using the shared LLM provider."""
@@ -41,25 +42,54 @@ class SharedLLMWrapper:
def __call__(self, prompt: str, **kwargs) -> str:
return self.generate(prompt, **kwargs)
_local_llm_cache = {}
class LocalLLMWrapper:
"""
Lazily loads a local LLM via txtai.
This prevents blocking server startup with heavy model loads.
Lazily loads a local LLM via txtai and caches it globally.
This prevents blocking server startup and redundant model loads.
"""
def __init__(self, model_path: str, task: str = "text-generation"):
def __init__(self, model_path: str, task: str = None):
self.model_path = model_path
self.task = task
self._llm = None
# No self._llm here, we use the global cache
@property
def llm(self):
if self._llm is None:
if LLM is None:
raise ImportError("txtai.pipeline.LLM is not available")
logger.info(f"Loading local LLM: {self.model_path} with task: {self.task}")
# Explicitly set task to avoid 'text2text-generation' default failures
self._llm = LLM(path=self.model_path, task=self.task)
return self._llm
# Create a cache key based on model path and task
cache_key = f"{self.model_path}:{self.task}"
if cache_key in _local_llm_cache:
return _local_llm_cache[cache_key]
if LLM is None:
raise ImportError("txtai.pipeline.LLM is not available")
task_to_use = (self.task or "language-generation").strip()
# Explicitly force language-generation for known models if auto-detect fails
if any(x in self.model_path for x in ["Qwen", "Instruct", "GPT", "Llama"]):
task_to_use = "language-generation"
if task_to_use == "text-generation":
task_to_use = "language-generation"
logger.info(f"Loading local LLM (singleton): {self.model_path} (task={task_to_use})")
try:
_local_llm_cache[cache_key] = LLM(path=self.model_path, task=task_to_use)
except Exception as e:
try:
import transformers
from transformers.pipelines import SUPPORTED_TASKS
logger.error(
f"LocalLLMWrapper init failed (model={self.model_path}, requested_task={task_to_use}, "
f"transformers={getattr(transformers, '__version__', 'unknown')}, "
f"supported_tasks={sorted(list(SUPPORTED_TASKS.keys()))[:50]})"
)
except Exception:
pass
logger.error(f"Failed to initialize LocalLLMWrapper: {e}")
raise e
return _local_llm_cache[cache_key]
def __call__(self, prompt: str, **kwargs) -> str:
return self.llm(prompt, **kwargs)
@@ -75,13 +105,9 @@ class SIFBaseAgent(BaseALwrityAgent):
# 2. Local LLM for internal agent work (default for SIF agents)
if llm is None:
if TXTAI_AVAILABLE and LLM is not None:
# Use Lazy Local LLM when txtai LLM is available
# Hardening: Specify 'text-generation' task to avoid text2text defaults
llm = LocalLLMWrapper(model_name, task="text-generation")
else:
# Fallback to Shared if txtai or LLM is not available
llm = self.shared_llm
if not (TXTAI_AVAILABLE and LLM is not None):
raise RuntimeError("txtai LLM is required for SIF agents but is not available")
llm = LocalLLMWrapper(model_name, task="text-generation")
super().__init__(user_id, agent_type, model_name, llm)
self.intelligence = intelligence_service
@@ -98,14 +124,16 @@ class SIFBaseAgent(BaseALwrityAgent):
capabilities via a standard agent interface if available.
"""
if not TXTAI_AVAILABLE or Agent is None:
logger.debug(f"[{self.__class__.__name__}] txtai Agent not available, using fallback agent")
return self._create_fallback_agent()
raise RuntimeError(f"[{self.__class__.__name__}] txtai Agent not available")
try:
return Agent(llm=self.llm, tools=[])
_llm_for_agent = self.llm
for _ in range(3):
_llm_for_agent = getattr(_llm_for_agent, "llm", _llm_for_agent)
return Agent(llm=_llm_for_agent, tools=[])
except Exception as e:
logger.warning(f"[{self.__class__.__name__}] Failed to create txtai Agent: {e}")
return self._create_fallback_agent()
logger.error(f"[{self.__class__.__name__}] Failed to create txtai Agent: {e}")
raise
class StrategyArchitectAgent(SIFBaseAgent):
"""Agent for discovering content pillars and identifying strategic gaps."""