Make SIF fail fast and add low-cost remote LLM fallback
This commit is contained in:
@@ -47,7 +47,7 @@ logger = get_service_logger(__name__)
|
||||
class AgentTeamConfiguration:
|
||||
"""Configuration for the complete agent team"""
|
||||
user_id: str
|
||||
shared_llm: str = "Qwen/Qwen2.5-3B-Instruct" # Updated to a stable model known for text-generation
|
||||
shared_llm: str = "Qwen/Qwen2.5-1.5B-Instruct" # Reduced default memory footprint for local environments
|
||||
max_iterations: int = 15
|
||||
enable_safety: bool = True
|
||||
enable_performance_monitoring: bool = True
|
||||
|
||||
@@ -40,10 +40,17 @@ from services.intelligence.monitoring.semantic_dashboard import RealTimeSemantic
|
||||
from services.intelligence.agents.safety_framework import get_safety_framework
|
||||
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
|
||||
from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync
|
||||
from services.llm_providers.main_text_generation import llm_text_gen
|
||||
import time
|
||||
|
||||
logger = get_service_logger(__name__)
|
||||
|
||||
LOW_COST_REMOTE_MODELS = [
|
||||
"Qwen/Qwen2.5-1.5B-Instruct",
|
||||
"Qwen/Qwen2.5-0.5B-Instruct",
|
||||
"TinyLlama/TinyLlama-1.1B-Chat-v1.0",
|
||||
]
|
||||
|
||||
class TrackingLLMWrapper:
|
||||
"""
|
||||
Wrapper for LLM instances to transparently track usage.
|
||||
@@ -169,7 +176,7 @@ class BaseALwrityAgent(ABC):
|
||||
_prompt_context_cache: Dict[str, Dict[str, Any]] = {}
|
||||
_profile_cache: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None, enable_tracing: bool = True, **kwargs):
|
||||
def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", llm: Any = None, enable_tracing: bool = True, **kwargs):
|
||||
self.user_id = user_id
|
||||
self.agent_type = agent_type
|
||||
self.model_name = model_name
|
||||
@@ -295,7 +302,8 @@ class BaseALwrityAgent(ABC):
|
||||
Centralized method for all agents inheriting from BaseALwrityAgent.
|
||||
"""
|
||||
if not self.llm:
|
||||
return "[LLM Unavailable]"
|
||||
logger.error("LLM unavailable for agent %s (%s)", self.agent_type, self.agent_id)
|
||||
raise RuntimeError(f"LLM unavailable for agent {self.agent_type}")
|
||||
|
||||
try:
|
||||
# Run in executor to avoid blocking if LLM is synchronous
|
||||
@@ -319,7 +327,37 @@ class BaseALwrityAgent(ABC):
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"LLM generation failed in agent {self.agent_type}: {e}")
|
||||
return "[Generation Failed]"
|
||||
logger.warning(
|
||||
"Attempting remote low-cost fallback via llm_text_gen for agent %s (user=%s)",
|
||||
self.agent_type,
|
||||
self.user_id,
|
||||
)
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
fallback_response = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: llm_text_gen(
|
||||
prompt=prompt,
|
||||
user_id=self.user_id,
|
||||
preferred_hf_models=LOW_COST_REMOTE_MODELS,
|
||||
),
|
||||
)
|
||||
logger.warning(
|
||||
"Remote low-cost fallback succeeded for agent %s (user=%s)",
|
||||
self.agent_type,
|
||||
self.user_id,
|
||||
)
|
||||
return fallback_response
|
||||
except Exception as remote_e:
|
||||
logger.error(
|
||||
"Remote fallback failed for agent %s (user=%s): %s",
|
||||
self.agent_type,
|
||||
self.user_id,
|
||||
remote_e,
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Local and remote LLM generation failed for agent {self.agent_type}: {remote_e}"
|
||||
) from remote_e
|
||||
|
||||
def _resolve_agent_key(self, agent_type: str) -> str:
|
||||
value = str(agent_type or "").strip()
|
||||
@@ -524,7 +562,7 @@ class BaseALwrityAgent(ABC):
|
||||
result = await loop.run_in_executor(None, self.txtai_agent, prompt)
|
||||
|
||||
if not self.txtai_agent:
|
||||
result = "Agent not initialized"
|
||||
raise RuntimeError(f"Agent {self.agent_id} not initialized (txtai_agent missing)")
|
||||
|
||||
if activity and run_record:
|
||||
activity.log_event(
|
||||
@@ -848,19 +886,15 @@ class BaseALwrityAgent(ABC):
|
||||
raise e
|
||||
|
||||
async def _execute_fallback(self, action: AgentAction) -> str:
|
||||
"""Execute fallback action when txtai is not available"""
|
||||
# Simulate agent processing for development
|
||||
logger.info(f"Executing fallback action: {action.action_type}")
|
||||
|
||||
# Return simulated result based on action type
|
||||
if action.action_type == "analyze_competitor":
|
||||
return "Competitor analysis completed (fallback mode)"
|
||||
elif action.action_type == "optimize_content":
|
||||
return "Content optimization completed (fallback mode)"
|
||||
elif action.action_type == "fix_seo_issue":
|
||||
return "SEO issue fixed (fallback mode)"
|
||||
else:
|
||||
return f"Action {action.action_type} completed (fallback mode)"
|
||||
"""Fail-fast instead of returning mock fallback output."""
|
||||
logger.error(
|
||||
"Fallback execution requested for action '%s' on agent %s. Failing fast to avoid mock output.",
|
||||
action.action_type,
|
||||
self.agent_id,
|
||||
)
|
||||
raise RuntimeError(
|
||||
f"Fallback execution is disabled for SIF reliability. Agent={self.agent_id}, action={action.action_type}"
|
||||
)
|
||||
|
||||
def _prepare_agent_prompt(self, action: AgentAction) -> str:
|
||||
"""Prepare prompt for txtai agent"""
|
||||
|
||||
@@ -29,7 +29,7 @@ except ImportError:
|
||||
logger.warning("txtai not available, using fallback implementation")
|
||||
|
||||
class SIFBaseAgent(BaseALwrityAgent):
|
||||
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-3B-Instruct", llm: Any = None, **kwargs):
|
||||
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", llm: Any = None, **kwargs):
|
||||
# Hybrid LLM Strategy:
|
||||
# 1. Shared LLM for external/high-quality generation
|
||||
self.shared_llm = SharedLLMWrapper(user_id)
|
||||
|
||||
@@ -44,6 +44,12 @@ class SharedLLMWrapper:
|
||||
|
||||
_local_llm_cache = {}
|
||||
|
||||
LOCAL_LLM_FALLBACKS = [
|
||||
"Qwen/Qwen2.5-1.5B-Instruct",
|
||||
"Qwen/Qwen2.5-0.5B-Instruct",
|
||||
"TinyLlama/TinyLlama-1.1B-Chat-v1.0",
|
||||
]
|
||||
|
||||
class LocalLLMWrapper:
|
||||
"""
|
||||
Lazily loads a local LLM via txtai and caches it globally.
|
||||
@@ -72,22 +78,56 @@ class LocalLLMWrapper:
|
||||
if task_to_use == "text-generation":
|
||||
task_to_use = "language-generation"
|
||||
|
||||
logger.info(f"Loading local LLM (singleton): {self.model_path} (task={task_to_use})")
|
||||
try:
|
||||
_local_llm_cache[cache_key] = LLM(path=self.model_path, task=task_to_use)
|
||||
except Exception as e:
|
||||
candidate_models = []
|
||||
for candidate in [self.model_path, *LOCAL_LLM_FALLBACKS]:
|
||||
if candidate not in candidate_models:
|
||||
candidate_models.append(candidate)
|
||||
|
||||
last_error = None
|
||||
for candidate_model in candidate_models:
|
||||
candidate_key = f"{candidate_model}:{self.task}"
|
||||
if candidate_key in _local_llm_cache:
|
||||
if candidate_model != self.model_path:
|
||||
logger.warning(f"Using cached fallback local LLM model: {candidate_model}")
|
||||
return _local_llm_cache[candidate_key]
|
||||
|
||||
logger.info(f"Loading local LLM (singleton): {candidate_model} (task={task_to_use})")
|
||||
try:
|
||||
import transformers
|
||||
from transformers.pipelines import SUPPORTED_TASKS
|
||||
logger.error(
|
||||
f"LocalLLMWrapper init failed (model={self.model_path}, requested_task={task_to_use}, "
|
||||
f"transformers={getattr(transformers, '__version__', 'unknown')}, "
|
||||
f"supported_tasks={sorted(list(SUPPORTED_TASKS.keys()))[:50]})"
|
||||
_local_llm_cache[candidate_key] = LLM(path=candidate_model, task=task_to_use)
|
||||
if candidate_model != self.model_path:
|
||||
logger.warning(
|
||||
f"Loaded fallback local LLM model '{candidate_model}' after failure on '{self.model_path}'"
|
||||
)
|
||||
return _local_llm_cache[candidate_key]
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
message = str(e).lower()
|
||||
is_memory_issue = (
|
||||
"paging file is too small" in message
|
||||
or "os error 1455" in message
|
||||
or "out of memory" in message
|
||||
or "not enough memory" in message
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
logger.error(f"Failed to initialize LocalLLMWrapper: {e}")
|
||||
raise e
|
||||
if is_memory_issue:
|
||||
logger.warning(
|
||||
f"Local LLM memory load failure for '{candidate_model}', trying smaller fallback. Error: {e}"
|
||||
)
|
||||
continue
|
||||
logger.warning(f"Local LLM load failed for '{candidate_model}', trying next fallback. Error: {e}")
|
||||
continue
|
||||
|
||||
try:
|
||||
import transformers
|
||||
from transformers.pipelines import SUPPORTED_TASKS
|
||||
logger.error(
|
||||
f"LocalLLMWrapper init failed (model={self.model_path}, requested_task={task_to_use}, "
|
||||
f"transformers={getattr(transformers, '__version__', 'unknown')}, "
|
||||
f"supported_tasks={sorted(list(SUPPORTED_TASKS.keys()))[:50]})"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
logger.error(f"Failed to initialize LocalLLMWrapper after fallback attempts: {last_error}")
|
||||
raise last_error
|
||||
|
||||
return _local_llm_cache[cache_key]
|
||||
|
||||
@@ -98,7 +138,7 @@ class LocalLLMWrapper:
|
||||
return self.llm(prompt, **kwargs)
|
||||
|
||||
class SIFBaseAgent(BaseALwrityAgent):
|
||||
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-3B-Instruct", llm: Any = None):
|
||||
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", llm: Any = None):
|
||||
# Hybrid LLM Strategy:
|
||||
# 1. Shared LLM for external/high-quality generation (available to all agents)
|
||||
self.shared_llm = SharedLLMWrapper(user_id)
|
||||
|
||||
@@ -45,6 +45,7 @@ class TxtaiIntelligenceService:
|
||||
self.enable_caching = enable_caching
|
||||
self.cache_manager = semantic_cache_manager if enable_caching else None
|
||||
self._backend = "faiss" # Default backend
|
||||
self.fail_fast = str(os.getenv("SIF_FAIL_FAST", "true")).lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
# Mark as initialized for singleton pattern
|
||||
self._singleton_initialized = True
|
||||
@@ -143,7 +144,10 @@ class TxtaiIntelligenceService:
|
||||
"""
|
||||
self._ensure_initialized()
|
||||
if not self._initialized or not self.embeddings:
|
||||
logger.error(f"Cannot index content - service not initialized for user {self.user_id}")
|
||||
message = f"Cannot index content - service not initialized for user {self.user_id}"
|
||||
logger.error(message)
|
||||
if self.fail_fast:
|
||||
raise RuntimeError(message)
|
||||
return
|
||||
|
||||
try:
|
||||
@@ -190,7 +194,10 @@ class TxtaiIntelligenceService:
|
||||
"""Perform semantic search with intelligent caching."""
|
||||
self._ensure_initialized()
|
||||
if not self._initialized or not self.embeddings:
|
||||
logger.error(f"Cannot perform search - service not initialized for user {self.user_id}")
|
||||
message = f"Cannot perform search - service not initialized for user {self.user_id}"
|
||||
logger.error(message)
|
||||
if self.fail_fast:
|
||||
raise RuntimeError(message)
|
||||
return []
|
||||
|
||||
try:
|
||||
@@ -238,6 +245,8 @@ class TxtaiIntelligenceService:
|
||||
return results
|
||||
except Exception as e:
|
||||
logger.error(f"Search failed for user {self.user_id}: {e}")
|
||||
if self.fail_fast:
|
||||
raise
|
||||
logger.error(f"Query: '{query}'")
|
||||
logger.error(f"Full traceback: {traceback.format_exc()}")
|
||||
return []
|
||||
|
||||
Reference in New Issue
Block a user