Files
ALwrity/backend/services/intelligence/agent_flat_context.py

743 lines
33 KiB
Python

"""Flat-file context storage for AI agents.
Stores onboarding context in per-user workspace files, optimized for fast agent reads.
Includes minimal security hardening, context-size controls, and internal document linking.
"""
from __future__ import annotations
import json
import os
import tempfile
import hmac
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from loguru import logger
class AgentFlatContextStore:
"""Read/write agent-only flat-file context in per-user workspace."""
CONTEXT_DIRNAME = "agent_context"
STEP2_FILENAME = "step2_website_analysis.json"
STEP3_FILENAME = "step3_research_preferences.json"
STEP4_FILENAME = "step4_persona_data.json"
STEP5_FILENAME = "step5_integrations.json"
MANIFEST_FILENAME = "context_manifest.json"
WORKSPACE_README = "README.md"
ALLOWED_CONTEXT_FILES = {
STEP2_FILENAME,
STEP3_FILENAME,
STEP4_FILENAME,
STEP5_FILENAME,
MANIFEST_FILENAME,
}
SCHEMA_VERSION = "1.3"
DEFAULT_MAX_BYTES = 300_000
SUMMARY_TEXT_LIMIT = 800
def __init__(self, user_id: str):
self.user_id = user_id
self.safe_user_id = self._sanitize_user_id(user_id)
self._ensure_workspace_permissions()
def _ensure_workspace_permissions(self) -> None:
"""Ensure workspace and context directories exist with owner-only permissions."""
workspace_dir = self._workspace_dir()
context_dir = workspace_dir / self.CONTEXT_DIRNAME
workspace_dir.mkdir(parents=True, exist_ok=True)
context_dir.mkdir(parents=True, exist_ok=True)
os.chmod(workspace_dir, 0o700)
os.chmod(context_dir, 0o700)
@staticmethod
def _safe_resolve_under(base_dir: Path, requested_path: str) -> Path:
"""Resolve path and ensure it remains inside base_dir (path sandboxing)."""
base_real = base_dir.resolve()
candidate = (base_dir / requested_path).resolve()
if candidate == base_real or base_real in candidate.parents:
return candidate
raise ValueError("Unsafe path access attempt outside sandbox")
@staticmethod
def _sanitize_user_id(user_id: str) -> str:
safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_"))
return safe or "unknown_user"
def _master_salt(self) -> str:
return os.getenv("FILE_ENCRYPTION_SALT", "")
def derive_user_secret(self) -> bytes:
"""Derive deterministic per-user secret from env salt + safe user id."""
salt = self._master_salt()
if not salt:
return b""
return hmac.new(salt.encode("utf-8"), self.safe_user_id.encode("utf-8"), hashlib.sha256).digest()
def user_secret_fingerprint(self) -> str:
"""Short fingerprint used for diagnostics/audit only (not a key)."""
secret = self.derive_user_secret()
if not secret:
return "salt_not_configured"
return hashlib.sha256(secret).hexdigest()[:16]
def _audit_event(self, action: str, target: str, status: str) -> None:
logger.info(
f"[flat_context_audit] user={self.safe_user_id} action={action} target={target} status={status}"
)
def _workspace_dir(self) -> Path:
root_dir = Path(__file__).resolve().parents[3]
return root_dir / "workspace" / f"workspace_{self.safe_user_id}"
def _context_dir(self) -> Path:
return self._workspace_dir() / self.CONTEXT_DIRNAME
def _context_file(self, filename: str) -> Path:
return self._safe_resolve_under(self._context_dir(), str(filename))
def _workspace_file(self, filename: str) -> Path:
return self._safe_resolve_under(self._workspace_dir(), str(filename))
@staticmethod
def _estimate_size_bytes(value: Any) -> int:
try:
return len(json.dumps(value, ensure_ascii=False).encode("utf-8"))
except Exception:
return 0
def estimate_size_bytes(self, value: Any) -> int:
"""Public size estimate helper for adapter layers."""
return self._estimate_size_bytes(value)
@staticmethod
def _to_context_list(value: Any) -> Any:
if value is None:
return []
if isinstance(value, list):
return value
if isinstance(value, dict):
return list(value.keys())
return [str(value)]
@staticmethod
def _truncate_text(value: Any, max_chars: int = SUMMARY_TEXT_LIMIT) -> str:
text = value if isinstance(value, str) else ""
if len(text) <= max_chars:
return text
return f"{text[:max_chars]}..."
@staticmethod
def _redact_sensitive(data: Any) -> Any:
"""Minimal recursive redaction for sensitive-like keys in payload snapshots."""
sensitive_tokens = {"api_key", "token", "secret", "password", "authorization", "cookie"}
if isinstance(data, dict):
redacted = {}
for k, v in data.items():
key_lower = str(k).lower()
if any(token in key_lower for token in sensitive_tokens):
redacted[k] = "[REDACTED]"
else:
redacted[k] = AgentFlatContextStore._redact_sensitive(v)
return redacted
if isinstance(data, list):
return [AgentFlatContextStore._redact_sensitive(v) for v in data]
return data
def _related_documents(self, context_type: str) -> list:
if context_type == "onboarding_step2_website_analysis":
return [
{"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "next_step"},
{"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "future_dependency"},
{"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "future_dependency"},
]
if context_type == "onboarding_step3_research_preferences":
return [
{"type": "onboarding_step2_website_analysis", "path": self.STEP2_FILENAME, "relationship": "previous_step"},
{"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "next_step"},
{"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "future_dependency"},
]
if context_type == "onboarding_step4_persona_data":
return [
{"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "previous_step"},
{"type": "onboarding_step2_website_analysis", "path": self.STEP2_FILENAME, "relationship": "upstream_context"},
{"type": "onboarding_step5_integrations", "path": self.STEP5_FILENAME, "relationship": "next_step"},
]
if context_type == "onboarding_step5_integrations":
return [
{"type": "onboarding_step4_persona_data", "path": self.STEP4_FILENAME, "relationship": "previous_step"},
{"type": "onboarding_step3_research_preferences", "path": self.STEP3_FILENAME, "relationship": "upstream_context"},
]
return []
def _build_document_context(
self,
*,
context_type: str,
source: str,
journey_stage: str,
fallback_order: list,
payload_size: int,
summary_size: int,
payload_within_budget: bool,
) -> Dict[str, Any]:
total_size = payload_size + summary_size
return {
"audience": "ai_agents",
"purpose": "fast_context_retrieval",
"context_type": context_type,
"source": source,
"tenant": {"user_id_safe": self.safe_user_id, "isolation_scope": "workspace_user"},
"journey": {
"stage": journey_stage,
"user_action": "onboarding",
"agent_expectation": "read_summary_first_then_expand",
},
"retrieval_contract": {
"preferred": "flat_file",
"fallback_order": fallback_order,
},
"security": {
"path_sandboxing": True,
"file_permissions": "0600",
"directory_permissions": "0700",
"user_secret_fingerprint": self.user_secret_fingerprint(),
},
"context_window_guidance": {
"max_raw_bytes": self.DEFAULT_MAX_BYTES,
"total_bytes": total_size,
"raw_document_within_budget": payload_within_budget,
"agent_policy": "Use agent_summary first; open full data only for specialist tasks",
},
"related_documents": self._related_documents(context_type),
}
def _build_step2_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
seo_audit = payload.get("seo_audit") if isinstance(payload.get("seo_audit"), dict) else {}
brand = payload.get("brand_analysis") if isinstance(payload.get("brand_analysis"), dict) else {}
rec_settings = payload.get("recommended_settings") if isinstance(payload.get("recommended_settings"), dict) else {}
target_audience = payload.get("target_audience") if isinstance(payload.get("target_audience"), dict) else {}
social = payload.get("social_media_presence") if isinstance(payload.get("social_media_presence"), dict) else {}
technical_issues = self._to_context_list(seo_audit.get("technical_issues"))
recommendations = self._to_context_list(seo_audit.get("recommendations"))
quick_facts = {
"website_url": payload.get("website_url") or "",
"brand_voice": brand.get("brand_voice") or "",
"industry": brand.get("industry") or "",
"target_segment": target_audience.get("primary_audience") or target_audience.get("audience_type") or "",
"writing_tone": rec_settings.get("writing_tone") or "",
"primary_content_type": (payload.get("content_type") or {}).get("primary_type") if isinstance(payload.get("content_type"), dict) else "",
"social_platforms": sorted(list(social.keys())),
"seo_issue_count": len(technical_issues),
"seo_recommendation_count": len(recommendations),
}
return {
"quick_facts": quick_facts,
"retrieval_hints": {
"high_signal_terms": [
term
for term in [
quick_facts.get("brand_voice"),
quick_facts.get("industry"),
quick_facts.get("writing_tone"),
quick_facts.get("primary_content_type"),
]
if term
],
"agent_queries": [
"brand voice guidelines",
"website style patterns",
"seo technical issues",
"content strategy opportunities",
"target audience profile",
],
},
"profile": {
"writing_style": payload.get("writing_style") or {},
"style_patterns": payload.get("style_patterns") or {},
"style_guidelines": payload.get("style_guidelines") or {},
"recommended_settings": rec_settings,
"target_audience": target_audience,
},
"seo_focus": {
"technical_issues": technical_issues,
"recommendations": recommendations,
},
}
def _build_step3_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
competitors = payload.get("competitors") if isinstance(payload.get("competitors"), list) else []
domains = []
for comp in competitors[:20]:
if isinstance(comp, dict):
dom = comp.get("domain") or comp.get("url")
if dom:
domains.append(str(dom))
research_depth = payload.get("research_depth") or ""
content_types = payload.get("content_types") if isinstance(payload.get("content_types"), list) else []
industry_context = self._truncate_text(payload.get("industry_context") or payload.get("industryContext") or "", 500)
return {
"quick_facts": {
"research_depth": research_depth,
"content_types": content_types,
"auto_research": bool(payload.get("auto_research", True)),
"factual_content": bool(payload.get("factual_content", True)),
"competitor_count": len(competitors),
},
"retrieval_hints": {
"high_signal_terms": [research_depth, *content_types[:5]],
"agent_queries": [
"competitor landscape summary",
"content opportunities by competitor",
"research depth preferences",
"factual content constraints",
],
},
"competitor_focus": {
"top_competitor_domains": domains[:10],
"industry_context": industry_context,
},
}
def _build_step4_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
core_persona = payload.get("core_persona") if isinstance(payload.get("core_persona"), dict) else {}
platform_personas = payload.get("platform_personas") if isinstance(payload.get("platform_personas"), dict) else {}
quality_metrics = payload.get("quality_metrics") if isinstance(payload.get("quality_metrics"), dict) else {}
selected_platforms = payload.get("selected_platforms") if isinstance(payload.get("selected_platforms"), list) else []
persona_name = core_persona.get("name") or core_persona.get("persona_name") or ""
primary_goal = self._truncate_text(core_persona.get("primary_goal") or core_persona.get("goal") or "", 250)
return {
"quick_facts": {
"persona_name": persona_name,
"selected_platforms": selected_platforms,
"platform_persona_count": len(platform_personas.keys()) if isinstance(platform_personas, dict) else 0,
"has_research_persona": bool(payload.get("research_persona")),
},
"retrieval_hints": {
"high_signal_terms": [persona_name, *selected_platforms[:5]],
"agent_queries": [
"core persona profile",
"platform persona adaptations",
"persona quality metrics",
"research persona defaults",
],
},
"persona_focus": {
"primary_goal": primary_goal,
"core_persona": core_persona,
"quality_metrics": quality_metrics,
},
}
def _build_step5_summary(self, payload: Dict[str, Any]) -> Dict[str, Any]:
integrations = payload.get("integrations") if isinstance(payload.get("integrations"), dict) else {}
providers = payload.get("providers") if isinstance(payload.get("providers"), list) else []
connected = [k for k, v in integrations.items() if bool(v)]
notes = self._truncate_text(payload.get("notes") or payload.get("integration_notes") or "", 300)
return {
"quick_facts": {
"connected_integrations_count": len(connected),
"connected_integrations": connected[:20],
"providers_count": len(providers),
},
"retrieval_hints": {
"high_signal_terms": connected[:5],
"agent_queries": [
"integration readiness",
"connected providers summary",
"missing integration dependencies",
],
},
"integration_focus": {
"notes": notes,
"integrations": integrations,
},
}
def _shrink_payload_if_needed(self, payload: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Keep payload under budget by trimming heavy optional sections first."""
payload = self._redact_sensitive(payload if isinstance(payload, dict) else {})
original_size = self._estimate_size_bytes(payload)
trim_info = {"trimmed": False, "original_size_bytes": original_size, "trimmed_fields": []}
if original_size <= self.DEFAULT_MAX_BYTES:
return payload, trim_info
candidates = [
"raw_step2_payload",
"raw_analysis_payload",
"source_payload",
"crawl_result",
"competitors",
"strategic_insights_history",
"seo_audit",
]
mutable = dict(payload)
for field in candidates:
if self._estimate_size_bytes(mutable) <= self.DEFAULT_MAX_BYTES:
break
if field in mutable:
value = mutable.get(field)
if field == "competitors" and isinstance(value, list):
mutable[field] = value[:20]
elif isinstance(value, (dict, list)):
mutable[field] = {"omitted": True, "reason": "size_budget", "original_type": type(value).__name__}
elif isinstance(value, str):
mutable[field] = self._truncate_text(value, 500)
else:
mutable[field] = "[OMITTED:size_budget]"
trim_info["trimmed_fields"].append(field)
trim_info["trimmed"] = self._estimate_size_bytes(mutable) < original_size
trim_info["final_size_bytes"] = self._estimate_size_bytes(mutable)
return mutable, trim_info
def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None:
target_file.parent.mkdir(parents=True, exist_ok=True)
os.chmod(target_file.parent, 0o700)
fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, separators=(",", ":"))
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, target_file)
try:
os.chmod(target_file, 0o600)
except Exception:
pass
except Exception:
try:
os.unlink(tmp_path)
except Exception:
pass
raise
def _atomic_write_text(self, target_file: Path, content: str) -> None:
target_file.parent.mkdir(parents=True, exist_ok=True)
os.chmod(target_file.parent, 0o700)
fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(content)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, target_file)
try:
os.chmod(target_file, 0o600)
except Exception:
pass
except Exception:
try:
os.unlink(tmp_path)
except Exception:
pass
raise
@staticmethod
def _collect_signal_terms(doc: Dict[str, Any], limit: int = 6) -> list:
summary = doc.get("agent_summary") if isinstance(doc, dict) else {}
hints = summary.get("retrieval_hints") if isinstance(summary, dict) else {}
terms = hints.get("high_signal_terms") if isinstance(hints, dict) else []
if not isinstance(terms, list):
return []
normalized = [str(t).strip() for t in terms if str(t).strip()]
return normalized[:limit]
@staticmethod
def _extract_journey_stage(doc: Dict[str, Any]) -> str:
dctx = doc.get("document_context") if isinstance(doc, dict) else {}
journey = dctx.get("journey") if isinstance(dctx, dict) else {}
stage = journey.get("stage") if isinstance(journey, dict) else ""
return str(stage or "").strip()
@staticmethod
def _context_description(filename: str) -> str:
descriptions = {
AgentFlatContextStore.STEP2_FILENAME: "Primary SEO and site structure context",
AgentFlatContextStore.STEP3_FILENAME: "Research depth, competitors, and content preferences",
AgentFlatContextStore.STEP4_FILENAME: "Persona profiles, voice adaptation, and platform strategy",
AgentFlatContextStore.STEP5_FILENAME: "Connected integrations and provider readiness",
}
return descriptions.get(filename, "Context document")
def _generate_workspace_readme(self, manifest: Dict[str, Any]) -> str:
docs = manifest.get("documents") if isinstance(manifest, dict) and isinstance(manifest.get("documents"), list) else []
lines = [
"# Agent Workspace Map",
"",
"You are in a restricted read-only VFS. Use `list_context`, `read_context_file`, and `search_context` to navigate.",
"",
"## Core Context Files",
]
for item in sorted(docs, key=lambda d: str((d or {}).get("path", ""))):
if not isinstance(item, dict):
continue
path = item.get("path") or ""
if not path:
continue
doc = self._load_context_document(path) or {}
signals = self._collect_signal_terms(doc)
journey_stage = self._extract_journey_stage(doc)
updated_at = str(item.get("updated_at") or "")
lines.append(f"- `{path}`: {self._context_description(path)}.")
if signals:
lines.append(f" - **Key Signals:** {', '.join(signals)}")
if journey_stage:
lines.append(f" - **Journey Stage:** {journey_stage}")
if updated_at:
lines.append(f" - **Updated:** {updated_at}")
lines.extend(
[
"",
"## Retrieval Strategy",
"1. Run `list_context` to check which onboarding steps are available.",
"2. Run `search_context` for targeted terms (for example: \"competitor\", \"tone\", \"integrations\").",
"3. Run `read_context_file` and ingest `agent_summary` before expanding full `data`.",
"",
"## Virtual Paths",
"- `/env/summary` -> consolidated summary generated from all available context docs",
f"- `/steps/website` -> `{self.STEP2_FILENAME}`",
f"- `/steps/research` -> `{self.STEP3_FILENAME}`",
f"- `/steps/persona` -> `{self.STEP4_FILENAME}`",
f"- `/steps/integrations` -> `{self.STEP5_FILENAME}`",
]
)
return "\n".join(lines) + "\n"
def _update_workspace_readme(self, manifest: Dict[str, Any]) -> None:
try:
content = self._generate_workspace_readme(manifest)
self._atomic_write_text(self._workspace_file(self.WORKSPACE_README), content)
except Exception as exc:
logger.warning(f"Failed to update workspace README for user {self.user_id}: {exc}")
def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None:
manifest_file = self._context_file(self.MANIFEST_FILENAME)
existing = {}
if manifest_file.exists():
try:
with open(manifest_file, "r", encoding="utf-8") as f:
existing = json.load(f) or {}
except Exception:
existing = {}
items = existing.get("documents") if isinstance(existing.get("documents"), list) else []
items = [i for i in items if not (isinstance(i, dict) and i.get("type") == context_type)]
items.append(
{
"type": context_type,
"path": filename,
"updated_at": doc.get("updated_at"),
"size_bytes": (doc.get("meta") or {}).get("data_size_bytes", 0) + (doc.get("meta") or {}).get("summary_size_bytes", 0),
"related_documents": (doc.get("document_context") or {}).get("related_documents", []),
}
)
manifest = {
"schema_version": self.SCHEMA_VERSION,
"user_id": str(self.user_id),
"updated_at": datetime.utcnow().isoformat(),
"documents": items,
}
self._atomic_write_json(manifest_file, manifest)
self._update_workspace_readme(manifest)
def _save_context_document(
self,
*,
filename: str,
context_type: str,
payload: Dict[str, Any],
summary: Dict[str, Any],
source: str,
journey_stage: str,
) -> bool:
try:
target_file = self._context_file(filename)
payload = payload if isinstance(payload, dict) else {}
summary = summary if isinstance(summary, dict) else {}
compact_payload, trim_info = self._shrink_payload_if_needed(payload)
payload_size = self._estimate_size_bytes(compact_payload)
summary_size = self._estimate_size_bytes(summary)
context_doc = {
"schema_version": self.SCHEMA_VERSION,
"context_type": context_type,
"user_id": str(self.user_id),
"updated_at": datetime.utcnow().isoformat(),
"source": source,
"document_context": self._build_document_context(
context_type=context_type,
source=source,
journey_stage=journey_stage,
fallback_order=["flat_file", "database", "sif_semantic"],
payload_size=payload_size,
summary_size=summary_size,
payload_within_budget=payload_size <= self.DEFAULT_MAX_BYTES,
),
"data": compact_payload,
"agent_summary": summary,
"meta": {
"data_size_bytes": payload_size,
"summary_size_bytes": summary_size,
"trim": trim_info,
},
}
self._atomic_write_json(target_file, context_doc)
self._update_manifest(context_type, filename, context_doc)
self._audit_event("write_context", filename, "success")
return True
except Exception as exc:
logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}")
self._audit_event("write_context", filename, "error")
return False
def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool:
return self._save_context_document(
filename=self.STEP2_FILENAME,
context_type="onboarding_step2_website_analysis",
payload=payload,
summary=self._build_step2_summary(payload if isinstance(payload, dict) else {}),
source=source,
journey_stage="onboarding_step_2",
)
def save_step3_research_preferences(self, payload: Dict[str, Any], *, source: str = "onboarding_step3") -> bool:
return self._save_context_document(
filename=self.STEP3_FILENAME,
context_type="onboarding_step3_research_preferences",
payload=payload,
summary=self._build_step3_summary(payload if isinstance(payload, dict) else {}),
source=source,
journey_stage="onboarding_step_3",
)
def save_step4_persona_data(self, payload: Dict[str, Any], *, source: str = "onboarding_step4") -> bool:
return self._save_context_document(
filename=self.STEP4_FILENAME,
context_type="onboarding_step4_persona_data",
payload=payload,
summary=self._build_step4_summary(payload if isinstance(payload, dict) else {}),
source=source,
journey_stage="onboarding_step_4",
)
def save_step5_integrations(self, payload: Dict[str, Any], *, source: str = "onboarding_step5") -> bool:
return self._save_context_document(
filename=self.STEP5_FILENAME,
context_type="onboarding_step5_integrations",
payload=payload,
summary=self._build_step5_summary(payload if isinstance(payload, dict) else {}),
source=source,
journey_stage="onboarding_step_5",
)
def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
try:
if str(filename) not in self.ALLOWED_CONTEXT_FILES:
logger.warning(f"Rejected non-allowed context filename for user {self.user_id}: {filename}")
self._audit_event("read_context", str(filename), "rejected_filename")
return None
target_file = self._context_file(filename)
if not target_file.exists():
self._audit_event("read_context", str(filename), "not_found")
return None
with open(target_file, "r", encoding="utf-8") as f:
doc = json.load(f)
if isinstance(doc, dict) and str(doc.get("user_id")) != str(self.user_id):
logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})")
self._audit_event("read_context", str(filename), "user_mismatch")
return None
self._audit_event("read_context", str(filename), "success")
return doc if isinstance(doc, dict) else None
except Exception as exc:
logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}")
self._audit_event("read_context", str(filename), "error")
return None
def load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
"""Public loader for a named context document file."""
return self._load_context_document(filename)
def load_context_manifest(self) -> Optional[Dict[str, Any]]:
return self._load_context_document(self.MANIFEST_FILENAME)
def load_step2_context_document(self) -> Optional[Dict[str, Any]]:
return self._load_context_document(self.STEP2_FILENAME)
def load_step2_website_analysis(self) -> Optional[Dict[str, Any]]:
doc = self.load_step2_context_document()
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
def load_step3_context_document(self) -> Optional[Dict[str, Any]]:
return self._load_context_document(self.STEP3_FILENAME)
def load_step3_research_preferences(self) -> Optional[Dict[str, Any]]:
doc = self.load_step3_context_document()
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
def load_step4_context_document(self) -> Optional[Dict[str, Any]]:
return self._load_context_document(self.STEP4_FILENAME)
def load_step4_persona_data(self) -> Optional[Dict[str, Any]]:
doc = self.load_step4_context_document()
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
def load_step5_context_document(self) -> Optional[Dict[str, Any]]:
return self._load_context_document(self.STEP5_FILENAME)
def load_step5_integrations(self) -> Optional[Dict[str, Any]]:
doc = self.load_step5_context_document()
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
def generate_total_summary(self) -> Dict[str, Any]:
"""Build a lightweight consolidated summary across available context documents."""
manifest = self.load_context_manifest() or {"documents": []}
docs = manifest.get("documents") if isinstance(manifest.get("documents"), list) else []
overview = []
for item in docs:
if not isinstance(item, dict):
continue
path = str(item.get("path") or "")
if not path:
continue
doc = self._load_context_document(path) or {}
summary = doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {}
quick_facts = summary.get("quick_facts") if isinstance(summary.get("quick_facts"), dict) else {}
hints = summary.get("retrieval_hints") if isinstance(summary.get("retrieval_hints"), dict) else {}
overview.append(
{
"path": path,
"context_type": doc.get("context_type"),
"updated_at": doc.get("updated_at") or item.get("updated_at"),
"journey_stage": self._extract_journey_stage(doc),
"high_signal_terms": hints.get("high_signal_terms") if isinstance(hints.get("high_signal_terms"), list) else [],
"quick_facts": quick_facts,
}
)
return {
"user_id": str(self.user_id),
"generated_at": datetime.utcnow().isoformat(),
"document_count": len(overview),
"documents": overview,
}