# ALwrity/backend/services/intelligence/agent_flat_context.py

"""Flat-file context storage for AI agents.
Stores onboarding context in per-user workspace files, optimized for fast agent reads.
Includes minimal security hardening, context-size controls, and internal document linking.
"""
from __future__ import annotations

import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple

from loguru import logger
class AgentFlatContextStore:
    """Read/write agent-only flat-file context in a per-user workspace.

    Each onboarding step (2-5) is persisted as one compact JSON document under
    ``<workspace>/agent_context/``, together with ``context_manifest.json``,
    which indexes the documents written so far. A document holds the redacted,
    size-bounded raw payload in ``data`` and a compact ``agent_summary`` that
    agents are expected to read first.

    ``save_*`` methods return ``True``/``False`` and log failures instead of
    raising. ``load_*`` methods return ``None`` when a document is missing,
    unreadable, malformed, or stamped with a different ``user_id``.
    """

    CONTEXT_DIRNAME = "agent_context"
    STEP2_FILENAME = "step2_website_analysis.json"
    STEP3_FILENAME = "step3_research_preferences.json"
    STEP4_FILENAME = "step4_persona_data.json"
    STEP5_FILENAME = "step5_integrations.json"
    MANIFEST_FILENAME = "context_manifest.json"
    SCHEMA_VERSION = "1.3"
    # Byte budget for a document's raw ``data`` section, as measured by
    # ``_estimate_size_bytes``.
    DEFAULT_MAX_BYTES = 300_000
    # Default character cap applied by ``_truncate_text``.
    SUMMARY_TEXT_LIMIT = 800
    # A dict key is redacted when its lower-cased form contains any of these
    # substrings. Deliberately broad: e.g. "max_tokens" is redacted as well.
    SENSITIVE_KEY_TOKENS = ("api_key", "token", "secret", "password", "authorization", "cookie")
    # Heavy optional top-level payload fields, in the order they are trimmed
    # when a payload exceeds DEFAULT_MAX_BYTES.
    TRIM_CANDIDATES = (
        "raw_step2_payload",
        "raw_analysis_payload",
        "source_payload",
        "crawl_result",
        "competitors",
        "strategic_insights_history",
        "seo_audit",
    )
    # Number of competitor entries kept when "competitors" is trimmed.
    MAX_TRIMMED_COMPETITORS = 20

    # Context type -> name of the class attribute holding its filename
    # (resolved with getattr so subclass overrides of the filenames apply).
    _CONTEXT_FILENAME_ATTRS = {
        "onboarding_step2_website_analysis": "STEP2_FILENAME",
        "onboarding_step3_research_preferences": "STEP3_FILENAME",
        "onboarding_step4_persona_data": "STEP4_FILENAME",
        "onboarding_step5_integrations": "STEP5_FILENAME",
    }
    # Context type -> ordered (related context type, relationship) pairs.
    _RELATED_CONTEXT_TYPES = {
        "onboarding_step2_website_analysis": (
            ("onboarding_step3_research_preferences", "next_step"),
            ("onboarding_step4_persona_data", "future_dependency"),
            ("onboarding_step5_integrations", "future_dependency"),
        ),
        "onboarding_step3_research_preferences": (
            ("onboarding_step2_website_analysis", "previous_step"),
            ("onboarding_step4_persona_data", "next_step"),
            ("onboarding_step5_integrations", "future_dependency"),
        ),
        "onboarding_step4_persona_data": (
            ("onboarding_step3_research_preferences", "previous_step"),
            ("onboarding_step2_website_analysis", "upstream_context"),
            ("onboarding_step5_integrations", "next_step"),
        ),
        "onboarding_step5_integrations": (
            ("onboarding_step4_persona_data", "previous_step"),
            ("onboarding_step3_research_preferences", "upstream_context"),
        ),
    }

    def __init__(self, user_id: str):
        """Bind the store to ``user_id`` (kept verbatim for ownership checks)."""
        self.user_id = user_id
        # Filesystem-safe form, used only to build the workspace path.
        self.safe_user_id = self._sanitize_user_id(user_id)

    # ------------------------------------------------------------------ paths

    @staticmethod
    def _sanitize_user_id(user_id: str) -> str:
        """Keep only alphanumerics, '-' and '_' so the id cannot escape the workspace root.

        NOTE(review): distinct ids can sanitize to the same value (e.g. "a.b" and
        "ab"; ids with no allowed characters all become "unknown_user") and then
        share a workspace. Loads reject documents whose stored user_id differs,
        but saves for such ids would overwrite each other.
        """
        safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_"))
        return safe or "unknown_user"

    def _workspace_dir(self) -> Path:
        """Return ``<root>/workspace/workspace_<safe_user_id>``; root is ``parents[3]`` of this module."""
        root_dir = Path(__file__).resolve().parents[3]
        return root_dir / "workspace" / f"workspace_{self.safe_user_id}"

    def _context_dir(self) -> Path:
        """Return the directory holding this user's agent context documents."""
        return self._workspace_dir() / self.CONTEXT_DIRNAME

    def _context_file(self, filename: str) -> Path:
        """Return the path of ``filename`` inside the context directory."""
        return self._context_dir() / filename

    # -------------------------------------------------------- generic helpers

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Fallback encoder for values the stdlib ``json`` module rejects.

        Sets become lists, date/time-like objects (anything with a callable
        ``isoformat``) become ISO-8601 strings, and anything else falls back to
        ``str(value)`` so one odd value cannot fail a whole save.
        """
        if isinstance(value, (set, frozenset)):
            return list(value)
        isoformat = getattr(value, "isoformat", None)
        if callable(isoformat):
            return isoformat()
        return str(value)

    @staticmethod
    def _estimate_size_bytes(value: Any) -> int:
        """Return the UTF-8 size of ``value`` encoded as JSON, or 0 if it cannot be encoded.

        Uses the same encoder fallback as ``_atomic_write_json``. Previously a
        datetime made this return 0 and so silently pass the size budget.
        """
        try:
            encoded = json.dumps(value, ensure_ascii=False, default=AgentFlatContextStore._json_default)
            return len(encoded.encode("utf-8"))
        except (TypeError, ValueError, RecursionError):
            # e.g. non-string dict keys json cannot encode, or circular references.
            return 0

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Same format that ``datetime.utcnow().isoformat()`` produced (no offset),
        without the API deprecated since Python 3.12.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _to_context_list(value: Any) -> list:
        """Coerce ``value`` to a list: None -> [], list as-is, dict -> its keys, else [str(value)]."""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        if isinstance(value, dict):
            return list(value)
        return [str(value)]

    @staticmethod
    def _truncate_text(value: Any, max_chars: int = SUMMARY_TEXT_LIMIT) -> str:
        """Return ``value`` cut to ``max_chars`` characters (plus "...") when longer.

        Non-string values yield "" because summaries only carry free text.
        """
        text = value if isinstance(value, str) else ""
        return text if len(text) <= max_chars else f"{text[:max_chars]}..."

    @staticmethod
    def _dict_field(payload: Dict[str, Any], key: str) -> Dict[str, Any]:
        """Return ``payload[key]`` when it is a dict, otherwise an empty dict."""
        value = payload.get(key)
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _list_field(payload: Dict[str, Any], key: str) -> list:
        """Return ``payload[key]`` when it is a list, otherwise an empty list."""
        value = payload.get(key)
        return value if isinstance(value, list) else []

    @staticmethod
    def _redact_sensitive(data: Any) -> Any:
        """Return a copy of ``data`` with values under sensitive-looking keys replaced.

        Recurses through dicts, lists and tuples (tuples come back as lists,
        which is how JSON stores them anyway). A key is sensitive when its
        lower-cased string form contains any of ``SENSITIVE_KEY_TOKENS``; its
        whole value becomes "[REDACTED]". String values are not scanned.
        """
        tokens = AgentFlatContextStore.SENSITIVE_KEY_TOKENS
        if isinstance(data, dict):
            redacted = {}
            for key, value in data.items():
                key_lower = str(key).lower()
                if any(token in key_lower for token in tokens):
                    redacted[key] = "[REDACTED]"
                else:
                    redacted[key] = AgentFlatContextStore._redact_sensitive(value)
            return redacted
        if isinstance(data, (list, tuple)):
            return [AgentFlatContextStore._redact_sensitive(item) for item in data]
        return data

    # ------------------------------------------------------ document metadata

    def _related_documents(self, context_type: str) -> list:
        """Return link descriptors (type, path, relationship) to sibling onboarding documents.

        Unknown context types have no links.
        """
        return [
            {
                "type": related_type,
                "path": getattr(self, self._CONTEXT_FILENAME_ATTRS[related_type]),
                "relationship": relationship,
            }
            for related_type, relationship in self._RELATED_CONTEXT_TYPES.get(context_type, ())
        ]

    def _build_document_context(
        self,
        *,
        context_type: str,
        source: str,
        journey_stage: str,
        fallback_order: list,
        payload_size: int,
        summary_size: int,
        payload_within_budget: bool,
    ) -> Dict[str, Any]:
        """Build the ``document_context`` header describing audience, retrieval policy, and links."""
        total_size = payload_size + summary_size
        return {
            "audience": "ai_agents",
            "purpose": "fast_context_retrieval",
            "context_type": context_type,
            "source": source,
            "tenant": {"user_id_safe": self.safe_user_id, "isolation_scope": "workspace_user"},
            "journey": {
                "stage": journey_stage,
                "user_action": "onboarding",
                "agent_expectation": "read_summary_first_then_expand",
            },
            "retrieval_contract": {
                "preferred": "flat_file",
                "fallback_order": fallback_order,
            },
            "context_window_guidance": {
                "max_raw_bytes": self.DEFAULT_MAX_BYTES,
                "total_bytes": total_size,
                "raw_document_within_budget": payload_within_budget,
                "agent_policy": "Use agent_summary first; open full data only for specialist tasks",
            },
            "related_documents": self._related_documents(context_type),
        }

    # -------------------------------------------------------- step summaries

    def _build_step2_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize website analysis: quick facts, retrieval hints, style profile, SEO focus."""
        seo_audit = self._dict_field(payload, "seo_audit")
        brand = self._dict_field(payload, "brand_analysis")
        rec_settings = self._dict_field(payload, "recommended_settings")
        target_audience = self._dict_field(payload, "target_audience")
        social = self._dict_field(payload, "social_media_presence")
        content_type = self._dict_field(payload, "content_type")
        technical_issues = self._to_context_list(seo_audit.get("technical_issues"))
        recommendations = self._to_context_list(seo_audit.get("recommendations"))
        quick_facts = {
            "website_url": payload.get("website_url") or "",
            "brand_voice": brand.get("brand_voice") or "",
            "industry": brand.get("industry") or "",
            "target_segment": target_audience.get("primary_audience") or target_audience.get("audience_type") or "",
            "writing_tone": rec_settings.get("writing_tone") or "",
            # `or ""` keeps this a string when the dict lacks "primary_type" (was None).
            "primary_content_type": content_type.get("primary_type") or "",
            # key=str: sorting must not fail when platform keys have mixed types.
            "social_platforms": sorted(social, key=str),
            "seo_issue_count": len(technical_issues),
            "seo_recommendation_count": len(recommendations),
        }
        signal_candidates = (
            quick_facts["brand_voice"],
            quick_facts["industry"],
            quick_facts["writing_tone"],
            quick_facts["primary_content_type"],
        )
        return {
            "quick_facts": quick_facts,
            "retrieval_hints": {
                "high_signal_terms": [term for term in signal_candidates if term],
                "agent_queries": [
                    "brand voice guidelines",
                    "website style patterns",
                    "seo technical issues",
                    "content strategy opportunities",
                    "target audience profile",
                ],
            },
            "profile": {
                "writing_style": payload.get("writing_style") or {},
                "style_patterns": payload.get("style_patterns") or {},
                "style_guidelines": payload.get("style_guidelines") or {},
                "recommended_settings": rec_settings,
                "target_audience": target_audience,
            },
            "seo_focus": {
                "technical_issues": technical_issues,
                "recommendations": recommendations,
            },
        }

    def _build_step3_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize research preferences: depth, content types, flags, competitor domains."""
        competitors = self._list_field(payload, "competitors")
        # Domains come from the first 20 competitor dicts ("domain", else "url").
        domains = [
            str(dom)
            for comp in competitors[:20]
            if isinstance(comp, dict)
            for dom in (comp.get("domain") or comp.get("url"),)
            if dom
        ]
        research_depth = payload.get("research_depth") or ""
        content_types = self._list_field(payload, "content_types")
        industry_context = self._truncate_text(payload.get("industry_context") or payload.get("industryContext") or "", 500)
        return {
            "quick_facts": {
                "research_depth": research_depth,
                "content_types": content_types,
                "auto_research": bool(payload.get("auto_research", True)),
                "factual_content": bool(payload.get("factual_content", True)),
                "competitor_count": len(competitors),
            },
            "retrieval_hints": {
                # Empty values are dropped, matching the step 2 summary.
                "high_signal_terms": [term for term in (research_depth, *content_types[:5]) if term],
                "agent_queries": [
                    "competitor landscape summary",
                    "content opportunities by competitor",
                    "research depth preferences",
                    "factual content constraints",
                ],
            },
            "competitor_focus": {
                "top_competitor_domains": domains[:10],
                "industry_context": industry_context,
            },
        }

    def _build_step4_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize persona data: persona name, platforms, goal, quality metrics."""
        core_persona = self._dict_field(payload, "core_persona")
        platform_personas = self._dict_field(payload, "platform_personas")
        quality_metrics = self._dict_field(payload, "quality_metrics")
        selected_platforms = self._list_field(payload, "selected_platforms")
        persona_name = core_persona.get("name") or core_persona.get("persona_name") or ""
        primary_goal = self._truncate_text(core_persona.get("primary_goal") or core_persona.get("goal") or "", 250)
        return {
            "quick_facts": {
                "persona_name": persona_name,
                "selected_platforms": selected_platforms,
                "platform_persona_count": len(platform_personas),
                "has_research_persona": bool(payload.get("research_persona")),
            },
            "retrieval_hints": {
                # Empty values are dropped, matching the step 2 summary.
                "high_signal_terms": [term for term in (persona_name, *selected_platforms[:5]) if term],
                "agent_queries": [
                    "core persona profile",
                    "platform persona adaptations",
                    "persona quality metrics",
                    "research persona defaults",
                ],
            },
            "persona_focus": {
                "primary_goal": primary_goal,
                "core_persona": core_persona,
                "quality_metrics": quality_metrics,
            },
        }

    def _build_step5_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize integrations: connected integration names, provider count, notes.

        An integration counts as connected when its value is truthy.
        NOTE(review): a value such as {"connected": False} is truthy and would
        count as connected; confirm the expected shape of ``integrations``.
        """
        integrations = self._dict_field(payload, "integrations")
        providers = self._list_field(payload, "providers")
        connected = [name for name, state in integrations.items() if state]
        notes = self._truncate_text(payload.get("notes") or payload.get("integration_notes") or "", 300)
        return {
            "quick_facts": {
                "connected_integrations_count": len(connected),
                "connected_integrations": connected[:20],
                "providers_count": len(providers),
            },
            "retrieval_hints": {
                "high_signal_terms": connected[:5],
                "agent_queries": [
                    "integration readiness",
                    "connected providers summary",
                    "missing integration dependencies",
                ],
            },
            "integration_focus": {
                "notes": notes,
                "integrations": integrations,
            },
        }

    def _build_summary_safely(
        self,
        builder: Callable[[Dict[str, Any]], Dict[str, Any]],
        payload: Any,
        context_type: str,
    ) -> Dict[str, Any]:
        """Run a summary ``builder``; on failure log a warning and return {}.

        Summaries are built before ``_save_context_document``'s error handling
        runs. Without this guard a summary bug would raise out of the
        bool-returning ``save_*`` API. With it, the raw payload is still saved.
        """
        try:
            return builder(payload if isinstance(payload, dict) else {})
        except Exception as exc:
            logger.warning(f"Failed to build agent summary for user {self.user_id} ({context_type}): {exc}")
            return {}

    # --------------------------------------------------------------- writing

    def _shrink_payload_if_needed(self, payload: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Redact ``payload`` and keep it under DEFAULT_MAX_BYTES by trimming heavy optional fields.

        Fields in TRIM_CANDIDATES are processed in order until the payload fits:
        - "competitors" lists are cut to MAX_TRIMMED_COMPETITORS entries;
        - other dicts/lists become an "omitted" marker;
        - strings are truncated to 500 characters;
        - anything else becomes "[OMITTED:size_budget]".
        The payload may still exceed the budget if required fields are large.

        Returns:
            (possibly trimmed copy, trim_info). trim_info always has "trimmed",
            "original_size_bytes", "trimmed_fields" and "final_size_bytes".
        """
        payload = self._redact_sensitive(payload if isinstance(payload, dict) else {})
        original_size = self._estimate_size_bytes(payload)
        trim_info = {"trimmed": False, "original_size_bytes": original_size, "trimmed_fields": []}
        if original_size <= self.DEFAULT_MAX_BYTES:
            trim_info["final_size_bytes"] = original_size
            return payload, trim_info
        mutable = dict(payload)  # shallow copy: only top-level keys are replaced
        current_size = original_size
        for field in self.TRIM_CANDIDATES:
            if current_size <= self.DEFAULT_MAX_BYTES:
                break
            if field not in mutable:
                continue
            value = mutable[field]
            if field == "competitors" and isinstance(value, list):
                if len(value) <= self.MAX_TRIMMED_COMPETITORS:
                    continue  # slicing would change nothing; don't report it as trimmed
                mutable[field] = value[: self.MAX_TRIMMED_COMPETITORS]
            elif isinstance(value, (dict, list)):
                mutable[field] = {"omitted": True, "reason": "size_budget", "original_type": type(value).__name__}
            elif isinstance(value, str):
                mutable[field] = self._truncate_text(value, 500)
            else:
                mutable[field] = "[OMITTED:size_budget]"
            trim_info["trimmed_fields"].append(field)
            current_size = self._estimate_size_bytes(mutable)
        trim_info["trimmed"] = current_size < original_size
        trim_info["final_size_bytes"] = current_size
        return mutable, trim_info

    def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None:
        """Write ``data`` as compact JSON to ``target_file`` atomically.

        The JSON is written to a temp file in the same directory, fsync'ed, then
        moved over the target with ``os.replace``, so readers never see a
        partial file. The temp file is removed on failure and the error
        re-raised. Permissions are set to 0o600 on a best-effort basis.
        """
        target_file.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as handle:
                json.dump(data, handle, ensure_ascii=False, separators=(",", ":"), default=self._json_default)
                handle.flush()
                os.fsync(handle.fileno())
            os.replace(tmp_path, target_file)
        except Exception:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
        try:
            os.chmod(target_file, 0o600)
        except OSError:
            pass  # best effort, e.g. filesystems without POSIX permissions

    def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None:
        """Insert or replace the manifest entry for ``context_type`` and rewrite the manifest.

        An unreadable or malformed manifest (not JSON, or not a JSON object) is
        rebuilt from scratch instead of failing every subsequent save.
        NOTE(review): this read-modify-write is not locked; concurrent saves for
        the same user could drop each other's entries. Confirm whether saves can
        run concurrently.
        """
        manifest_file = self._context_file(self.MANIFEST_FILENAME)
        existing: Any = {}
        if manifest_file.exists():
            try:
                with open(manifest_file, "r", encoding="utf-8") as handle:
                    existing = json.load(handle)
            except (OSError, ValueError):
                existing = {}
        if not isinstance(existing, dict):
            existing = {}
        documents = existing.get("documents")
        if not isinstance(documents, list):
            documents = []
        documents = [item for item in documents if not (isinstance(item, dict) and item.get("type") == context_type)]
        meta = doc.get("meta") or {}
        documents.append(
            {
                "type": context_type,
                "path": filename,
                "updated_at": doc.get("updated_at"),
                "size_bytes": meta.get("data_size_bytes", 0) + meta.get("summary_size_bytes", 0),
                "related_documents": (doc.get("document_context") or {}).get("related_documents", []),
            }
        )
        manifest = {
            "schema_version": self.SCHEMA_VERSION,
            "user_id": str(self.user_id),
            "updated_at": self._utc_now_iso(),
            "documents": documents,
        }
        self._atomic_write_json(manifest_file, manifest)

    def _save_context_document(
        self,
        *,
        filename: str,
        context_type: str,
        payload: Dict[str, Any],
        summary: Dict[str, Any],
        source: str,
        journey_stage: str,
    ) -> bool:
        """Assemble, write and index one context document. Returns False (and logs) on any failure.

        Both the payload and the summary are redacted. Summaries embed raw
        sub-dicts of the payload (e.g. step 5 ``integrations``), so redacting
        only the payload would leak secrets into ``agent_summary``.
        """
        try:
            target_file = self._context_file(filename)
            payload = payload if isinstance(payload, dict) else {}
            summary = self._redact_sensitive(summary if isinstance(summary, dict) else {})
            compact_payload, trim_info = self._shrink_payload_if_needed(payload)
            payload_size = self._estimate_size_bytes(compact_payload)
            summary_size = self._estimate_size_bytes(summary)
            context_doc = {
                "schema_version": self.SCHEMA_VERSION,
                "context_type": context_type,
                "user_id": str(self.user_id),
                "updated_at": self._utc_now_iso(),
                "source": source,
                "document_context": self._build_document_context(
                    context_type=context_type,
                    source=source,
                    journey_stage=journey_stage,
                    fallback_order=["flat_file", "database", "sif_semantic"],
                    payload_size=payload_size,
                    summary_size=summary_size,
                    payload_within_budget=payload_size <= self.DEFAULT_MAX_BYTES,
                ),
                "data": compact_payload,
                "agent_summary": summary,
                "meta": {
                    "data_size_bytes": payload_size,
                    "summary_size_bytes": summary_size,
                    "trim": trim_info,
                },
            }
            self._atomic_write_json(target_file, context_doc)
            self._update_manifest(context_type, filename, context_doc)
            return True
        except Exception as exc:
            logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}")
            return False

    def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool:
        """Persist onboarding step 2 (website analysis). Returns True on success."""
        context_type = "onboarding_step2_website_analysis"
        return self._save_context_document(
            filename=self.STEP2_FILENAME,
            context_type=context_type,
            payload=payload,
            summary=self._build_summary_safely(self._build_step2_summary, payload, context_type),
            source=source,
            journey_stage="onboarding_step_2",
        )

    def save_step3_research_preferences(self, payload: Dict[str, Any], *, source: str = "onboarding_step3") -> bool:
        """Persist onboarding step 3 (research preferences). Returns True on success."""
        context_type = "onboarding_step3_research_preferences"
        return self._save_context_document(
            filename=self.STEP3_FILENAME,
            context_type=context_type,
            payload=payload,
            summary=self._build_summary_safely(self._build_step3_summary, payload, context_type),
            source=source,
            journey_stage="onboarding_step_3",
        )

    def save_step4_persona_data(self, payload: Dict[str, Any], *, source: str = "onboarding_step4") -> bool:
        """Persist onboarding step 4 (persona data). Returns True on success."""
        context_type = "onboarding_step4_persona_data"
        return self._save_context_document(
            filename=self.STEP4_FILENAME,
            context_type=context_type,
            payload=payload,
            summary=self._build_summary_safely(self._build_step4_summary, payload, context_type),
            source=source,
            journey_stage="onboarding_step_4",
        )

    def save_step5_integrations(self, payload: Dict[str, Any], *, source: str = "onboarding_step5") -> bool:
        """Persist onboarding step 5 (integrations). Returns True on success."""
        context_type = "onboarding_step5_integrations"
        return self._save_context_document(
            filename=self.STEP5_FILENAME,
            context_type=context_type,
            payload=payload,
            summary=self._build_summary_safely(self._build_step5_summary, payload, context_type),
            source=source,
            journey_stage="onboarding_step_5",
        )

    # --------------------------------------------------------------- reading

    def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
        """Load a JSON document from the context directory.

        Returns None if it is missing, unreadable, not a JSON object, or stamped
        with a different user_id (sanitized ids can collide on disk).
        """
        try:
            target_file = self._context_file(filename)
            if not target_file.exists():
                return None
            with open(target_file, "r", encoding="utf-8") as handle:
                doc = json.load(handle)
        except Exception as exc:
            logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}")
            return None
        if not isinstance(doc, dict):
            return None
        if str(doc.get("user_id")) != str(self.user_id):
            logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})")
            return None
        return doc

    @staticmethod
    def _document_data(doc: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Return ``doc["data"]`` when ``doc`` is a dict whose ``data`` is a dict, else None."""
        data = doc.get("data") if isinstance(doc, dict) else None
        return data if isinstance(data, dict) else None

    def load_context_manifest(self) -> Optional[Dict[str, Any]]:
        """Return the manifest document, or None."""
        return self._load_context_document(self.MANIFEST_FILENAME)

    def load_step2_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full step 2 document, or None."""
        return self._load_context_document(self.STEP2_FILENAME)

    def load_step2_website_analysis(self) -> Optional[Dict[str, Any]]:
        """Return only the stored step 2 payload (``data``), or None."""
        return self._document_data(self.load_step2_context_document())

    def load_step3_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full step 3 document, or None."""
        return self._load_context_document(self.STEP3_FILENAME)

    def load_step3_research_preferences(self) -> Optional[Dict[str, Any]]:
        """Return only the stored step 3 payload (``data``), or None."""
        return self._document_data(self.load_step3_context_document())

    def load_step4_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full step 4 document, or None."""
        return self._load_context_document(self.STEP4_FILENAME)

    def load_step4_persona_data(self) -> Optional[Dict[str, Any]]:
        """Return only the stored step 4 payload (``data``), or None."""
        return self._document_data(self.load_step4_context_document())

    def load_step5_context_document(self) -> Optional[Dict[str, Any]]:
        """Return the full step 5 document, or None."""
        return self._load_context_document(self.STEP5_FILENAME)

    def load_step5_integrations(self) -> Optional[Dict[str, Any]]:
        """Return only the stored step 5 payload (``data``), or None."""
        return self._document_data(self.load_step5_context_document())