Merge_PR_408_flat_context_and_txtai_file_tools

This commit is contained in:
ajaysi
2026-03-12 15:29:08 +05:30
13 changed files with 1480 additions and 7 deletions

View File

@@ -13,6 +13,7 @@ from sqlalchemy.exc import SQLAlchemyError
from api.content_planning.services.content_strategy.onboarding import OnboardingDataIntegrationService
from services.database import get_db
from models.onboarding import OnboardingSession, APIKey, WebsiteAnalysis, ResearchPreferences, PersonaData, CompetitorAnalysis
from services.intelligence.agent_flat_context import AgentFlatContextStore
class StepManagementService:
"""Service for handling onboarding step management."""
@@ -62,6 +63,7 @@ class StepManagementService:
db.add(new_key)
db.commit()
return True
except Exception as e:
logger.error(f"Error saving API key for user {user_id}: {e}")
@@ -139,6 +141,39 @@ class StepManagementService:
db.add(new_analysis)
db.commit()
# Persist Step 2 snapshot to agent flat-file context for ultra-fast reads
try:
flat_store = AgentFlatContextStore(user_id)
canonical_payload = {
"website_url": filtered_data.get("website_url") or incoming.get("website") or incoming.get("website_url"),
"analysis_date": datetime.utcnow().isoformat(),
"status": (nested or incoming).get("status") or "completed",
"error_message": (nested or incoming).get("error_message"),
"warning_message": (nested or incoming).get("warning_message"),
"writing_style": filtered_data.get("writing_style"),
"content_characteristics": filtered_data.get("content_characteristics"),
"target_audience": filtered_data.get("target_audience"),
"content_type": filtered_data.get("content_type"),
"recommended_settings": filtered_data.get("recommended_settings"),
"brand_analysis": filtered_data.get("brand_analysis"),
"content_strategy_insights": filtered_data.get("content_strategy_insights"),
"social_media_presence": filtered_data.get("social_media_presence"),
"style_patterns": filtered_data.get("style_patterns"),
"style_guidelines": filtered_data.get("style_guidelines"),
"seo_audit": filtered_data.get("seo_audit"),
"strategic_insights_history": (nested or incoming).get("strategic_insights_history"),
"crawl_result": filtered_data.get("crawl_result"),
"meta_info": meta_info,
"sitemap_analysis": sitemap_analysis,
"raw_step2_payload": incoming,
"raw_analysis_payload": nested or incoming,
"saved_at": datetime.utcnow().isoformat(),
}
flat_store.save_step2_website_analysis(canonical_payload, source="onboarding_step2")
except Exception as flat_err:
logger.warning(f"Failed to persist step 2 flat context for user {user_id}: {flat_err}")
return True
except Exception as e:
logger.error(f"Error saving website analysis for user {user_id}: {e}")
@@ -193,6 +228,28 @@ class StepManagementService:
db.add(new_prefs)
db.commit()
# Persist Step 3 snapshot to agent flat-file context
try:
flat_store = AgentFlatContextStore(user_id)
canonical_payload = {
"research_depth": research_data.get("research_depth"),
"content_types": research_data.get("content_types") or [],
"auto_research": research_data.get("auto_research", True),
"factual_content": research_data.get("factual_content", True),
"writing_style": research_data.get("writing_style") or {},
"content_characteristics": research_data.get("content_characteristics") or {},
"target_audience": research_data.get("target_audience") or {},
"recommended_settings": research_data.get("recommended_settings") or {},
"industry_context": research_data.get("industry_context") or research_data.get("industryContext"),
"competitors": research_data.get("competitors") if isinstance(research_data.get("competitors"), list) else [],
"saved_at": datetime.utcnow().isoformat(),
"source_payload": research_data,
}
flat_store.save_step3_research_preferences(canonical_payload, source="onboarding_step3")
except Exception as flat_err:
logger.warning(f"Failed to persist step 3 flat context for user {user_id}: {flat_err}")
return True
except Exception as e:
logger.error(f"Error saving research preferences for user {user_id}: {e}")
@@ -268,6 +325,22 @@ class StepManagementService:
db.commit()
logger.info(f"✅ Saved {saved_count} competitors ({failed_count} failed)")
# Refresh Step 3 flat context with competitor details saved by this flow
try:
flat_store = AgentFlatContextStore(user_id)
existing_doc = flat_store.load_step3_context_document() or {}
existing_data = existing_doc.get("data") if isinstance(existing_doc, dict) and isinstance(existing_doc.get("data"), dict) else {}
merged_payload = {
**existing_data,
"competitors": competitors,
"industry_context": industry_context or existing_data.get("industry_context"),
"competitors_saved_at": datetime.utcnow().isoformat(),
}
flat_store.save_step3_research_preferences(merged_payload, source="onboarding_step3_competitors")
except Exception as flat_err:
logger.warning(f"Failed to refresh step 3 competitor flat context for user {user_id}: {flat_err}")
return True
except Exception as e:
logger.error(f"Error saving competitor analysis for user {user_id}: {e}")
@@ -275,6 +348,25 @@ class StepManagementService:
raise e
def _save_step5_integrations_context(self, user_id: str, step5_data: Dict[str, Any]) -> bool:
    """Write the Step 5 (integrations) snapshot to the user's flat-file context.

    Builds a canonical payload from ``step5_data`` (camelCase request keys are
    mapped to snake_case context keys) and hands it to
    ``AgentFlatContextStore.save_step5_integrations``.

    Returns:
        Whatever the store's save call returns, or ``False`` when anything in
        the process raises (the failure is logged as a warning, not re-raised).
    """

    def _typed(key: str, expected: type, fallback: Any) -> Any:
        # Keep the submitted value only when it has the expected container type.
        candidate = step5_data.get(key)
        if isinstance(candidate, expected):
            return candidate
        return fallback

    try:
        flat_store = AgentFlatContextStore(user_id)

        integrations = _typed("integrations", dict, {})
        providers = _typed("providers", list, [])
        connected_accounts = _typed("connectedAccounts", list, [])
        integration_status = step5_data.get("status") or step5_data.get("integrationStatus")
        notes = step5_data.get("notes") or step5_data.get("integrationNotes")

        canonical_payload = {
            "integrations": integrations,
            "providers": providers,
            "connected_accounts": connected_accounts,
            "integration_status": integration_status,
            "notes": notes,
            "saved_at": datetime.utcnow().isoformat(),
            # The untouched request payload is kept alongside the canonical fields.
            "source_payload": step5_data,
        }
        return flat_store.save_step5_integrations(canonical_payload, source="onboarding_step5")
    except Exception as e:
        logger.warning(f"Failed to save Step 5 integrations context for user {user_id}: {e}")
        return False
def _save_persona_data(self, user_id: str, persona_data: Dict[str, Any], db: Session) -> bool:
"""Save persona data directly to database."""
try:
@@ -301,6 +393,24 @@ class StepManagementService:
db.add(persona)
db.commit()
# Persist Step 4 snapshot to agent flat-file context
try:
flat_store = AgentFlatContextStore(user_id)
canonical_payload = {
"core_persona": persona_data.get("corePersona") or {},
"platform_personas": persona_data.get("platformPersonas") or {},
"quality_metrics": persona_data.get("qualityMetrics") or {},
"selected_platforms": persona_data.get("selectedPlatforms", []),
"research_persona": persona_data.get("researchPersona") or persona_data.get("research_persona"),
"persona_generation_notes": persona_data.get("personaGenerationNotes") or persona_data.get("persona_generation_notes"),
"saved_at": datetime.utcnow().isoformat(),
"source_payload": persona_data,
}
flat_store.save_step4_persona_data(canonical_payload, source="onboarding_step4")
except Exception as flat_err:
logger.warning(f"Failed to persist step 4 flat context for user {user_id}: {flat_err}")
return True
except Exception as e:
logger.error(f"Error saving persona data for user {user_id}: {e}")
@@ -635,6 +745,19 @@ class StepManagementService:
detail="Failed to save persona data. Onboarding cannot proceed until this is resolved."
) from e
# Step 5: Save integrations data to flat context
elif step_number == 5 and request_data:
step5_data = request_data.get('data') or request_data
logger.info(f"🔍 Step 5: Raw request_data keys: {list(request_data.keys()) if request_data else 'None'}")
logger.info(f"🔍 Step 5: Extracted step5_data keys: {list(step5_data.keys()) if step5_data else 'None'}")
if step5_data:
saved = self._save_step5_integrations_context(user_id, step5_data)
if saved:
logger.info(f"✅ Saved Step 5 integrations context for user {user_id}")
else:
logger.warning(f"⚠️ Step 5 integrations context not persisted for user {user_id}")
# Persist current step and progress in DB
from services.onboarding.progress_service import OnboardingProgressService
progress_service = OnboardingProgressService()

View File

@@ -0,0 +1,528 @@
"""Flat-file context storage for AI agents.
Stores onboarding context in per-user workspace files, optimized for fast agent reads.
Includes minimal security hardening, context-size controls, and internal document linking.
"""
from __future__ import annotations
import json
import os
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from loguru import logger
class AgentFlatContextStore:
    """Read/write agent-only flat-file context in a per-user workspace.

    Each onboarding step (2-5) is stored as one JSON document under
    ``<root>/workspace/workspace_<safe_user_id>/agent_context/``. Every
    document carries the (redacted, size-trimmed) payload under ``data``, a
    compact ``agent_summary``, retrieval metadata under ``document_context``
    and size/trim bookkeeping under ``meta``. A manifest file lists the
    documents that have been written.

    All public ``save_*`` methods return ``True``/``False`` instead of raising;
    all public ``load_*`` methods return ``None`` when nothing usable is found.
    """

    CONTEXT_DIRNAME = "agent_context"
    STEP2_FILENAME = "step2_website_analysis.json"
    STEP3_FILENAME = "step3_research_preferences.json"
    STEP4_FILENAME = "step4_persona_data.json"
    STEP5_FILENAME = "step5_integrations.json"
    MANIFEST_FILENAME = "context_manifest.json"
    SCHEMA_VERSION = "1.3"
    DEFAULT_MAX_BYTES = 300_000
    SUMMARY_TEXT_LIMIT = 800

    # Substrings that mark a dict key as sensitive (matched case-insensitively).
    _SENSITIVE_KEY_TOKENS = ("api_key", "token", "secret", "password", "authorization", "cookie")
    # Optional heavy fields, in the order they are sacrificed to meet the size budget.
    _TRIM_CANDIDATES = (
        "raw_step2_payload",
        "raw_analysis_payload",
        "source_payload",
        "crawl_result",
        "competitors",
        "strategic_insights_history",
        "seo_audit",
    )
    # Competitor lists are capped rather than dropped when over budget.
    _MAX_COMPETITORS = 20

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.safe_user_id = self._sanitize_user_id(user_id)

    # ------------------------------------------------------------------
    # Paths
    # ------------------------------------------------------------------
    @staticmethod
    def _sanitize_user_id(user_id: str) -> str:
        """Return ``user_id`` reduced to alphanumerics, ``-`` and ``_``.

        Falls back to ``"unknown_user"`` when nothing survives, so the value is
        always usable as a directory-name component.
        """
        safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_"))
        return safe or "unknown_user"

    def _workspace_dir(self) -> Path:
        """Return the per-user workspace directory (three levels above this file)."""
        root_dir = Path(__file__).resolve().parents[3]
        return root_dir / "workspace" / f"workspace_{self.safe_user_id}"

    def _context_dir(self) -> Path:
        """Return the directory holding this user's context documents."""
        return self._workspace_dir() / self.CONTEXT_DIRNAME

    def _context_file(self, filename: str) -> Path:
        """Return the full path of one context document."""
        return self._context_dir() / filename

    # ------------------------------------------------------------------
    # Small value helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _as_list(value: Any) -> list:
        """Return ``value`` if it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _estimate_size_bytes(value: Any) -> int:
        """Return the UTF-8 byte length of ``value`` serialised as JSON.

        Returns ``0`` when the value cannot be serialised; this is a
        best-effort estimate and must never abort a save on its own.
        """
        try:
            return len(json.dumps(value, ensure_ascii=False).encode("utf-8"))
        except Exception:
            return 0

    @staticmethod
    def _to_context_list(value: Any) -> Any:
        """Coerce ``value`` to a list: ``None`` -> ``[]``, dict -> its keys, scalar -> ``[str(value)]``."""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            return list(value.keys())
        return [str(value)]

    @staticmethod
    def _truncate_text(value: Any, max_chars: int = SUMMARY_TEXT_LIMIT) -> str:
        """Return ``value`` cut to ``max_chars`` characters plus ``"..."``.

        Non-string input yields an empty string; text already within the
        limit is returned unchanged.
        """
        text = value if isinstance(value, str) else ""
        if len(text) <= max_chars:
            return text
        return f"{text[:max_chars]}..."

    @staticmethod
    def _redact_sensitive(data: Any) -> Any:
        """Return a copy of ``data`` with sensitive-looking values replaced.

        Walks dicts and lists recursively. A dict value is replaced by
        ``"[REDACTED]"`` when its key (lower-cased) contains any of the
        sensitive tokens; everything else is returned as-is.
        """
        if isinstance(data, dict):
            tokens = AgentFlatContextStore._SENSITIVE_KEY_TOKENS
            redacted = {}
            for k, v in data.items():
                key_lower = str(k).lower()
                if any(token in key_lower for token in tokens):
                    redacted[k] = "[REDACTED]"
                else:
                    redacted[k] = AgentFlatContextStore._redact_sensitive(v)
            return redacted
        if isinstance(data, list):
            return [AgentFlatContextStore._redact_sensitive(v) for v in data]
        return data

    # ------------------------------------------------------------------
    # Document metadata
    # ------------------------------------------------------------------
    def _related_documents(self, context_type: str) -> list:
        """Return the link list (type, path, relationship) for ``context_type``.

        Unknown context types get an empty list.
        """
        if context_type == "onboarding_step2_website_analysis":
            return [
                {"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "next_step"},
                {"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "future_dependency"},
                {"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "future_dependency"},
            ]
        if context_type == "onboarding_step3_research_preferences":
            return [
                {"type": "onboarding_step2_website_analysis", "path": self.STEP2_FILENAME, "relationship": "previous_step"},
                {"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "next_step"},
                {"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "future_dependency"},
            ]
        if context_type == "onboarding_step4_persona_data":
            return [
                {"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "previous_step"},
                {"type": "onboarding_step2_website_analysis", "path": self.STEP2_FILENAME, "relationship": "upstream_context"},
                {"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "next_step"},
            ]
        if context_type == "onboarding_step5_integrations":
            return [
                {"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "previous_step"},
                {"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "upstream_context"},
            ]
        return []

    def _build_document_context(
        self,
        *,
        context_type: str,
        source: str,
        journey_stage: str,
        fallback_order: list,
        payload_size: int,
        summary_size: int,
        payload_within_budget: bool,
    ) -> Dict[str, Any]:
        """Assemble the ``document_context`` section stored with every document."""
        total_size = payload_size + summary_size
        return {
            "audience": "ai_agents",
            "purpose": "fast_context_retrieval",
            "context_type": context_type,
            "source": source,
            "tenant": {"user_id_safe": self.safe_user_id, "isolation_scope": "workspace_user"},
            "journey": {
                "stage": journey_stage,
                "user_action": "onboarding",
                "agent_expectation": "read_summary_first_then_expand",
            },
            "retrieval_contract": {
                "preferred": "flat_file",
                "fallback_order": fallback_order,
            },
            "context_window_guidance": {
                "max_raw_bytes": self.DEFAULT_MAX_BYTES,
                "total_bytes": total_size,
                "raw_document_within_budget": payload_within_budget,
                "agent_policy": "Use agent_summary first; open full data only for specialist tasks",
            },
            "related_documents": self._related_documents(context_type),
        }

    # ------------------------------------------------------------------
    # Per-step summaries
    # ------------------------------------------------------------------
    def _build_step2_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Build the compact agent summary for a Step 2 (website analysis) payload."""
        seo_audit = self._as_dict(payload.get("seo_audit"))
        brand = self._as_dict(payload.get("brand_analysis"))
        rec_settings = self._as_dict(payload.get("recommended_settings"))
        target_audience = self._as_dict(payload.get("target_audience"))
        social = self._as_dict(payload.get("social_media_presence"))
        content_type = payload.get("content_type")

        technical_issues = self._to_context_list(seo_audit.get("technical_issues"))
        recommendations = self._to_context_list(seo_audit.get("recommendations"))

        quick_facts = {
            "website_url": payload.get("website_url") or "",
            "brand_voice": brand.get("brand_voice") or "",
            "industry": brand.get("industry") or "",
            "target_segment": target_audience.get("primary_audience") or target_audience.get("audience_type") or "",
            "writing_tone": rec_settings.get("writing_tone") or "",
            "primary_content_type": content_type.get("primary_type") if isinstance(content_type, dict) else "",
            "social_platforms": sorted(social),
            "seo_issue_count": len(technical_issues),
            "seo_recommendation_count": len(recommendations),
        }
        signal_candidates = [
            quick_facts.get("brand_voice"),
            quick_facts.get("industry"),
            quick_facts.get("writing_tone"),
            quick_facts.get("primary_content_type"),
        ]
        return {
            "quick_facts": quick_facts,
            "retrieval_hints": {
                "high_signal_terms": [term for term in signal_candidates if term],
                "agent_queries": [
                    "brand voice guidelines",
                    "website style patterns",
                    "seo technical issues",
                    "content strategy opportunities",
                    "target audience profile",
                ],
            },
            "profile": {
                "writing_style": payload.get("writing_style") or {},
                "style_patterns": payload.get("style_patterns") or {},
                "style_guidelines": payload.get("style_guidelines") or {},
                "recommended_settings": rec_settings,
                "target_audience": target_audience,
            },
            "seo_focus": {
                "technical_issues": technical_issues,
                "recommendations": recommendations,
            },
        }

    def _build_step3_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Build the compact agent summary for a Step 3 (research preferences) payload."""
        competitors = self._as_list(payload.get("competitors"))
        domains = []
        # Only the first 20 competitors are scanned for a domain/url.
        for comp in competitors[:20]:
            if isinstance(comp, dict):
                dom = comp.get("domain") or comp.get("url")
                if dom:
                    domains.append(str(dom))

        research_depth = payload.get("research_depth") or ""
        content_types = self._as_list(payload.get("content_types"))
        industry_context = self._truncate_text(
            payload.get("industry_context") or payload.get("industryContext") or "", 500
        )
        return {
            "quick_facts": {
                "research_depth": research_depth,
                "content_types": content_types,
                "auto_research": bool(payload.get("auto_research", True)),
                "factual_content": bool(payload.get("factual_content", True)),
                "competitor_count": len(competitors),
            },
            "retrieval_hints": {
                # Empty terms are dropped, matching the step 2 summary.
                "high_signal_terms": [term for term in [research_depth, *content_types[:5]] if term],
                "agent_queries": [
                    "competitor landscape summary",
                    "content opportunities by competitor",
                    "research depth preferences",
                    "factual content constraints",
                ],
            },
            "competitor_focus": {
                "top_competitor_domains": domains[:10],
                "industry_context": industry_context,
            },
        }

    def _build_step4_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Build the compact agent summary for a Step 4 (persona data) payload."""
        core_persona = self._as_dict(payload.get("core_persona"))
        platform_personas = self._as_dict(payload.get("platform_personas"))
        quality_metrics = self._as_dict(payload.get("quality_metrics"))
        selected_platforms = self._as_list(payload.get("selected_platforms"))

        persona_name = core_persona.get("name") or core_persona.get("persona_name") or ""
        primary_goal = self._truncate_text(core_persona.get("primary_goal") or core_persona.get("goal") or "", 250)
        return {
            "quick_facts": {
                "persona_name": persona_name,
                "selected_platforms": selected_platforms,
                "platform_persona_count": len(platform_personas),
                "has_research_persona": bool(payload.get("research_persona")),
            },
            "retrieval_hints": {
                # Empty terms are dropped, matching the step 2 summary.
                "high_signal_terms": [term for term in [persona_name, *selected_platforms[:5]] if term],
                "agent_queries": [
                    "core persona profile",
                    "platform persona adaptations",
                    "persona quality metrics",
                    "research persona defaults",
                ],
            },
            "persona_focus": {
                "primary_goal": primary_goal,
                "core_persona": core_persona,
                "quality_metrics": quality_metrics,
            },
        }

    def _build_step5_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Build the compact agent summary for a Step 5 (integrations) payload."""
        integrations = self._as_dict(payload.get("integrations"))
        providers = self._as_list(payload.get("providers"))
        # An integration counts as connected when its value is truthy.
        connected = [name for name, state in integrations.items() if state]
        notes = self._truncate_text(payload.get("notes") or payload.get("integration_notes") or "", 300)
        return {
            "quick_facts": {
                "connected_integrations_count": len(connected),
                "connected_integrations": connected[:20],
                "providers_count": len(providers),
            },
            "retrieval_hints": {
                "high_signal_terms": connected[:5],
                "agent_queries": [
                    "integration readiness",
                    "connected providers summary",
                    "missing integration dependencies",
                ],
            },
            "integration_focus": {
                "notes": notes,
                "integrations": integrations,
            },
        }

    # ------------------------------------------------------------------
    # Size control and persistence
    # ------------------------------------------------------------------
    def _shrink_payload_if_needed(self, payload: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Redact ``payload`` and trim heavy optional fields until it fits the budget.

        Returns:
            ``(payload, trim_info)`` where ``trim_info`` always contains
            ``trimmed``, ``original_size_bytes``, ``final_size_bytes`` and
            ``trimmed_fields``. The result may still exceed
            ``DEFAULT_MAX_BYTES`` when the bulk sits in non-candidate fields.
        """
        payload = self._redact_sensitive(payload if isinstance(payload, dict) else {})
        original_size = self._estimate_size_bytes(payload)
        trim_info = {
            "trimmed": False,
            "original_size_bytes": original_size,
            "final_size_bytes": original_size,
            "trimmed_fields": [],
        }
        if original_size <= self.DEFAULT_MAX_BYTES:
            return payload, trim_info

        mutable = dict(payload)
        current_size = original_size
        for field in self._TRIM_CANDIDATES:
            if current_size <= self.DEFAULT_MAX_BYTES:
                break
            value = mutable.get(field)
            if field == "competitors" and isinstance(value, list):
                if len(value) <= self._MAX_COMPETITORS:
                    continue  # nothing to cut
                mutable[field] = value[: self._MAX_COMPETITORS]
            elif isinstance(value, (dict, list)):
                if not value:
                    continue  # the omission marker would be larger than the value
                mutable[field] = {"omitted": True, "reason": "size_budget", "original_type": type(value).__name__}
            elif isinstance(value, str):
                shortened = self._truncate_text(value, 500)
                if shortened == value:
                    continue
                mutable[field] = shortened
            else:
                # Absent, None or scalar: replacing it could only grow the payload.
                continue
            trim_info["trimmed_fields"].append(field)
            current_size = self._estimate_size_bytes(mutable)

        trim_info["trimmed"] = current_size < original_size
        trim_info["final_size_bytes"] = current_size
        return mutable, trim_info

    def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None:
        """Write ``data`` as compact JSON to ``target_file`` via temp file + ``os.replace``.

        The parent directory is created if missing. On any failure the temp
        file is removed (best effort) and the exception is re-raised.
        """
        target_file.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, separators=(",", ":"))
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, target_file)
            try:
                # Owner-only permissions; failure here is deliberately ignored.
                os.chmod(target_file, 0o600)
            except Exception:
                pass
        except Exception:
            try:
                os.unlink(tmp_path)
            except Exception:
                pass
            raise

    def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None:
        """Insert or replace the manifest entry for ``context_type``.

        An unreadable manifest, or one whose top level is not a JSON object,
        is treated as empty and rebuilt rather than aborting the save.
        """
        manifest_file = self._context_file(self.MANIFEST_FILENAME)
        existing: Dict[str, Any] = {}
        if manifest_file.exists():
            try:
                with open(manifest_file, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except Exception:
                loaded = None
            if isinstance(loaded, dict):
                existing = loaded

        # Keep every entry except a previous one for this context type.
        items = [
            entry
            for entry in self._as_list(existing.get("documents"))
            if not (isinstance(entry, dict) and entry.get("type") == context_type)
        ]
        meta = doc.get("meta") or {}
        items.append(
            {
                "type": context_type,
                "path": filename,
                "updated_at": doc.get("updated_at"),
                "size_bytes": meta.get("data_size_bytes", 0) + meta.get("summary_size_bytes", 0),
                "related_documents": (doc.get("document_context") or {}).get("related_documents", []),
            }
        )
        manifest = {
            "schema_version": self.SCHEMA_VERSION,
            "user_id": str(self.user_id),
            "updated_at": datetime.utcnow().isoformat(),
            "documents": items,
        }
        self._atomic_write_json(manifest_file, manifest)

    def _save_context_document(
        self,
        *,
        filename: str,
        context_type: str,
        payload: Dict[str, Any],
        summary: Dict[str, Any],
        source: str,
        journey_stage: str,
    ) -> bool:
        """Write one context document and refresh the manifest.

        Returns:
            ``True`` when both the document and the manifest were written,
            ``False`` (after logging an error) when anything raised.
        """
        try:
            target_file = self._context_file(filename)
            payload = payload if isinstance(payload, dict) else {}
            summary = summary if isinstance(summary, dict) else {}
            compact_payload, trim_info = self._shrink_payload_if_needed(payload)
            payload_size = self._estimate_size_bytes(compact_payload)
            summary_size = self._estimate_size_bytes(summary)
            context_doc = {
                "schema_version": self.SCHEMA_VERSION,
                "context_type": context_type,
                "user_id": str(self.user_id),
                "updated_at": datetime.utcnow().isoformat(),
                "source": source,
                "document_context": self._build_document_context(
                    context_type=context_type,
                    source=source,
                    journey_stage=journey_stage,
                    fallback_order=["flat_file", "database", "sif_semantic"],
                    payload_size=payload_size,
                    summary_size=summary_size,
                    payload_within_budget=payload_size <= self.DEFAULT_MAX_BYTES,
                ),
                "data": compact_payload,
                "agent_summary": summary,
                "meta": {
                    "data_size_bytes": payload_size,
                    "summary_size_bytes": summary_size,
                    "trim": trim_info,
                },
            }
            self._atomic_write_json(target_file, context_doc)
            self._update_manifest(context_type, filename, context_doc)
            return True
        except Exception as exc:
            logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}")
            return False

    # ------------------------------------------------------------------
    # Public save API
    # ------------------------------------------------------------------
    def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool:
        """Persist the Step 2 website-analysis payload; returns success as a bool."""
        return self._save_context_document(
            filename=self.STEP2_FILENAME,
            context_type="onboarding_step2_website_analysis",
            payload=payload,
            summary=self._build_step2_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_2",
        )

    def save_step3_research_preferences(self, payload: Dict[str, Any], *, source: str = "onboarding_step3") -> bool:
        """Persist the Step 3 research-preferences payload; returns success as a bool."""
        return self._save_context_document(
            filename=self.STEP3_FILENAME,
            context_type="onboarding_step3_research_preferences",
            payload=payload,
            summary=self._build_step3_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_3",
        )

    def save_step4_persona_data(self, payload: Dict[str, Any], *, source: str = "onboarding_step4") -> bool:
        """Persist the Step 4 persona payload; returns success as a bool."""
        return self._save_context_document(
            filename=self.STEP4_FILENAME,
            context_type="onboarding_step4_persona_data",
            payload=payload,
            summary=self._build_step4_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_4",
        )

    def save_step5_integrations(self, payload: Dict[str, Any], *, source: str = "onboarding_step5") -> bool:
        """Persist the Step 5 integrations payload; returns success as a bool."""
        return self._save_context_document(
            filename=self.STEP5_FILENAME,
            context_type="onboarding_step5_integrations",
            payload=payload,
            summary=self._build_step5_summary(payload if isinstance(payload, dict) else {}),
            source=source,
            journey_stage="onboarding_step_5",
        )

    # ------------------------------------------------------------------
    # Public load API
    # ------------------------------------------------------------------
    def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
        """Load one JSON document, or ``None`` if missing, invalid or foreign.

        A document whose stored ``user_id`` differs from this store's
        ``user_id`` is rejected (with a warning) rather than returned.
        """
        try:
            target_file = self._context_file(filename)
            if not target_file.exists():
                return None
            with open(target_file, "r", encoding="utf-8") as f:
                doc = json.load(f)
            if not isinstance(doc, dict):
                return None
            if str(doc.get("user_id")) != str(self.user_id):
                logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})")
                return None
            return doc
        except Exception as exc:
            logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}")
            return None

    @staticmethod
    def _data_section(doc: Any) -> Optional[Dict[str, Any]]:
        """Return ``doc["data"]`` when both ``doc`` and that section are dicts, else ``None``."""
        if isinstance(doc, dict) and isinstance(doc.get("data"), dict):
            return doc.get("data")
        return None

    def load_context_manifest(self) -> Optional[Dict[str, Any]]:
        """Return the manifest document, or ``None``."""
        return self._load_context_document(self.MANIFEST_FILENAME)

    def load_step2_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full Step 2 document (metadata included), or ``None``."""
        return self._load_context_document(self.STEP2_FILENAME)

    def load_step2_website_analysis(self) -> Optional[Dict[str, Any]]:
        """Return only the ``data`` section of the Step 2 document, or ``None``."""
        return self._data_section(self.load_step2_context_document())

    def load_step3_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full Step 3 document (metadata included), or ``None``."""
        return self._load_context_document(self.STEP3_FILENAME)

    def load_step3_research_preferences(self) -> Optional[Dict[str, Any]]:
        """Return only the ``data`` section of the Step 3 document, or ``None``."""
        return self._data_section(self.load_step3_context_document())

    def load_step4_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full Step 4 document (metadata included), or ``None``."""
        return self._load_context_document(self.STEP4_FILENAME)

    def load_step4_persona_data(self) -> Optional[Dict[str, Any]]:
        """Return only the ``data`` section of the Step 4 document, or ``None``."""
        return self._data_section(self.load_step4_context_document())

    def load_step5_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full Step 5 document (metadata included), or ``None``."""
        return self._load_context_document(self.STEP5_FILENAME)

    def load_step5_integrations(self) -> Optional[Dict[str, Any]]:
        """Return only the ``data`` section of the Step 5 document, or ``None``."""
        return self._data_section(self.load_step5_context_document())

View File

@@ -15,6 +15,7 @@ from loguru import logger
from .txtai_service import TxtaiIntelligenceService, TXTAI_AVAILABLE
from services.intelligence.agents.core_agent_framework import BaseALwrityAgent
from services.llm_providers.main_text_generation import llm_text_gen
from services.intelligence.agent_flat_context import AgentFlatContextStore
# Optional txtai imports (align with core agent framework)
try:
@@ -181,8 +182,8 @@ class SIFBaseAgent(BaseALwrityAgent):
def _create_txtai_agent(self):
"""
SIF agents primarily use the intelligence service directly, but we can expose
capabilities via a standard agent interface if available.
Expose a txtai Agent interface with flat-file context tools.
Tools are scoped to the current user workspace via AgentFlatContextStore.
"""
if not TXTAI_AVAILABLE or Agent is None:
raise RuntimeError(f"[{self.__class__.__name__}] txtai Agent not available")
@@ -191,11 +192,103 @@ class SIFBaseAgent(BaseALwrityAgent):
_llm_for_agent = self.llm
for _ in range(3):
_llm_for_agent = getattr(_llm_for_agent, "llm", _llm_for_agent)
return Agent(llm=_llm_for_agent, tools=[])
return Agent(
llm=_llm_for_agent,
tools=[
{
"name": "flat_context_manifest",
"description": "Returns manifest of available onboarding flat-context documents for this user",
"target": self._tool_flat_context_manifest,
},
{
"name": "flat_context_read",
"description": "Read a flat-context document by logical name: step2|step3|step4|step5|manifest",
"target": self._tool_flat_context_read,
},
{
"name": "flat_context_write_note",
"description": "Write lightweight agent notes/updates to a specific flat-context document",
"target": self._tool_flat_context_write_note,
},
],
)
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to create txtai Agent: {e}")
raise
def _tool_flat_context_manifest(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Tool: list available flat-context docs and links."""
try:
store = AgentFlatContextStore(self.user_id)
manifest = store.load_context_manifest() or {"documents": []}
return {"ok": True, "manifest": manifest}
except Exception as e:
return {"ok": False, "error": str(e)}
def _tool_flat_context_read(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Tool: read one user-scoped context doc."""
try:
key = str((context or {}).get("document") or "").strip().lower()
store = AgentFlatContextStore(self.user_id)
mapping = {
"step2": store.load_step2_context_document,
"step3": store.load_step3_context_document,
"step4": store.load_step4_context_document,
"step5": store.load_step5_context_document,
"manifest": store.load_context_manifest,
}
if key not in mapping:
return {"ok": False, "error": "Invalid document. Use step2|step3|step4|step5|manifest"}
data = mapping[key]()
return {"ok": True, "document": key, "data": data or {}}
except Exception as e:
return {"ok": False, "error": str(e)}
def _tool_flat_context_write_note(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Tool: append agent note/update to step context by re-saving payload."""
try:
key = str((context or {}).get("document") or "").strip().lower()
note = str((context or {}).get("note") or "").strip()
if not note:
return {"ok": False, "error": "note is required"}
store = AgentFlatContextStore(self.user_id)
if key == "step2":
doc = store.load_step2_context_document() or {}
payload = doc.get("data") if isinstance(doc.get("data"), dict) else {}
notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else []
notes.append({"note": note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()})
payload["agent_notes"] = notes[-50:]
ok = store.save_step2_website_analysis(payload, source="agent_note")
elif key == "step3":
doc = store.load_step3_context_document() or {}
payload = doc.get("data") if isinstance(doc.get("data"), dict) else {}
notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else []
notes.append({"note": note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()})
payload["agent_notes"] = notes[-50:]
ok = store.save_step3_research_preferences(payload, source="agent_note")
elif key == "step4":
doc = store.load_step4_context_document() or {}
payload = doc.get("data") if isinstance(doc.get("data"), dict) else {}
notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else []
notes.append({"note": note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()})
payload["agent_notes"] = notes[-50:]
ok = store.save_step4_persona_data(payload, source="agent_note")
elif key == "step5":
doc = store.load_step5_context_document() or {}
payload = doc.get("data") if isinstance(doc.get("data"), dict) else {}
notes = payload.get("agent_notes") if isinstance(payload.get("agent_notes"), list) else []
notes.append({"note": note, "agent": self.agent_type, "ts": datetime.utcnow().isoformat()})
payload["agent_notes"] = notes[-50:]
ok = store.save_step5_integrations(payload, source="agent_note")
else:
return {"ok": False, "error": "Invalid document. Use step2|step3|step4|step5"}
return {"ok": bool(ok), "document": key}
except Exception as e:
return {"ok": False, "error": str(e)}
class StrategyArchitectAgent(SIFBaseAgent):
"""Agent for discovering content pillars and identifying strategic gaps."""
@@ -697,7 +790,25 @@ class ContentGuardianAgent(SIFBaseAgent):
if not text:
return {"compliance_score": 0.0, "issues": ["No text provided"]}
# 1. Fetch Style Guidelines from SIF if not provided
guidelines_source = "provided" if style_guidelines else "none"
# 1. Fetch Style Guidelines from flat-file context first, then SIF fallback
if not style_guidelines:
try:
flat_doc = AgentFlatContextStore(self.user_id).load_step2_context_document()
flat_data = (flat_doc or {}).get("data") if isinstance(flat_doc, dict) else None
if isinstance(flat_data, dict):
style_guidelines = {
"tone": (flat_data.get("brand_analysis") or {}).get("brand_voice", "neutral"),
"style_patterns": flat_data.get("style_patterns", {}),
"writing_style": flat_data.get("writing_style", {}),
"style_guidelines": flat_data.get("style_guidelines", {}),
}
guidelines_source = "flat_file"
logger.info(f"[{self.__class__.__name__}] Retrieved style guidelines from flat context")
except Exception as e:
logger.warning(f"[{self.__class__.__name__}] Failed to retrieve style guidelines from flat context: {e}")
if not style_guidelines and self.sif_service:
try:
# Search for website analysis to get brand voice/style
@@ -708,7 +819,7 @@ class ContentGuardianAgent(SIFBaseAgent):
res = results[0]
metadata_str = res.get('object')
metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else (metadata_str or res)
if metadata.get('type') == 'website_analysis':
report = metadata.get('full_report', {})
style_guidelines = {
@@ -716,6 +827,7 @@ class ContentGuardianAgent(SIFBaseAgent):
"style_patterns": report.get('style_patterns', {}),
"writing_style": report.get('writing_style', {})
}
guidelines_source = "sif_index"
logger.info(f"[{self.__class__.__name__}] Retrieved style guidelines from SIF: {style_guidelines.get('tone')}")
except Exception as e:
logger.warning(f"[{self.__class__.__name__}] Failed to retrieve style guidelines from SIF: {e}")
@@ -746,7 +858,7 @@ class ContentGuardianAgent(SIFBaseAgent):
"compliance_score": max(0.0, score),
"issues": issues,
"is_compliant": score > 0.8,
"guidelines_source": "sif_index" if not style_guidelines and self.sif_service else "provided"
"guidelines_source": guidelines_source
}
except Exception as e:

View File

@@ -13,12 +13,13 @@ from sqlalchemy import select, desc
import json
from services.database import get_session_for_user, has_onboarding_session
from models.onboarding import WebsiteAnalysis, OnboardingSession, CompetitorAnalysis
from models.onboarding import WebsiteAnalysis, OnboardingSession, CompetitorAnalysis, ResearchPreferences, PersonaData
# Import existing SIF components
from .txtai_service import TxtaiIntelligenceService
from .semantic_cache import semantic_cache_manager, SemanticCacheStats
from services.intelligence.harvester import SemanticHarvesterService
from services.intelligence.agent_flat_context import AgentFlatContextStore
class SIFIntegrationService:
@@ -61,6 +62,284 @@ class SIFIntegrationService:
)
return self.trend_surfer_agent
async def get_step2_website_context(self) -> Dict[str, Any]:
    """
    Retrieve onboarding step 2 website context with a strict fallback chain:
    flat file -> database -> SIF semantic index.

    Each tier is tried in order; an exception raised inside a tier is logged
    as a warning and the next tier is attempted.

    Returns:
        A dict with at least ``source`` and ``data`` keys. ``source`` is one
        of ``"flat_file"``, ``"database"``, ``"sif_semantic"`` or ``"none"``.
        When no tier yields usable data the result is
        ``{"source": "none", "data": {}}``.
    """
    # 1) Fastest: flat-file agent context
    try:
        flat_doc = AgentFlatContextStore(self.user_id).load_step2_context_document()
        if flat_doc:
            return {
                "source": "flat_file",
                "data": flat_doc.get("data") or {},
                "agent_summary": flat_doc.get("agent_summary") or {},
                "document_context": flat_doc.get("document_context") or {},
                "meta": flat_doc.get("meta") or {},
                "updated_at": flat_doc.get("updated_at"),
            }
    except Exception as e:
        logger.warning(f"Flat context lookup failed for user {self.user_id}: {e}")

    # 2) Database fallback
    db = None
    try:
        db = get_session_for_user(self.user_id)
        if db:
            stmt = (
                select(WebsiteAnalysis)
                .join(OnboardingSession, WebsiteAnalysis.session_id == OnboardingSession.id)
                .where(OnboardingSession.user_id == self.user_id)
                .order_by(desc(WebsiteAnalysis.updated_at))
            )
            row = db.execute(stmt).scalars().first()
            if row:
                payload = row.to_dict() if hasattr(row, "to_dict") else {}
                brand_analysis = payload.get("brand_analysis")
                return {
                    "source": "database",
                    "data": payload,
                    "agent_summary": {
                        "quick_facts": {
                            "website_url": payload.get("website_url"),
                            "brand_voice": brand_analysis.get("brand_voice") if isinstance(brand_analysis, dict) else "",
                        }
                    },
                }
    except Exception as e:
        logger.warning(f"Database fallback failed for user {self.user_id}: {e}")
    finally:
        # Always release the session, including on the early-return path.
        if db:
            db.close()

    # 3) Semantic fallback
    try:
        results = await self.intelligence_service.search("website analysis brand voice style", limit=1)
        if results:
            top = results[0]
            metadata = top.get("object") if isinstance(top, dict) else None
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except ValueError:
                    # Unparseable metadata is treated as "no usable hit".
                    metadata = None
            if isinstance(metadata, dict):
                full_report = metadata.get("full_report")
                report = full_report if isinstance(full_report, dict) else metadata
                # Only report a semantic hit when there is data to hand back;
                # an empty report falls through to the "none" result below.
                if report:
                    return {
                        "source": "sif_semantic",
                        "data": report,
                        "agent_summary": {
                            "quick_facts": {
                                "website_url": report.get("website_url"),
                            }
                        },
                    }
    except Exception as e:
        logger.warning(f"SIF semantic fallback failed for user {self.user_id}: {e}")

    return {"source": "none", "data": {}}
async def get_step3_research_context(self) -> Dict[str, Any]:
    """
    Retrieve onboarding step 3 research context with fallback chain:
    flat file -> database -> SIF semantic index.

    Each tier is tried in order; an exception raised inside a tier is logged
    as a warning and the next tier is attempted.

    Returns:
        A dict with at least ``source`` and ``data`` keys. ``source`` is one
        of ``"flat_file"``, ``"database"``, ``"sif_semantic"`` or ``"none"``.
        When no tier yields usable data the result is
        ``{"source": "none", "data": {}}``.
    """
    # 1) Flat-file agent context
    try:
        flat_doc = AgentFlatContextStore(self.user_id).load_step3_context_document()
        if flat_doc:
            return {
                "source": "flat_file",
                "data": flat_doc.get("data") or {},
                "agent_summary": flat_doc.get("agent_summary") or {},
                "document_context": flat_doc.get("document_context") or {},
                "meta": flat_doc.get("meta") or {},
                "updated_at": flat_doc.get("updated_at"),
            }
    except Exception as e:
        logger.warning(f"Step 3 flat context lookup failed for user {self.user_id}: {e}")

    # 2) Database fallback
    db = None
    try:
        db = get_session_for_user(self.user_id)
        if db:
            stmt = (
                select(ResearchPreferences)
                .join(OnboardingSession, ResearchPreferences.session_id == OnboardingSession.id)
                .where(OnboardingSession.user_id == self.user_id)
                .order_by(desc(ResearchPreferences.updated_at))
            )
            prefs = db.execute(stmt).scalars().first()
            if prefs:
                payload = prefs.to_dict() if hasattr(prefs, "to_dict") else {}
                return {
                    "source": "database",
                    "data": payload,
                    "agent_summary": {
                        "quick_facts": {
                            "research_depth": payload.get("research_depth"),
                            "content_types_count": len(payload.get("content_types") or []),
                        }
                    },
                }
    except Exception as e:
        logger.warning(f"Step 3 database fallback failed for user {self.user_id}: {e}")
    finally:
        # Always release the session, including on the early-return path.
        if db:
            db.close()

    # 3) Semantic fallback
    try:
        results = await self.intelligence_service.search("research preferences competitors onboarding step 3", limit=1)
        if results:
            top = results[0]
            metadata = top.get("object") if isinstance(top, dict) else None
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except ValueError:
                    # Unparseable metadata is treated as "no usable hit".
                    metadata = None
            if isinstance(metadata, dict):
                full_report = metadata.get("full_report")
                report = full_report if isinstance(full_report, dict) else metadata
                # Only report a semantic hit when there is data to hand back;
                # previously a result with missing/invalid metadata was
                # returned as "sif_semantic" with an empty payload.
                if report:
                    return {
                        "source": "sif_semantic",
                        "data": report,
                        "agent_summary": {
                            "quick_facts": {
                                "research_depth": report.get("research_depth"),
                            }
                        },
                    }
    except Exception as e:
        logger.warning(f"Step 3 semantic fallback failed for user {self.user_id}: {e}")

    return {"source": "none", "data": {}}
async def get_step4_persona_context(self) -> Dict[str, Any]:
    """
    Retrieve onboarding step 4 persona context with fallback chain:
    flat file -> database -> SIF semantic index.

    Each tier is tried in order; an exception raised inside a tier is logged
    as a warning and the next tier is attempted.

    Returns:
        A dict with at least ``source`` and ``data`` keys. ``source`` is one
        of ``"flat_file"``, ``"database"``, ``"sif_semantic"`` or ``"none"``.
        When no tier yields usable data the result is
        ``{"source": "none", "data": {}}``.
    """
    # 1) Flat-file agent context
    try:
        flat_doc = AgentFlatContextStore(self.user_id).load_step4_context_document()
        if flat_doc:
            return {
                "source": "flat_file",
                "data": flat_doc.get("data") or {},
                "agent_summary": flat_doc.get("agent_summary") or {},
                "document_context": flat_doc.get("document_context") or {},
                "meta": flat_doc.get("meta") or {},
                "updated_at": flat_doc.get("updated_at"),
            }
    except Exception as e:
        logger.warning(f"Step 4 flat context lookup failed for user {self.user_id}: {e}")

    # 2) Database fallback
    db = None
    try:
        db = get_session_for_user(self.user_id)
        if db:
            stmt = (
                select(PersonaData)
                .join(OnboardingSession, PersonaData.session_id == OnboardingSession.id)
                .where(OnboardingSession.user_id == self.user_id)
                .order_by(desc(PersonaData.updated_at))
            )
            persona = db.execute(stmt).scalars().first()
            if persona:
                payload = persona.to_dict() if hasattr(persona, "to_dict") else {}
                return {
                    "source": "database",
                    "data": payload,
                    "agent_summary": {
                        "quick_facts": {
                            "selected_platforms_count": len(payload.get("selected_platforms") or []),
                            "has_core_persona": bool(payload.get("core_persona")),
                        }
                    },
                }
    except Exception as e:
        logger.warning(f"Step 4 database fallback failed for user {self.user_id}: {e}")
    finally:
        # Always release the session, including on the early-return path.
        if db:
            db.close()

    # 3) Semantic fallback
    try:
        results = await self.intelligence_service.search("persona platform personas onboarding step 4", limit=1)
        if results:
            top = results[0]
            metadata = top.get("object") if isinstance(top, dict) else None
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except ValueError:
                    # Unparseable metadata is treated as "no usable hit".
                    metadata = None
            if isinstance(metadata, dict):
                full_report = metadata.get("full_report")
                report = full_report if isinstance(full_report, dict) else metadata
                # Only report a semantic hit when there is data to hand back;
                # previously a result with missing/invalid metadata was
                # returned as "sif_semantic" with an empty payload.
                if report:
                    return {
                        "source": "sif_semantic",
                        "data": report,
                        "agent_summary": {
                            "quick_facts": {
                                "has_core_persona": bool(report.get("core_persona")),
                            }
                        },
                    }
    except Exception as e:
        logger.warning(f"Step 4 semantic fallback failed for user {self.user_id}: {e}")

    return {"source": "none", "data": {}}
async def get_step5_integrations_context(self) -> Dict[str, Any]:
    """
    Retrieve onboarding step 5 integrations context with fallback chain:
    flat file -> SIF semantic index.

    There is no database tier for this step. An exception raised inside a
    tier is logged as a warning and the next tier is attempted.

    Returns:
        A dict with at least ``source`` and ``data`` keys. ``source`` is one
        of ``"flat_file"``, ``"sif_semantic"`` or ``"none"``. When no tier
        yields usable data the result is ``{"source": "none", "data": {}}``.
    """
    # 1) Flat-file agent context
    try:
        flat_doc = AgentFlatContextStore(self.user_id).load_step5_context_document()
        if flat_doc:
            return {
                "source": "flat_file",
                "data": flat_doc.get("data") or {},
                "agent_summary": flat_doc.get("agent_summary") or {},
                "document_context": flat_doc.get("document_context") or {},
                "meta": flat_doc.get("meta") or {},
                "updated_at": flat_doc.get("updated_at"),
            }
    except Exception as e:
        logger.warning(f"Step 5 flat context lookup failed for user {self.user_id}: {e}")

    # 2) Semantic fallback
    try:
        results = await self.intelligence_service.search("integrations onboarding step 5 connected providers", limit=1)
        if results:
            top = results[0]
            metadata = top.get("object") if isinstance(top, dict) else None
            if isinstance(metadata, str):
                try:
                    metadata = json.loads(metadata)
                except ValueError:
                    # Unparseable metadata is treated as "no usable hit".
                    metadata = None
            if isinstance(metadata, dict):
                full_report = metadata.get("full_report")
                report = full_report if isinstance(full_report, dict) else metadata
                # Only report a semantic hit when there is data to hand back;
                # previously a result with missing/invalid metadata was
                # returned as "sif_semantic" with an empty payload.
                if report:
                    integrations = report.get("integrations")
                    return {
                        "source": "sif_semantic",
                        "data": report,
                        "agent_summary": {
                            "quick_facts": {
                                # Count is only meaningful for a mapping; otherwise unknown.
                                "connected_integrations_count": len(integrations) if isinstance(integrations, dict) else None,
                            }
                        },
                    }
    except Exception as e:
        logger.warning(f"Step 5 semantic fallback failed for user {self.user_id}: {e}")

    return {"source": "none", "data": {}}
async def get_flat_context_manifest(self) -> Dict[str, Any]:
    """
    Load the flat-context manifest for the current user.

    Returns:
        ``{"source": "flat_file", "data": <manifest>}`` when the store
        returns a truthy manifest; otherwise
        ``{"source": "none", "data": {"documents": []}}``. A load failure is
        logged as a warning and yields the same empty result.
    """
    try:
        store = AgentFlatContextStore(self.user_id)
        manifest = store.load_context_manifest()
    except Exception as e:
        logger.warning(f"Failed to load flat context manifest for user {self.user_id}: {e}")
        manifest = None

    if not manifest:
        return {"source": "none", "data": {"documents": []}}
    return {"source": "flat_file", "data": manifest}
async def index_market_trends_run(self, trends_result: Dict[str, Any], run_id: str) -> bool:
try:
latest_id = f"market_trends_latest:{self.user_id}"

View File

@@ -189,3 +189,20 @@ All orchestration updates are emitted as typed records under a shared schema:
* **Inter-Agent Chat**: Allow agents to debate strategy (e.g., SEO Agent vs. Creative Agent).
* **Auto-Execution**: Allow agents to *perform* tasks (e.g., fix a broken link) with user approval.
* **Voice Interface**: Daily standup meeting via voice.
## ⚡ Agent Fast-Context Layer (Onboarding Step 2)
To reduce latency for repetitive agent reads, Step 2 website analysis is now persisted to a per-user flat file in workspace:
- `workspace/workspace_<safe_user_id>/agent_context/step2_website_analysis.json`
**Read order for agents:**
1. Flat-file context (agent-only, fastest)
2. Relational database (`website_analyses`)
3. SIF semantic index retrieval
This preserves SIF intelligence workflows while giving agents deterministic, low-latency access to core onboarding context.
It also stores agent-optimized `quick_facts`, `retrieval_hints`, and full-fidelity raw payload blocks so both fast inference and deep-dive reasoning are supported.
Reference design docs: `docs/flat_file_context/STEP2_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP3_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP4_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP5_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/FLAT_FILE_CONTEXT_FRAMEWORK_DESIGN.md`, `docs/flat_file_context/FLAT_FILE_CONTEXT_SECURITY_AND_ISOLATION.md`, and `docs/flat_file_context/FLAT_FILE_CONTEXT_PROGRESS_AND_QUICK_WINS.md`.

View File

@@ -0,0 +1,69 @@
# Flat File Context Enhancements Backlog
This document tracks next-phase implementation items for the flat-file context framework.
## 1) TTL/Refresh Hints + Freshness Policy
### Objective
Prevent stale agent decisions by adding explicit freshness semantics.
### Proposed additions
- Add `m.ttl_s` (seconds) and `m.stale_after` (timestamp) to context envelope.
- Add `m.refresh_recommended` boolean.
- Define per-context defaults (Step 2 likely long TTL, but still bounded).
### Acceptance criteria
- Reader utility can classify context as `fresh|stale|expired`.
- Fallback to DB/SIF is triggered automatically when the staleness policy requires it.
---
## 2) Optional `.json.gz` Companion for Large Payloads
### Objective
Reduce disk footprint and IO for large context payloads.
### Proposed additions
- Write primary `.json` always.
- If payload exceeds threshold (e.g., >256 KB), write `.json.gz` companion.
- Add pointer metadata (`m.gz=true`, `m.gz_path`).
### Acceptance criteria
- Reader transparently supports JSON + GZIP variants.
- No regression for small payloads.
---
## 3) Section Checksums for Drift Detection
### Objective
Detect inconsistencies between flat-file context and database state.
### Proposed additions
- Add checksums per section (`d.brand`, `d.seo`, `d.audience`, etc.) under `m.chk`.
- Persist DB-row reference (`m.db_ref`) with latest row id/timestamp.
- Add `verify_drift()` utility.
### Acceptance criteria
- Drift check can flag `in_sync|partial_drift|out_of_sync`.
- On drift, reader suggests refresh + fallback path.
---
## 4) Extend Pattern to Step 3 and Step 4
### Objective
Standardize agent context retrieval across onboarding steps.
### Proposed additions
- `step3_research_preferences.json`
- `step4_persona_data.json`
- Shared envelope with step-specific `d/s` contracts.
### Acceptance criteria
- Same fallback chain works for step-specific readers.
- SIF agents can consume common interface across Step 2/3/4.
---
## Suggested implementation order
1. TTL/freshness
2. Checksums/drift detection
3. Step 3/4 expansion
4. Optional gzip optimization

View File

@@ -0,0 +1,140 @@
# Flat File Context Framework Design (Agent-Optimized)
## Purpose
Design a **compact, machine-first flat-file framework** for ALwrity AI agents.
This framework is optimized for:
- deterministic structure,
- minimal token footprint,
- fast parsing,
- high-signal retrieval,
- robust fallback behavior.
## Core Principles
1. **Agent-first, not human-first**
- Keys are short and stable.
- Avoid verbose prose in payloads.
- Include only fields needed for reasoning and tool actions.
2. **Compact + predictable schema**
- Fixed top-level keys in strict order.
- Canonical value types (no shape drift).
- Avoid polymorphic fields when possible.
3. **Dual-layer context**
- `d` (full normalized data for deep reasoning).
- `s` (summary/high-signal fast path for most agent reads).
4. **Fallback-safe design**
- Every context doc includes source + freshness metadata.
- If missing/stale, consumers fall back to DB then SIF semantic.
5. **Multi-tenant isolation**
- Per-user file under `workspace/workspace_<safe_user_id>/agent_context/`.
---
## Canonical Context Envelope (compact)
```json
{
"v": "1.0",
"t": "onboarding.step2.website_analysis",
"u": "<user_id>",
"ts": "<iso8601>",
"src": "onboarding_step2",
"d": {},
"s": {},
"m": {
"db": 0,
"sb": 0,
"q": []
}
}
```
### Field map
- `v`: schema version
- `t`: context type
- `u`: user id
- `ts`: updated timestamp
- `src`: source writer
- `d`: canonical normalized data
- `s`: high-signal summary for quick agent use
- `m`: meta (`db`=data bytes, `sb`=summary bytes, `q`=query hints)
---
## Agent Readability Best Practices
- Prefer enums/controlled vocab over free text.
- Use compact keys and arrays for repetitive entities.
- Truncate long textual blobs unless explicitly required.
- Keep “quick facts” flattened.
- Separate operational metadata from semantic content.
- Include retrieval hints (`q`) for consistent query drafting.
---
## Write Pipeline Pattern
1. Normalize incoming source payload.
2. Derive compact summary (`s`) from normalized data.
3. Compute lightweight metadata (`m`).
4. Write the JSON file atomically.
5. Emit writer version + timestamp.
## Read Pipeline Pattern
1. Attempt flat-file load.
2. Validate minimum envelope fields (`v,t,u,ts,d`).
3. Prefer `s` for quick tasks; use `d` for deeper reasoning.
4. If invalid/missing/stale: fallback DB -> SIF semantic.
---
## Scope Expansion Pattern
Apply same envelope for:
- Step 2: website analysis
- Step 3: research preferences + competitor snapshots
- Step 4: persona profile + platform personas
- Step 5: integrations + connected providers
Only `t`, `d`, and `s` payload contracts should vary.
---
## Governance
- Schema changes require version bump (`v`).
- Backward compatibility policy: readers support N and N-1.
- Drift checks should compare canonical hash/checksum vs DB latest row.
## Document Context + End-User Journey Metadata
Each context file should carry explicit machine-oriented document metadata so agents understand *what this file is* before reading full payloads.
Suggested `document_context` fields:
- `audience`: `ai_agents`
- `purpose`: `fast_context_retrieval`
- `context_type`: step-scoped type identifier
- `journey`: stage/action/agent expectation
- `retrieval_contract`: preferred source + fallback order
- `context_window_guidance`: byte budget and summary-first policy
This block is intentionally compact and deterministic to reduce wasted token usage for agent planning.
## Context Window and Length Policy
- Keep combined `data + summary` under a defined byte budget where practical.
- Enforce summary-first reads in agent consumers.
- Truncate long textual fields in summaries; keep full text only in `data` when needed.
- Flag oversize docs in metadata so readers can skip low-priority sections.
- Prefer short, stable keys in machine envelopes and avoid natural-language verbosity.
## Implemented baseline controls
- Atomic file writes to avoid partial documents.
- Best-effort restricted file permissions (`0600`).
- Recursive sensitive-key redaction for payload snapshots.
- Payload size budget enforcement with deterministic trimming metadata.
- Internal document linking via `related_documents` and manifest index.
Security and isolation details: `docs/flat_file_context/FLAT_FILE_CONTEXT_SECURITY_AND_ISOLATION.md`
Step docs: `docs/flat_file_context/STEP2_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP3_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP4_FLAT_FILE_CONTEXT_DESIGN.md`, `docs/flat_file_context/STEP5_FLAT_FILE_CONTEXT_DESIGN.md`

View File

@@ -0,0 +1,26 @@
# Flat File Context Progress Review and Quick Wins
## Progress so far
- Step 2 context: implemented (website analysis fast path + fallback).
- Step 3 context: implemented (research preferences + competitors fast path + fallback).
- Step 4 context: implemented (persona data fast path + fallback).
- Step 5 context: implemented (integrations fast path + fallback).
- Security baseline: user isolation checks, redaction, atomic writes, file-permission hardening.
- Size governance: payload budget + deterministic trimming + trim metadata.
- Internal linking: related-document links + manifest index.
## Quick-win improvements (next 1-2 sprints)
1. Add explicit TTL/staleness fields and auto-refresh hints per step.
2. Add lightweight checksums per section to detect DB drift quickly.
3. Add optional `.json.gz` companion for oversized archives.
4. Add shared reader utility for summary-first + selective field loading.
5. Add minimal unit tests for:
- redaction
- trimming behavior
- manifest linking
- cross-user load rejection
6. Add agent telemetry: record which sections are actually read to optimize summaries.
## Newly added agent tooling
- txtai agent tools for flat-file context manifest/read/write-note operations were added to SIF base agent to support file operations in agent workflows.

View File

@@ -0,0 +1,39 @@
# Flat File Context Security, Isolation, and Size Controls
## Objective
Provide minimal but practical security for agent flat-file context with strong end-user isolation and bounded document growth.
## Isolation model
- Per-user namespace: `workspace/workspace_<safe_user_id>/agent_context/`
- Sanitized user IDs only (`[a-zA-Z0-9_-]`) to prevent path traversal.
- Reader-side user check: loaded document `user_id` must match requesting user context.
## Minimal security controls implemented
1. **Atomic writes**
- Context files are written via temporary file + `os.replace`.
- Prevents partial/corrupt files under concurrent writes.
2. **File permissions**
- Context files are best-effort set to `0600`.
3. **Sensitive key redaction**
- Recursive redaction for key patterns like `api_key`, `token`, `secret`, `password`, `authorization`, `cookie`.
4. **Manifest index**
- `context_manifest.json` gives agents a controlled map of available docs and relationships.
## Size and context-window controls
- Byte budget for raw document payloads (`DEFAULT_MAX_BYTES`).
- If oversize, low-priority/heavy sections are trimmed first (`raw_*`, large snapshots, heavy arrays).
- Trim metadata is preserved under `meta.trim` for traceability.
- Agent policy remains summary-first (`agent_summary` before `data`).
## Internal document linking
- Each context file includes `document_context.related_documents`.
- Manifest includes per-document `related_documents` links.
- This enables agents to:
1. read one document,
2. discover related context files,
3. fetch only relevant next documents.
## Recommended next steps
- Add optional file-level signatures/HMAC for tamper evidence.
- Add checksum per section to detect DB drift.
- Add staleness policy (`ttl_s`, `stale_after`) and auto-refresh triggers.

View File

@@ -0,0 +1,54 @@
# Step 2 Flat File Context Design (Website Analysis)
## Intent
Step 2 context must be optimized for **AI-agent retrieval speed and token efficiency**, not human readability.
## Current storage location
- `workspace/workspace_<safe_user_id>/agent_context/step2_website_analysis.json`
## Current retrieval chain
1. Flat file (fastest)
2. DB (`website_analyses`)
3. SIF semantic fallback
## Compactness strategy
For implementation, keep two logical data layers plus one orientation block:
- **`d` equivalent (full canonical data)** for deep reasoning.
- **`s` equivalent (high-signal summary)** for fast agent prompts and most decisions.
- **`document_context`** for machine-readable orientation (purpose, journey stage, fallback contract, context-window guidance).
Agents should default to summary-first reads and only open full data when needed.
## Step 2 coverage requirements
The Step 2 context should preserve these semantic groups:
- identity/state: website url, timestamps, status/error/warning
- brand/style: writing style, style patterns/guidelines, brand analysis
- audience/content: target audience, content type, recommended settings, characteristics
- strategy/seo: strategy insights, SEO audit, strategic history
- crawl/discovery: crawl output, meta info, sitemap analysis
- traceability: raw inbound payload snapshots
## Agent-readability best practices
- Keep keys stable and deterministic.
- Prefer arrays/enums over long free text.
- Keep summary fields flattened and high signal.
- Avoid duplicate verbose nested structures unless required for correctness.
- Include retrieval hints for consistent downstream querying.
## Practical guidance for consumers
- Use summary/high-signal fields first for routing and lightweight reasoning.
- Pull deep fields only for specialist tasks (SEO, persona fidelity, editorial style checks).
- If the flat file is missing or stale, fall back automatically to the DB, then to SIF.
## Note
A generalized compact framework is documented in:
- `docs/flat_file_context/FLAT_FILE_CONTEXT_FRAMEWORK_DESIGN.md`
Future enhancements are tracked in:
- `docs/flat_file_context/FLAT_FILE_CONTEXT_ENHANCEMENTS_BACKLOG.md`
## Context window guidance
- Keep summary compact and deterministic.
- Add byte-size metadata to help agents decide whether to expand into full data.
- Prefer short keys and avoid verbose natural language in machine envelopes.

View File

@@ -0,0 +1,39 @@
# Step 3 Flat File Context Design (Research Preferences + Competitors)
## Intent
Provide agent-ready Step 3 context with compact summaries for routing plus full payload for deep analysis.
## Storage location
- `workspace/workspace_<safe_user_id>/agent_context/step3_research_preferences.json`
## Why this matters for agents
Step 3 is the bridge from website understanding (Step 2) to competitive strategy and research execution. Agents need this file to understand:
- depth and quality preference constraints,
- factuality constraints,
- content-type priorities,
- competitor landscape and industry context.
## Document-context block
Every context file should include machine-readable document metadata to orient agents quickly:
- audience (`ai_agents`)
- purpose (`fast_context_retrieval`)
- journey stage (`onboarding_step_3`)
- retrieval contract and fallback order
- context-window guidance (size budget + summary-first policy)
## Minimal Step 3 data groups
- research config: depth/content types/auto/factual
- inherited style profile (if present): writing style, target audience, recommended settings
- competitors: domain/url/title/relevance highlights
- industry context: compact market framing text
- traceability: source payload and timestamps
## Agent usage policy
1. Start with `agent_summary.quick_facts` and `retrieval_hints`.
2. Use competitor summary before opening full competitor objects.
3. Read full `data` only for tasks requiring strict evidence/fields.
4. Fall back to DB, then SIF semantic if missing or stale.
## Related-document navigation
Agents can consult `context_manifest.json` to discover linked context files and traverse only the required documents for the task.

View File

@@ -0,0 +1,25 @@
# Step 4 Flat File Context Design (Persona Data)
## Intent
Capture onboarding Step 4 persona outputs in an agent-first flat file so agents can quickly personalize strategy, content, and platform execution.
## Storage location
- `workspace/workspace_<safe_user_id>/agent_context/step4_persona_data.json`
## Required Step 4 coverage
- core persona profile (`core_persona`)
- platform personas (`platform_personas`)
- quality metrics (`quality_metrics`)
- selected platforms (`selected_platforms`)
- research persona/notes when available
- source payload + timestamps for traceability
## Agent summary expectations
- quick facts: selected platform count, persona availability flags
- retrieval hints: persona/profile adaptation queries
- persona focus: compact actionable slice of core persona + quality constraints
## Usage policy
1. Start with `agent_summary`.
2. Expand into `data` only when a task needs full fidelity.
3. Use `document_context.related_documents` to fetch upstream Step 2/Step 3 context as needed.

View File

@@ -0,0 +1,22 @@
# Step 5 Flat File Context Design (Integrations)
## Intent
Capture onboarding Step 5 integration configuration in a compact agent-readable context so agents can reason about connected services and execution constraints.
## Storage location
- `workspace/workspace_<safe_user_id>/agent_context/step5_integrations.json`
## Required Step 5 coverage
- integration map (`integrations`)
- provider list (`providers`)
- connected account references (`connected_accounts`)
- integration status and notes
- source payload and timestamps
## Agent summary expectations
- connected integration count/list
- provider count
- retrieval hints for integration readiness checks
## Linked traversal
Use `document_context.related_documents` and `context_manifest.json` to navigate Step 2/3/4 upstream dependencies when deciding tool execution paths.