Add Step 5 flat context and txtai file tools for agents
This commit is contained in:
@@ -13,12 +13,13 @@ from sqlalchemy import select, desc
|
||||
import json
|
||||
|
||||
from services.database import get_session_for_user
|
||||
from models.onboarding import WebsiteAnalysis, OnboardingSession, CompetitorAnalysis
|
||||
from models.onboarding import WebsiteAnalysis, OnboardingSession, CompetitorAnalysis, ResearchPreferences, PersonaData
|
||||
|
||||
# Import existing SIF components
|
||||
from .txtai_service import TxtaiIntelligenceService
|
||||
from .semantic_cache import semantic_cache_manager, SemanticCacheStats
|
||||
from services.intelligence.harvester import SemanticHarvesterService
|
||||
from services.intelligence.agent_flat_context import AgentFlatContextStore
|
||||
|
||||
|
||||
class SIFIntegrationService:
|
||||
@@ -61,6 +62,284 @@ class SIFIntegrationService:
|
||||
)
|
||||
return self.trend_surfer_agent
|
||||
|
||||
|
||||
async def get_step2_website_context(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Retrieve onboarding step 2 website context with a strict fallback chain:
|
||||
flat file -> database -> SIF semantic index.
|
||||
"""
|
||||
# 1) Fastest: flat-file agent context
|
||||
try:
|
||||
flat_doc = AgentFlatContextStore(self.user_id).load_step2_context_document()
|
||||
if flat_doc:
|
||||
return {
|
||||
"source": "flat_file",
|
||||
"data": flat_doc.get("data") or {},
|
||||
"agent_summary": flat_doc.get("agent_summary") or {},
|
||||
"document_context": flat_doc.get("document_context") or {},
|
||||
"meta": flat_doc.get("meta") or {},
|
||||
"updated_at": flat_doc.get("updated_at"),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Flat context lookup failed for user {self.user_id}: {e}")
|
||||
|
||||
# 2) Database fallback
|
||||
db = None
|
||||
try:
|
||||
db = get_session_for_user(self.user_id)
|
||||
if db:
|
||||
stmt = (
|
||||
select(WebsiteAnalysis)
|
||||
.join(OnboardingSession, WebsiteAnalysis.session_id == OnboardingSession.id)
|
||||
.where(OnboardingSession.user_id == self.user_id)
|
||||
.order_by(desc(WebsiteAnalysis.updated_at))
|
||||
)
|
||||
row = db.execute(stmt).scalars().first()
|
||||
if row:
|
||||
payload = row.to_dict() if hasattr(row, "to_dict") else {}
|
||||
return {
|
||||
"source": "database",
|
||||
"data": payload,
|
||||
"agent_summary": {
|
||||
"quick_facts": {
|
||||
"website_url": payload.get("website_url"),
|
||||
"brand_voice": (payload.get("brand_analysis") or {}).get("brand_voice") if isinstance(payload.get("brand_analysis"), dict) else "",
|
||||
}
|
||||
},
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Database fallback failed for user {self.user_id}: {e}")
|
||||
finally:
|
||||
if db:
|
||||
db.close()
|
||||
|
||||
# 3) Semantic fallback
|
||||
try:
|
||||
results = await self.intelligence_service.search("website analysis brand voice style", limit=1)
|
||||
if results:
|
||||
top = results[0]
|
||||
metadata = top.get("object") if isinstance(top, dict) else None
|
||||
if isinstance(metadata, str):
|
||||
try:
|
||||
metadata = json.loads(metadata)
|
||||
except Exception:
|
||||
metadata = {}
|
||||
if isinstance(metadata, dict):
|
||||
report = metadata.get("full_report") if isinstance(metadata.get("full_report"), dict) else metadata
|
||||
return {
|
||||
"source": "sif_semantic",
|
||||
"data": report,
|
||||
"agent_summary": {
|
||||
"quick_facts": {
|
||||
"website_url": report.get("website_url") if isinstance(report, dict) else None,
|
||||
}
|
||||
},
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"SIF semantic fallback failed for user {self.user_id}: {e}")
|
||||
|
||||
return {"source": "none", "data": {}}
|
||||
|
||||
async def get_step3_research_context(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Retrieve onboarding step 3 research context with fallback chain:
|
||||
flat file -> database -> SIF semantic index.
|
||||
"""
|
||||
try:
|
||||
flat_doc = AgentFlatContextStore(self.user_id).load_step3_context_document()
|
||||
if flat_doc:
|
||||
return {
|
||||
"source": "flat_file",
|
||||
"data": flat_doc.get("data") or {},
|
||||
"agent_summary": flat_doc.get("agent_summary") or {},
|
||||
"document_context": flat_doc.get("document_context") or {},
|
||||
"meta": flat_doc.get("meta") or {},
|
||||
"updated_at": flat_doc.get("updated_at"),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 3 flat context lookup failed for user {self.user_id}: {e}")
|
||||
|
||||
db = None
|
||||
try:
|
||||
db = get_session_for_user(self.user_id)
|
||||
if db:
|
||||
stmt = (
|
||||
select(ResearchPreferences)
|
||||
.join(OnboardingSession, ResearchPreferences.session_id == OnboardingSession.id)
|
||||
.where(OnboardingSession.user_id == self.user_id)
|
||||
.order_by(desc(ResearchPreferences.updated_at))
|
||||
)
|
||||
prefs = db.execute(stmt).scalars().first()
|
||||
if prefs:
|
||||
payload = prefs.to_dict() if hasattr(prefs, "to_dict") else {}
|
||||
return {
|
||||
"source": "database",
|
||||
"data": payload,
|
||||
"agent_summary": {
|
||||
"quick_facts": {
|
||||
"research_depth": payload.get("research_depth"),
|
||||
"content_types_count": len(payload.get("content_types") or []),
|
||||
}
|
||||
},
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 3 database fallback failed for user {self.user_id}: {e}")
|
||||
finally:
|
||||
if db:
|
||||
db.close()
|
||||
|
||||
try:
|
||||
results = await self.intelligence_service.search("research preferences competitors onboarding step 3", limit=1)
|
||||
if results:
|
||||
top = results[0]
|
||||
metadata = top.get("object") if isinstance(top, dict) else None
|
||||
if isinstance(metadata, str):
|
||||
try:
|
||||
metadata = json.loads(metadata)
|
||||
except Exception:
|
||||
metadata = {}
|
||||
report = metadata.get("full_report") if isinstance(metadata, dict) and isinstance(metadata.get("full_report"), dict) else (metadata if isinstance(metadata, dict) else {})
|
||||
return {
|
||||
"source": "sif_semantic",
|
||||
"data": report,
|
||||
"agent_summary": {
|
||||
"quick_facts": {
|
||||
"research_depth": report.get("research_depth") if isinstance(report, dict) else None,
|
||||
}
|
||||
},
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 3 semantic fallback failed for user {self.user_id}: {e}")
|
||||
|
||||
return {"source": "none", "data": {}}
|
||||
|
||||
async def get_step4_persona_context(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Retrieve onboarding step 4 persona context with fallback chain:
|
||||
flat file -> database -> SIF semantic index.
|
||||
"""
|
||||
try:
|
||||
flat_doc = AgentFlatContextStore(self.user_id).load_step4_context_document()
|
||||
if flat_doc:
|
||||
return {
|
||||
"source": "flat_file",
|
||||
"data": flat_doc.get("data") or {},
|
||||
"agent_summary": flat_doc.get("agent_summary") or {},
|
||||
"document_context": flat_doc.get("document_context") or {},
|
||||
"meta": flat_doc.get("meta") or {},
|
||||
"updated_at": flat_doc.get("updated_at"),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 4 flat context lookup failed for user {self.user_id}: {e}")
|
||||
|
||||
db = None
|
||||
try:
|
||||
db = get_session_for_user(self.user_id)
|
||||
if db:
|
||||
stmt = (
|
||||
select(PersonaData)
|
||||
.join(OnboardingSession, PersonaData.session_id == OnboardingSession.id)
|
||||
.where(OnboardingSession.user_id == self.user_id)
|
||||
.order_by(desc(PersonaData.updated_at))
|
||||
)
|
||||
persona = db.execute(stmt).scalars().first()
|
||||
if persona:
|
||||
payload = persona.to_dict() if hasattr(persona, "to_dict") else {}
|
||||
return {
|
||||
"source": "database",
|
||||
"data": payload,
|
||||
"agent_summary": {
|
||||
"quick_facts": {
|
||||
"selected_platforms_count": len(payload.get("selected_platforms") or []),
|
||||
"has_core_persona": bool(payload.get("core_persona")),
|
||||
}
|
||||
},
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 4 database fallback failed for user {self.user_id}: {e}")
|
||||
finally:
|
||||
if db:
|
||||
db.close()
|
||||
|
||||
try:
|
||||
results = await self.intelligence_service.search("persona platform personas onboarding step 4", limit=1)
|
||||
if results:
|
||||
top = results[0]
|
||||
metadata = top.get("object") if isinstance(top, dict) else None
|
||||
if isinstance(metadata, str):
|
||||
try:
|
||||
metadata = json.loads(metadata)
|
||||
except Exception:
|
||||
metadata = {}
|
||||
report = metadata.get("full_report") if isinstance(metadata, dict) and isinstance(metadata.get("full_report"), dict) else (metadata if isinstance(metadata, dict) else {})
|
||||
return {
|
||||
"source": "sif_semantic",
|
||||
"data": report,
|
||||
"agent_summary": {
|
||||
"quick_facts": {
|
||||
"has_core_persona": bool(report.get("core_persona")) if isinstance(report, dict) else False,
|
||||
}
|
||||
},
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 4 semantic fallback failed for user {self.user_id}: {e}")
|
||||
|
||||
return {"source": "none", "data": {}}
|
||||
|
||||
async def get_step5_integrations_context(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Retrieve onboarding step 5 integrations context with fallback chain:
|
||||
flat file -> SIF semantic index.
|
||||
"""
|
||||
try:
|
||||
flat_doc = AgentFlatContextStore(self.user_id).load_step5_context_document()
|
||||
if flat_doc:
|
||||
return {
|
||||
"source": "flat_file",
|
||||
"data": flat_doc.get("data") or {},
|
||||
"agent_summary": flat_doc.get("agent_summary") or {},
|
||||
"document_context": flat_doc.get("document_context") or {},
|
||||
"meta": flat_doc.get("meta") or {},
|
||||
"updated_at": flat_doc.get("updated_at"),
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 5 flat context lookup failed for user {self.user_id}: {e}")
|
||||
|
||||
try:
|
||||
results = await self.intelligence_service.search("integrations onboarding step 5 connected providers", limit=1)
|
||||
if results:
|
||||
top = results[0]
|
||||
metadata = top.get("object") if isinstance(top, dict) else None
|
||||
if isinstance(metadata, str):
|
||||
try:
|
||||
metadata = json.loads(metadata)
|
||||
except Exception:
|
||||
metadata = {}
|
||||
report = metadata.get("full_report") if isinstance(metadata, dict) and isinstance(metadata.get("full_report"), dict) else (metadata if isinstance(metadata, dict) else {})
|
||||
return {
|
||||
"source": "sif_semantic",
|
||||
"data": report,
|
||||
"agent_summary": {
|
||||
"quick_facts": {
|
||||
"connected_integrations_count": len((report.get("integrations") or {})) if isinstance(report, dict) and isinstance(report.get("integrations"), dict) else None,
|
||||
}
|
||||
},
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"Step 5 semantic fallback failed for user {self.user_id}: {e}")
|
||||
|
||||
return {"source": "none", "data": {}}
|
||||
|
||||
async def get_flat_context_manifest(self) -> Dict[str, Any]:
|
||||
"""Return lightweight manifest of available flat context documents for this user."""
|
||||
try:
|
||||
manifest = AgentFlatContextStore(self.user_id).load_context_manifest()
|
||||
if manifest:
|
||||
return {"source": "flat_file", "data": manifest}
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load flat context manifest for user {self.user_id}: {e}")
|
||||
return {"source": "none", "data": {"documents": []}}
|
||||
|
||||
async def index_market_trends_run(self, trends_result: Dict[str, Any], run_id: str) -> bool:
|
||||
try:
|
||||
latest_id = f"market_trends_latest:{self.user_id}"
|
||||
|
||||
Reference in New Issue
Block a user