529 lines
23 KiB
Python
529 lines
23 KiB
Python
"""Flat-file context storage for AI agents.
|
|
|
|
Stores onboarding context in per-user workspace files, optimized for fast agent reads.
|
|
Includes minimal security hardening, context-size controls, and internal document linking.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional, Tuple
|
|
|
|
from loguru import logger
|
|
|
|
|
|
class AgentFlatContextStore:
|
|
"""Read/write agent-only flat-file context in per-user workspace."""
|
|
|
|
CONTEXT_DIRNAME = "agent_context"
|
|
STEP2_FILENAME = "step2_website_analysis.json"
|
|
STEP3_FILENAME = "step3_research_preferences.json"
|
|
STEP4_FILENAME = "step4_persona_data.json"
|
|
STEP5_FILENAME = "step5_integrations.json"
|
|
MANIFEST_FILENAME = "context_manifest.json"
|
|
|
|
SCHEMA_VERSION = "1.3"
|
|
DEFAULT_MAX_BYTES = 300_000
|
|
SUMMARY_TEXT_LIMIT = 800
|
|
|
|
def __init__(self, user_id: str):
|
|
self.user_id = user_id
|
|
self.safe_user_id = self._sanitize_user_id(user_id)
|
|
|
|
@staticmethod
|
|
def _sanitize_user_id(user_id: str) -> str:
|
|
safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_"))
|
|
return safe or "unknown_user"
|
|
|
|
def _workspace_dir(self) -> Path:
|
|
root_dir = Path(__file__).resolve().parents[3]
|
|
return root_dir / "workspace" / f"workspace_{self.safe_user_id}"
|
|
|
|
def _context_dir(self) -> Path:
|
|
return self._workspace_dir() / self.CONTEXT_DIRNAME
|
|
|
|
def _context_file(self, filename: str) -> Path:
|
|
return self._context_dir() / filename
|
|
|
|
@staticmethod
|
|
def _estimate_size_bytes(value: Any) -> int:
|
|
try:
|
|
return len(json.dumps(value, ensure_ascii=False).encode("utf-8"))
|
|
except Exception:
|
|
return 0
|
|
|
|
@staticmethod
|
|
def _to_context_list(value: Any) -> Any:
|
|
if value is None:
|
|
return []
|
|
if isinstance(value, list):
|
|
return value
|
|
if isinstance(value, dict):
|
|
return list(value.keys())
|
|
return [str(value)]
|
|
|
|
@staticmethod
|
|
def _truncate_text(value: Any, max_chars: int = SUMMARY_TEXT_LIMIT) -> str:
|
|
text = value if isinstance(value, str) else ""
|
|
if len(text) <= max_chars:
|
|
return text
|
|
return f"{text[:max_chars]}..."
|
|
|
|
@staticmethod
|
|
def _redact_sensitive(data: Any) -> Any:
|
|
"""Minimal recursive redaction for sensitive-like keys in payload snapshots."""
|
|
sensitive_tokens = {"api_key", "token", "secret", "password", "authorization", "cookie"}
|
|
if isinstance(data, dict):
|
|
redacted = {}
|
|
for k, v in data.items():
|
|
key_lower = str(k).lower()
|
|
if any(token in key_lower for token in sensitive_tokens):
|
|
redacted[k] = "[REDACTED]"
|
|
else:
|
|
redacted[k] = AgentFlatContextStore._redact_sensitive(v)
|
|
return redacted
|
|
if isinstance(data, list):
|
|
return [AgentFlatContextStore._redact_sensitive(v) for v in data]
|
|
return data
|
|
|
|
def _related_documents(self, context_type: str) -> list:
|
|
if context_type == "onboarding_step2_website_analysis":
|
|
return [
|
|
{"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "next_step"},
|
|
{"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "future_dependency"},
|
|
{"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "future_dependency"},
|
|
]
|
|
if context_type == "onboarding_step3_research_preferences":
|
|
return [
|
|
{"type": "onboarding_step2_website_analysis", "path": self.STEP2_FILENAME, "relationship": "previous_step"},
|
|
{"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "next_step"},
|
|
{"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "future_dependency"},
|
|
]
|
|
if context_type == "onboarding_step4_persona_data":
|
|
return [
|
|
{"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "previous_step"},
|
|
{"type": "onboarding_step2_website_analysis", "path": self.STEP2_FILENAME, "relationship": "upstream_context"},
|
|
{"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "next_step"},
|
|
]
|
|
if context_type == "onboarding_step5_integrations":
|
|
return [
|
|
{"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "previous_step"},
|
|
{"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "upstream_context"},
|
|
]
|
|
return []
|
|
|
|
def _build_document_context(
|
|
self,
|
|
*,
|
|
context_type: str,
|
|
source: str,
|
|
journey_stage: str,
|
|
fallback_order: list,
|
|
payload_size: int,
|
|
summary_size: int,
|
|
payload_within_budget: bool,
|
|
) -> Dict[str, Any]:
|
|
total_size = payload_size + summary_size
|
|
return {
|
|
"audience": "ai_agents",
|
|
"purpose": "fast_context_retrieval",
|
|
"context_type": context_type,
|
|
"source": source,
|
|
"tenant": {"user_id_safe": self.safe_user_id, "isolation_scope": "workspace_user"},
|
|
"journey": {
|
|
"stage": journey_stage,
|
|
"user_action": "onboarding",
|
|
"agent_expectation": "read_summary_first_then_expand",
|
|
},
|
|
"retrieval_contract": {
|
|
"preferred": "flat_file",
|
|
"fallback_order": fallback_order,
|
|
},
|
|
"context_window_guidance": {
|
|
"max_raw_bytes": self.DEFAULT_MAX_BYTES,
|
|
"total_bytes": total_size,
|
|
"raw_document_within_budget": payload_within_budget,
|
|
"agent_policy": "Use agent_summary first; open full data only for specialist tasks",
|
|
},
|
|
"related_documents": self._related_documents(context_type),
|
|
}
|
|
|
|
def _build_step2_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
seo_audit = payload.get("seo_audit") if isinstance(payload.get("seo_audit"), dict) else {}
|
|
brand = payload.get("brand_analysis") if isinstance(payload.get("brand_analysis"), dict) else {}
|
|
rec_settings = payload.get("recommended_settings") if isinstance(payload.get("recommended_settings"), dict) else {}
|
|
target_audience = payload.get("target_audience") if isinstance(payload.get("target_audience"), dict) else {}
|
|
social = payload.get("social_media_presence") if isinstance(payload.get("social_media_presence"), dict) else {}
|
|
|
|
technical_issues = self._to_context_list(seo_audit.get("technical_issues"))
|
|
recommendations = self._to_context_list(seo_audit.get("recommendations"))
|
|
|
|
quick_facts = {
|
|
"website_url": payload.get("website_url") or "",
|
|
"brand_voice": brand.get("brand_voice") or "",
|
|
"industry": brand.get("industry") or "",
|
|
"target_segment": target_audience.get("primary_audience") or target_audience.get("audience_type") or "",
|
|
"writing_tone": rec_settings.get("writing_tone") or "",
|
|
"primary_content_type": (payload.get("content_type") or {}).get("primary_type") if isinstance(payload.get("content_type"), dict) else "",
|
|
"social_platforms": sorted(list(social.keys())),
|
|
"seo_issue_count": len(technical_issues),
|
|
"seo_recommendation_count": len(recommendations),
|
|
}
|
|
|
|
return {
|
|
"quick_facts": quick_facts,
|
|
"retrieval_hints": {
|
|
"high_signal_terms": [
|
|
term
|
|
for term in [
|
|
quick_facts.get("brand_voice"),
|
|
quick_facts.get("industry"),
|
|
quick_facts.get("writing_tone"),
|
|
quick_facts.get("primary_content_type"),
|
|
]
|
|
if term
|
|
],
|
|
"agent_queries": [
|
|
"brand voice guidelines",
|
|
"website style patterns",
|
|
"seo technical issues",
|
|
"content strategy opportunities",
|
|
"target audience profile",
|
|
],
|
|
},
|
|
"profile": {
|
|
"writing_style": payload.get("writing_style") or {},
|
|
"style_patterns": payload.get("style_patterns") or {},
|
|
"style_guidelines": payload.get("style_guidelines") or {},
|
|
"recommended_settings": rec_settings,
|
|
"target_audience": target_audience,
|
|
},
|
|
"seo_focus": {
|
|
"technical_issues": technical_issues,
|
|
"recommendations": recommendations,
|
|
},
|
|
}
|
|
|
|
def _build_step3_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
competitors = payload.get("competitors") if isinstance(payload.get("competitors"), list) else []
|
|
domains = []
|
|
for comp in competitors[:20]:
|
|
if isinstance(comp, dict):
|
|
dom = comp.get("domain") or comp.get("url")
|
|
if dom:
|
|
domains.append(str(dom))
|
|
|
|
research_depth = payload.get("research_depth") or ""
|
|
content_types = payload.get("content_types") if isinstance(payload.get("content_types"), list) else []
|
|
industry_context = self._truncate_text(payload.get("industry_context") or payload.get("industryContext") or "", 500)
|
|
|
|
return {
|
|
"quick_facts": {
|
|
"research_depth": research_depth,
|
|
"content_types": content_types,
|
|
"auto_research": bool(payload.get("auto_research", True)),
|
|
"factual_content": bool(payload.get("factual_content", True)),
|
|
"competitor_count": len(competitors),
|
|
},
|
|
"retrieval_hints": {
|
|
"high_signal_terms": [research_depth, *content_types[:5]],
|
|
"agent_queries": [
|
|
"competitor landscape summary",
|
|
"content opportunities by competitor",
|
|
"research depth preferences",
|
|
"factual content constraints",
|
|
],
|
|
},
|
|
"competitor_focus": {
|
|
"top_competitor_domains": domains[:10],
|
|
"industry_context": industry_context,
|
|
},
|
|
}
|
|
|
|
def _build_step4_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
core_persona = payload.get("core_persona") if isinstance(payload.get("core_persona"), dict) else {}
|
|
platform_personas = payload.get("platform_personas") if isinstance(payload.get("platform_personas"), dict) else {}
|
|
quality_metrics = payload.get("quality_metrics") if isinstance(payload.get("quality_metrics"), dict) else {}
|
|
selected_platforms = payload.get("selected_platforms") if isinstance(payload.get("selected_platforms"), list) else []
|
|
|
|
persona_name = core_persona.get("name") or core_persona.get("persona_name") or ""
|
|
primary_goal = self._truncate_text(core_persona.get("primary_goal") or core_persona.get("goal") or "", 250)
|
|
|
|
return {
|
|
"quick_facts": {
|
|
"persona_name": persona_name,
|
|
"selected_platforms": selected_platforms,
|
|
"platform_persona_count": len(platform_personas.keys()) if isinstance(platform_personas, dict) else 0,
|
|
"has_research_persona": bool(payload.get("research_persona")),
|
|
},
|
|
"retrieval_hints": {
|
|
"high_signal_terms": [persona_name, *selected_platforms[:5]],
|
|
"agent_queries": [
|
|
"core persona profile",
|
|
"platform persona adaptations",
|
|
"persona quality metrics",
|
|
"research persona defaults",
|
|
],
|
|
},
|
|
"persona_focus": {
|
|
"primary_goal": primary_goal,
|
|
"core_persona": core_persona,
|
|
"quality_metrics": quality_metrics,
|
|
},
|
|
}
|
|
|
|
def _build_step5_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
integrations = payload.get("integrations") if isinstance(payload.get("integrations"), dict) else {}
|
|
providers = payload.get("providers") if isinstance(payload.get("providers"), list) else []
|
|
connected = [k for k, v in integrations.items() if bool(v)]
|
|
notes = self._truncate_text(payload.get("notes") or payload.get("integration_notes") or "", 300)
|
|
|
|
return {
|
|
"quick_facts": {
|
|
"connected_integrations_count": len(connected),
|
|
"connected_integrations": connected[:20],
|
|
"providers_count": len(providers),
|
|
},
|
|
"retrieval_hints": {
|
|
"high_signal_terms": connected[:5],
|
|
"agent_queries": [
|
|
"integration readiness",
|
|
"connected providers summary",
|
|
"missing integration dependencies",
|
|
],
|
|
},
|
|
"integration_focus": {
|
|
"notes": notes,
|
|
"integrations": integrations,
|
|
},
|
|
}
|
|
|
|
def _shrink_payload_if_needed(self, payload: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
"""Keep payload under budget by trimming heavy optional sections first."""
|
|
payload = self._redact_sensitive(payload if isinstance(payload, dict) else {})
|
|
original_size = self._estimate_size_bytes(payload)
|
|
trim_info = {"trimmed": False, "original_size_bytes": original_size, "trimmed_fields": []}
|
|
|
|
if original_size <= self.DEFAULT_MAX_BYTES:
|
|
return payload, trim_info
|
|
|
|
candidates = [
|
|
"raw_step2_payload",
|
|
"raw_analysis_payload",
|
|
"source_payload",
|
|
"crawl_result",
|
|
"competitors",
|
|
"strategic_insights_history",
|
|
"seo_audit",
|
|
]
|
|
|
|
mutable = dict(payload)
|
|
for field in candidates:
|
|
if self._estimate_size_bytes(mutable) <= self.DEFAULT_MAX_BYTES:
|
|
break
|
|
if field in mutable:
|
|
value = mutable.get(field)
|
|
if field == "competitors" and isinstance(value, list):
|
|
mutable[field] = value[:20]
|
|
elif isinstance(value, (dict, list)):
|
|
mutable[field] = {"omitted": True, "reason": "size_budget", "original_type": type(value).__name__}
|
|
elif isinstance(value, str):
|
|
mutable[field] = self._truncate_text(value, 500)
|
|
else:
|
|
mutable[field] = "[OMITTED:size_budget]"
|
|
trim_info["trimmed_fields"].append(field)
|
|
|
|
trim_info["trimmed"] = self._estimate_size_bytes(mutable) < original_size
|
|
trim_info["final_size_bytes"] = self._estimate_size_bytes(mutable)
|
|
return mutable, trim_info
|
|
|
|
def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None:
|
|
target_file.parent.mkdir(parents=True, exist_ok=True)
|
|
fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
|
|
try:
|
|
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, separators=(",", ":"))
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
os.replace(tmp_path, target_file)
|
|
try:
|
|
os.chmod(target_file, 0o600)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
try:
|
|
os.unlink(tmp_path)
|
|
except Exception:
|
|
pass
|
|
raise
|
|
|
|
def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None:
|
|
manifest_file = self._context_file(self.MANIFEST_FILENAME)
|
|
existing = {}
|
|
if manifest_file.exists():
|
|
try:
|
|
with open(manifest_file, "r", encoding="utf-8") as f:
|
|
existing = json.load(f) or {}
|
|
except Exception:
|
|
existing = {}
|
|
|
|
items = existing.get("documents") if isinstance(existing.get("documents"), list) else []
|
|
items = [i for i in items if not (isinstance(i, dict) and i.get("type") == context_type)]
|
|
items.append(
|
|
{
|
|
"type": context_type,
|
|
"path": filename,
|
|
"updated_at": doc.get("updated_at"),
|
|
"size_bytes": (doc.get("meta") or {}).get("data_size_bytes", 0) + (doc.get("meta") or {}).get("summary_size_bytes", 0),
|
|
"related_documents": (doc.get("document_context") or {}).get("related_documents", []),
|
|
}
|
|
)
|
|
|
|
manifest = {
|
|
"schema_version": self.SCHEMA_VERSION,
|
|
"user_id": str(self.user_id),
|
|
"updated_at": datetime.utcnow().isoformat(),
|
|
"documents": items,
|
|
}
|
|
self._atomic_write_json(manifest_file, manifest)
|
|
|
|
def _save_context_document(
|
|
self,
|
|
*,
|
|
filename: str,
|
|
context_type: str,
|
|
payload: Dict[str, Any],
|
|
summary: Dict[str, Any],
|
|
source: str,
|
|
journey_stage: str,
|
|
) -> bool:
|
|
try:
|
|
target_file = self._context_file(filename)
|
|
payload = payload if isinstance(payload, dict) else {}
|
|
summary = summary if isinstance(summary, dict) else {}
|
|
|
|
compact_payload, trim_info = self._shrink_payload_if_needed(payload)
|
|
payload_size = self._estimate_size_bytes(compact_payload)
|
|
summary_size = self._estimate_size_bytes(summary)
|
|
|
|
context_doc = {
|
|
"schema_version": self.SCHEMA_VERSION,
|
|
"context_type": context_type,
|
|
"user_id": str(self.user_id),
|
|
"updated_at": datetime.utcnow().isoformat(),
|
|
"source": source,
|
|
"document_context": self._build_document_context(
|
|
context_type=context_type,
|
|
source=source,
|
|
journey_stage=journey_stage,
|
|
fallback_order=["flat_file", "database", "sif_semantic"],
|
|
payload_size=payload_size,
|
|
summary_size=summary_size,
|
|
payload_within_budget=payload_size <= self.DEFAULT_MAX_BYTES,
|
|
),
|
|
"data": compact_payload,
|
|
"agent_summary": summary,
|
|
"meta": {
|
|
"data_size_bytes": payload_size,
|
|
"summary_size_bytes": summary_size,
|
|
"trim": trim_info,
|
|
},
|
|
}
|
|
|
|
self._atomic_write_json(target_file, context_doc)
|
|
self._update_manifest(context_type, filename, context_doc)
|
|
return True
|
|
except Exception as exc:
|
|
logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}")
|
|
return False
|
|
|
|
def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool:
|
|
return self._save_context_document(
|
|
filename=self.STEP2_FILENAME,
|
|
context_type="onboarding_step2_website_analysis",
|
|
payload=payload,
|
|
summary=self._build_step2_summary(payload if isinstance(payload, dict) else {}),
|
|
source=source,
|
|
journey_stage="onboarding_step_2",
|
|
)
|
|
|
|
def save_step3_research_preferences(self, payload: Dict[str, Any], *, source: str = "onboarding_step3") -> bool:
|
|
return self._save_context_document(
|
|
filename=self.STEP3_FILENAME,
|
|
context_type="onboarding_step3_research_preferences",
|
|
payload=payload,
|
|
summary=self._build_step3_summary(payload if isinstance(payload, dict) else {}),
|
|
source=source,
|
|
journey_stage="onboarding_step_3",
|
|
)
|
|
|
|
def save_step4_persona_data(self, payload: Dict[str, Any], *, source: str = "onboarding_step4") -> bool:
|
|
return self._save_context_document(
|
|
filename=self.STEP4_FILENAME,
|
|
context_type="onboarding_step4_persona_data",
|
|
payload=payload,
|
|
summary=self._build_step4_summary(payload if isinstance(payload, dict) else {}),
|
|
source=source,
|
|
journey_stage="onboarding_step_4",
|
|
)
|
|
|
|
def save_step5_integrations(self, payload: Dict[str, Any], *, source: str = "onboarding_step5") -> bool:
|
|
return self._save_context_document(
|
|
filename=self.STEP5_FILENAME,
|
|
context_type="onboarding_step5_integrations",
|
|
payload=payload,
|
|
summary=self._build_step5_summary(payload if isinstance(payload, dict) else {}),
|
|
source=source,
|
|
journey_stage="onboarding_step_5",
|
|
)
|
|
|
|
def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
|
|
try:
|
|
target_file = self._context_file(filename)
|
|
if not target_file.exists():
|
|
return None
|
|
with open(target_file, "r", encoding="utf-8") as f:
|
|
doc = json.load(f)
|
|
if isinstance(doc, dict) and str(doc.get("user_id")) != str(self.user_id):
|
|
logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})")
|
|
return None
|
|
return doc if isinstance(doc, dict) else None
|
|
except Exception as exc:
|
|
logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}")
|
|
return None
|
|
|
|
def load_context_manifest(self) -> Optional[Dict[str, Any]]:
|
|
return self._load_context_document(self.MANIFEST_FILENAME)
|
|
|
|
def load_step2_context_document(self) -> Optional[Dict[str, Any]]:
|
|
return self._load_context_document(self.STEP2_FILENAME)
|
|
|
|
def load_step2_website_analysis(self) -> Optional[Dict[str, Any]]:
|
|
doc = self.load_step2_context_document()
|
|
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
|
|
|
|
def load_step3_context_document(self) -> Optional[Dict[str, Any]]:
|
|
return self._load_context_document(self.STEP3_FILENAME)
|
|
|
|
def load_step3_research_preferences(self) -> Optional[Dict[str, Any]]:
|
|
doc = self.load_step3_context_document()
|
|
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
|
|
|
|
def load_step4_context_document(self) -> Optional[Dict[str, Any]]:
|
|
return self._load_context_document(self.STEP4_FILENAME)
|
|
|
|
def load_step4_persona_data(self) -> Optional[Dict[str, Any]]:
|
|
doc = self.load_step4_context_document()
|
|
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
|
|
|
|
def load_step5_context_document(self) -> Optional[Dict[str, Any]]:
|
|
return self._load_context_document(self.STEP5_FILENAME)
|
|
|
|
def load_step5_integrations(self) -> Optional[Dict[str, Any]]:
|
|
doc = self.load_step5_context_document()
|
|
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
|