diff --git a/backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md b/backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md
new file mode 100644
index 00000000..9682e4dc
--- /dev/null
+++ b/backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md
@@ -0,0 +1,197 @@
+# Agent Flat-File Context System Review
+
+## Scope
+This review documents the **current implementation** of ALwrity's onboarding flat-file context system and compares it to the proposed **Direct-to-File Virtual Shell (VFS)** model.
+
+---
+
+## 1) Present Implementation (What Exists Today)
+
+### 1.1 Storage model
+- Context is stored per user under:
+  - `backend/workspace/workspace_/agent_context/`
+- Files are JSON documents, one per onboarding domain:
+  - `step2_website_analysis.json`
+  - `step3_research_preferences.json`
+  - `step4_persona_data.json`
+  - `step5_integrations.json`
+  - `context_manifest.json`
+
+### 1.2 Writer and reader
+- `AgentFlatContextStore` is the core component that:
+  - sanitizes user IDs for path safety,
+  - writes documents atomically (`tempfile` + `os.replace`),
+  - sets restrictive file permissions (`0600` best effort),
+  - generates structured `agent_summary` objects,
+  - updates a manifest index of available documents.
+- Data is loaded by direct file reads from the same class (`load_stepX_context_document`).
+
+### 1.3 Read-path fallback chain
+`SIFIntegrationService` uses a strict fallback sequence for onboarding context retrieval:
+1. **flat file** (`AgentFlatContextStore`)
+2. **database** (`WebsiteAnalysis`, `ResearchPreferences`, `PersonaData`, etc.)
+3. **SIF semantic index** (`TxtaiIntelligenceService.search`)
+
+Step 5 uses `flat_file -> sif_semantic`.
+
+### 1.4 Producer flow (onboarding persistence)
+`StepManagementService` persists canonical snapshots to flat context when onboarding steps are saved:
+- Step 2 website analysis
+- Step 3 research preferences (and later competitor-enriched refresh)
+- Step 4 persona data
+- Step 5 integrations
+
+### 1.5 Context optimization currently implemented
+- Sensitive-key redaction in nested payloads (`api_key`, `token`, `secret`, etc.).
+- Size budgeting with trimming (`DEFAULT_MAX_BYTES = 300_000`) and trim metadata.
+- Generated summaries include:
+  - quick facts,
+  - retrieval hints (high-signal terms and suggested agent queries),
+  - domain-specific focus blocks.
+- Document context includes audience, retrieval contract, journey stage, related documents, and context-window guidance.
+
+---
+
+## 2) Comparison vs Proposed Direct-to-File VFS
+
+## Strong alignment
+The current system already matches the proposal in important ways:
+- **Direct-to-file persistence** instead of DB-backed retrieval for fast reads.
+- **Manifest/index concept** (`context_manifest.json`) that can act like a precomputed path map.
+- **Agent-first retrieval semantics** (summary-first contract and fallback policy).
+- **Operational safety controls** (atomic writes, redaction, path sanitization).
+
+## Gaps vs full virtual shell abstraction
+The following pieces are not fully implemented as described in your proposed architecture:
+- No explicit **virtual shell provider** (`IFileSystem`) exposing `ls/cat/grep/find` commands.
+- No always-live, process-level **in-memory `Map`** for path lookups.
+- No native glob/query command layer for agent shell UX.
+- Not currently **read-only enforced at API surface** (writes are intentionally allowed by onboarding services to refresh context).
+
+---
+
+## 3) Practical Recommendation: Incremental VFS Evolution
+
+1. **Introduce a read-only VFS facade for agents**
+   - Keep `AgentFlatContextStore` as the write path for trusted onboarding services.
+   - Add `AgentContextVFS` read adapter exposing:
+     - `ls(path)` from manifest,
+     - `cat(path)` mapped to underlying JSON,
+     - `find(glob)` on virtual keys,
+     - `grep(query)` with path prefilter + stream scan.
+
+2. **Promote manifest to a first-class path map**
+   - Build and cache an in-memory map on service startup or first access.
+   - Refresh map when manifest `updated_at` changes.
+
+3. **Add explicit write policy boundaries**
+   - Agent-facing interface: hard read-only (`EROFS`).
+   - Internal system service interface: allow writes for onboarding synchronization.
+
+4. **Metadata strategy for grep ranking**
+   - Prioritize in order:
+     1) `agent_summary.quick_facts`
+     2) `agent_summary.retrieval_hints.high_signal_terms`
+     3) `document_context.context_type` and `journey.stage`
+     4) full `data` body
+
+---
+
+## 4) Response to the Metadata Header Question
+
+> "Does your current `.txt` optimization include specific metadata headers (like YAML frontmatter) that the grep tool should prioritize?"
+
+For this implementation, context is currently persisted as structured JSON (not `.txt` with YAML frontmatter). Equivalent high-value metadata already exists and should be prioritized for search/ranking:
+- `context_type`
+- `updated_at`
+- `agent_summary.quick_facts`
+- `agent_summary.retrieval_hints.high_signal_terms`
+- `document_context.journey.stage`
+- `document_context.related_documents`
+
+If you later move to `.txt` transport files, mirror these as frontmatter fields to preserve retrieval quality.
+
+---
+
+## 5) Bottom line
+Your current onboarding flat-file context implementation is already a strong "shim" architecture and close to the proposed model. The biggest missing piece is a dedicated virtual-shell read interface (`ls/cat/grep/find`) backed by a persistent path-map cache and a clear read-only contract for agent execution contexts.
+
+---
+
+## 6) Implemented Follow-up (VFS Adapter + Workspace Guide)
+
+The following enhancements are now implemented:
+
+1. **Auto-generated workspace map**
+   - The system now generates `workspace_/README.md` whenever `context_manifest.json` is updated.
+   - The README includes:
+     - available context files,
+     - key signal hints from `agent_summary.retrieval_hints.high_signal_terms`,
+     - journey-stage hints,
+     - virtual path mappings and retrieval strategy guidance.
+
+2. **Read-only VFS facade**
+   - Added `AgentContextVFS` with:
+     - `list_context()` (`ls` equivalent),
+     - `search_context()` (`grep` equivalent; prioritizes `high_signal_terms` and `quick_facts`),
+     - `read_context_file()` (`cat` equivalent; large-file summary mode + subkey drilldown),
+     - explicit write rejection (`EROFS`).
+
+3. **Virtual path support**
+   - `/env/summary` maps to `AgentFlatContextStore.generate_total_summary()`.
+   - `/steps/website`, `/steps/research`, `/steps/persona`, `/steps/integrations` map to step documents.
+
+4. **System-prompt helper**
+   - Added `build_filesystem_header(user_id)` to inject a compact file availability + priority hint block into agent startup prompts.
+
+5. **Merged context helper in SIF integration**
+   - `SIFIntegrationService.get_merged_flat_context()` now provides a unified view across all available flat files while preserving existing per-step retrieval methods.
+
+6. **Basic file-level security hardening**
+   - Workspace and context directories are now explicitly forced to `0700`.
+   - Context and workspace files are written with strict `0600`.
+   - Added path sandboxing to ensure requested paths cannot escape user workspace roots.
+   - Restricted context-file loading to an allowlist of known onboarding context documents.
+   - Added deterministic per-user secret derivation from `.env` (`FILE_ENCRYPTION_SALT` + `safe_user_id`) with non-sensitive fingerprints for audit/debug and future encryption-at-rest rollout.
+
+7. **Tool-logic enhancement (coarse-to-fine search)**
+   - `search_context` now performs a two-pass retrieval:
+     1) high-relevance summary match pass (`high_signal_terms`, `quick_facts`),
+     2) parallelized stream scan pass over sandboxed allowlisted files for supporting details.
+   - Results include relevance labels, snippets, and line numbers for body matches.
+   - Large-result behavior now reports truncation guidance (show top 10 and suggest narrower keywords).
+   - `inspect_file` now provides token-saving behavior: full return for small files, or `agent_summary` + top-level keys for larger files, with key-level zoom-in support.
+
+8. **Retrieval robustness roadmap (next hardening phase)**
+   - **Query normalization:** Add synonym expansion and typo-tolerant matching (e.g., `tone` ≈ `brand voice`) before coarse/fine passes.
+   - **Confidence scoring:** Return confidence tiers that blend source freshness (`updated_at`), summary-match strength, and match density.
+   - **Field-aware boosting:** Weight matches by field priority (`high_signal_terms` > `quick_facts` > `data`) and document recency.
+   - **Deduplicated evidence:** Collapse repeated hits from the same file/key into one clustered result with a single best snippet and hit count.
+   - **Fallback query reformulation:** If zero hits, automatically retry with narrow/expanded variants and return attempted queries.
+   - **Answerability contract:** Add a lightweight `can_answer` signal in search responses so orchestrators can decide whether to ask follow-up questions or fetch more context.
+   - **Evaluation harness:** Track retrieval metrics over golden queries (`precision@k`, `MRR`, zero-hit rate, stale-hit rate) in CI to prevent relevance regressions.
+
+9. **Collaborative VFS namespace (shared memory mode)**
+   - Added optional `project_id` support to `AgentContextVFS` with isolated root: `workspace/project_/`.
+   - Introduced `scratchpad/` for collaborative writes while keeping onboarding `agent_context` read-first.
+   - Added `write_shared_note(...)` with advisory locking (`flock`) and strict filename/path validation.
+   - Added append-only `activity_log.jsonl` via `append_activity_log(...)` for watchdog/event-driven coordination.
+   - Maintains owner-only permissions (`0700` scratchpad dir, `0600` files) and audit trails for shared writes.
+
+10. **Testing readiness upgrades**
+   - Added automated tests for:
+     - query reformulation + `can_answer` behavior in `search_context`,
+     - large-file progressive disclosure behavior in `inspect_file`,
+     - collaborative write path (`write_shared_note`) and append-only activity logging.
+   - Test module: `backend/tests/test_agent_context_vfs.py`.
+   - These tests provide a baseline regression harness for VFS retrieval quality and shared-memory safety.
+
+11. **Static + Structural retrieval hardening**
+   - Added a **static triage layer** in `search_context`:
+     - keyword-density scoring,
+     - `low_probability` flags for likely-noisy hits,
+     - `triage_top5` shortlist for router-style pre-filtering.
+   - Added `read_struct(filename, path_query)`:
+     - resolves dot/bracket JSON paths to return node-level data only,
+     - includes lightweight dependency injection (e.g., Step 4 persona reads include Step 2 brand voice context when available),
+     - keeps output token-efficient for downstream agents.
diff --git a/backend/services/intelligence/agent_context_vfs.py b/backend/services/intelligence/agent_context_vfs.py
new file mode 100644
index 00000000..5da5f8ba
--- /dev/null
+++ b/backend/services/intelligence/agent_context_vfs.py
@@ -0,0 +1,745 @@
+"""Read-only virtual filesystem facade for agent flat context documents.
+
+This adapter provides shell-like primitives (`list_context`, `search_context`,
+`read_context_file`) over the JSON documents managed by AgentFlatContextStore.
+"""
+
+from __future__ import annotations
+
+import json
+import re
+import os
+import fcntl
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from collections import deque
+from fnmatch import fnmatch
+from pathlib import Path
+from datetime import datetime, timezone
+from typing import Any, Dict, List, Optional, Tuple
+
+from loguru import logger
+
+from services.intelligence.agent_flat_context import AgentFlatContextStore
+
+
+class SmartGrepEngine:
+    """Streaming grep engine with regex fallback and contextual snippets."""
+
+    def __init__(self, context_window: int = 1):
+        self.context_window = max(0, int(context_window))
+
+    @staticmethod
+    def _compile_pattern(pattern: str) -> re.Pattern:
+        try:
+            return re.compile(pattern, re.IGNORECASE)
+        except re.error:
+            return re.compile(re.escape(pattern), re.IGNORECASE)
+
+    @staticmethod
+    def _truncate(text: str, limit: int = 180) -> str:
+        text = " ".join(text.split())
+        if len(text) <= limit:
+            return text
+        return text[:limit] + "..."
+
+    def stream_file(self, file_path: Path, pattern: str, *, path_label: str) -> List[Dict[str, Any]]:
+        regex = self._compile_pattern(pattern)
+        matches: List[Dict[str, Any]] = []
+        prev = deque(maxlen=self.context_window)
+        active: List[Dict[str, Any]] = []
+
+        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
+            for line_no, line in enumerate(f, start=1):
+                # Fill trailing context for active matches.
+                for item in active:
+                    if item["remaining_after"] > 0:
+                        item["after"].append(line.rstrip("\n"))
+                        item["remaining_after"] -= 1
+
+                # Detect a new match on current line.
+                if regex.search(line):
+                    current = line.rstrip("\n")
+                    record = {
+                        "path": path_label,
+                        "line": line_no,
+                        "before": list(prev),
+                        "match_line": current,
+                        "after": [],
+                        "remaining_after": self.context_window,
+                    }
+                    active.append(record)
+                    matches.append(record)
+
+                prev.append(line.rstrip("\n"))
+
+        formatted: List[Dict[str, Any]] = []
+        for m in matches:
+            snippet_parts = [*m["before"], m["match_line"], *m["after"]]
+            snippet = self._truncate(" | ".join([p for p in snippet_parts if p is not None]))
+            line_l = m["match_line"].lower()
+            is_high_signal = any(k in line_l for k in ("agent_summary", "high_signal_terms", "quick_facts"))
+            formatted.append(
+                {
+                    "path": m["path"],
+                    "line": m["line"],
+                    "snippet": snippet,
+                    "relevance": "High Relevance" if is_high_signal else "Supporting Detail",
+                    "reason": "matched summary field in stream" if is_high_signal else "matched streamed body line",
+                    "score": 70 if is_high_signal else 50,
+                }
+            )
+        return formatted
+
+
+class AgentContextVFS:
+    """Read-only adapter that maps virtual paths to flat context documents."""
+
+    VIRTUAL_MAP = {
+        "/steps/website": AgentFlatContextStore.STEP2_FILENAME,
+        "/steps/research": AgentFlatContextStore.STEP3_FILENAME,
+        "/steps/persona": AgentFlatContextStore.STEP4_FILENAME,
+        "/steps/integrations": AgentFlatContextStore.STEP5_FILENAME,
+    }
+    HIGH_SIGNAL_MARKERS = ("agent_summary", "high_signal_terms", "quick_facts", "context_type")
+
+    def __init__(self, user_id: str, project_id: Optional[str] = None):
+        self.user_id = user_id
+        self.project_id = project_id
+        self.store = AgentFlatContextStore(user_id)
+        self.grep_engine = SmartGrepEngine(context_window=1)
+
+    @staticmethod
+    def _safe_slug(value: Optional[str], fallback: str) -> str:
+        raw = str(value or "").strip()
+        safe = "".join(c for c in raw if c.isalnum() or c in ("-", "_"))
+        return safe or fallback
+
+    def _manifest_docs(self) -> List[Dict[str, Any]]:
+        manifest = self.store.load_context_manifest() or {"documents": []}
+        docs = manifest.get("documents")
+        return docs if isinstance(docs, list) else []
+
+    def _workspace_root(self) -> Path:
+        if self.project_id:
+            root_dir = Path(__file__).resolve().parents[3]
+            safe_project = self._safe_slug(self.project_id, "default_project")
+            project_root = root_dir / "workspace" / f"project_{safe_project}"
+            project_root.mkdir(parents=True, exist_ok=True)
+            os.chmod(project_root, 0o700)
+            return project_root
+        return self.store._workspace_dir()
+
+    def _scratchpad_dir(self) -> Path:
+        scratch = self._workspace_root() / "scratchpad"
+        scratch.mkdir(parents=True, exist_ok=True)
+        os.chmod(scratch, 0o700)
+        return scratch
+
+    def _allowlisted_workspace_files(self) -> List[Path]:
+        """Return sandboxed files eligible for streaming search."""
+        files: List[Path] = []
+        workspace = self._workspace_root()
+        context_dir = self.store._context_dir()
+
+        # 1) manifest-backed onboarding context files
+        for item in self._manifest_docs():
+            if not isinstance(item, dict):
+                continue
+            rel = str(item.get("path") or "")
+            if not rel:
+                continue
+            try:
+                candidate = self.store._safe_resolve_under(context_dir, rel)
+                if candidate.exists() and candidate.is_file():
+                    files.append(candidate)
+            except Exception:
+                continue
+
+        # 2) workspace text artifacts (README, operator notes, etc.)
+        for candidate in workspace.glob("*.txt"):
+            if candidate.is_file():
+                files.append(candidate.resolve())
+        readme = workspace / "README.md"
+        if readme.exists() and readme.is_file():
+            files.append(readme.resolve())
+
+        # dedupe
+        seen = set()
+        unique: List[Path] = []
+        for p in files:
+            rp = str(p)
+            if rp in seen:
+                continue
+            seen.add(rp)
+            unique.append(p)
+        return unique
+
+    @staticmethod
+    def _query_variants(query: str) -> List[str]:
+        """Generate normalized and synonym-expanded query variants."""
+        base = (query or "").strip().lower()
+        if not base:
+            return []
+        synonyms = {
+            "tone": ["brand voice", "writing tone"],
+            "voice": ["brand voice", "writing style"],
+            "competitor": ["competition", "rival"],
+            "seo": ["search", "metadata"],
+            "persona": ["audience profile", "target audience"],
+        }
+        variants = [base]
+        tokens = base.split()
+        for idx, tok in enumerate(tokens):
+            if tok in synonyms:
+                for repl in synonyms[tok]:
+                    new_tokens = tokens.copy()
+                    new_tokens[idx] = repl
+                    variants.append(" ".join(new_tokens))
+        variants.extend([base.replace("-", " "), base.replace("_", " ")])
+        # dedupe, preserve order
+        seen = set()
+        out: List[str] = []
+        for v in variants:
+            vv = v.strip()
+            if not vv or vv in seen:
+                continue
+            seen.add(vv)
+            out.append(vv)
+        return out
+
+    @staticmethod
+    def _freshness_score(updated_at: Optional[str]) -> float:
+        if not updated_at:
+            return 0.3
+        try:
+            from datetime import datetime, timezone
+
+            ts = datetime.fromisoformat(str(updated_at).replace("Z", "+00:00"))
+            if ts.tzinfo is None:
+                ts = ts.replace(tzinfo=timezone.utc)
+            days = max(0.0, (datetime.now(timezone.utc) - ts).total_seconds() / 86400.0)
+            if days <= 1:
+                return 1.0
+            if days <= 7:
+                return 0.9
+            if days <= 30:
+                return 0.75
+            if days <= 90:
+                return 0.6
+            return 0.4
+        except Exception:
+            return 0.3
+
+    def _cluster_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Deduplicate repeated hits by file + reason and keep strongest evidence."""
+        buckets: Dict[Tuple[str, str], Dict[str, Any]] = {}
+        for r in results:
+            path = str(r.get("path") or "")
+            reason = str(r.get("reason") or "")
+            key = (path, reason)
+            existing = buckets.get(key)
+            if not existing:
+                buckets[key] = {**r, "hit_count": 1}
+                continue
+            existing["hit_count"] = int(existing.get("hit_count", 1)) + 1
+            if int(r.get("score", 0)) > int(existing.get("score", 0)):
+                existing.update({k: v for k, v in r.items() if k != "hit_count"})
+                existing["hit_count"] = int(existing.get("hit_count", 1))
+        clustered = list(buckets.values())
+        clustered.sort(key=lambda r: (-int(r.get("score", 0)), str(r.get("path") or "")))
+        return clustered
+
+    def _keyword_density(self, snippet: str, query: str) -> float:
+        if not snippet or not query:
+            return 0.0
+        query_tokens = [t for t in query.lower().split() if t]
+        if not query_tokens:
+            return 0.0
+        text = snippet.lower()
+        hits = sum(text.count(tok) for tok in query_tokens)
+        words = max(1, len(text.split()))
+        return hits / words
+
+    def _static_triage(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
+        """Semgrep-style static heuristic triage before main agent consumption."""
+        triaged: List[Dict[str, Any]] = []
+        for r in results:
+            snippet = str(r.get("snippet") or "")
+            density = self._keyword_density(snippet, query)
+            marker_hit = any(marker in snippet.lower() for marker in self.HIGH_SIGNAL_MARKERS)
+            low_probability = bool(density < 0.01 and not marker_hit)
+            item = dict(r)
+            item["keyword_density"] = round(density, 4)
+            item["low_probability"] = low_probability
+            triaged.append(item)
+        triaged.sort(
+            key=lambda x: (
+                bool(x.get("low_probability")),
+                -float(x.get("confidence", 0)),
+                -int(x.get("score", 0)),
+            )
+        )
+        return triaged
+
+    @staticmethod
+    def _llm_router_stub(results: List[Dict[str, Any]], top_k: int = 5) -> List[Dict[str, Any]]:
+        """Fast local triage stub (drop low-probability first; keep strongest candidates)."""
+        ranked = sorted(
+            results,
+            key=lambda x: (
+                bool(x.get("low_probability")),
+                -float(x.get("confidence", 0)),
+                -int(x.get("score", 0)),
+            ),
+        )
+        return ranked[: max(1, top_k)]
+
+    @staticmethod
+    def _resolve_json_path(data: Any, path_query: str) -> Any:
+        """Resolve dot/bracket JSON path such as 'data.seo_audit.recommendations[0]'."""
+        if not path_query:
+            return data
+
+        current = data
+        query = path_query.strip()
+        parts: List[str] = []
+        buf = ""
+        in_brackets = False
+        for ch in query:
+            if ch == "." and not in_brackets:
+                if buf:
+                    parts.append(buf)
+                    buf = ""
+                continue
+            if ch == "[":
+                in_brackets = True
+            elif ch == "]":
+                in_brackets = False
+            buf += ch
+        if buf:
+            parts.append(buf)
+
+        for part in parts:
+            if "[" in part and part.endswith("]"):
+                key, idx_raw = part.split("[", 1)
+                idx = int(idx_raw[:-1])
+                if key:
+                    if not isinstance(current, dict):
+                        raise KeyError(key)
+                    current = current[key]
+                if not isinstance(current, list):
+                    raise IndexError(idx)
+                current = current[idx]
+            else:
+                if not isinstance(current, dict):
+                    raise KeyError(part)
+                current = current[part]
+        return current
+
+    def _resolve_path(self, path: str) -> Tuple[str, Optional[str]]:
+        normalized = (path or "").strip()
+        if not normalized:
+            return "", None
+        if normalized == "/env/summary":
+            return "virtual_summary", None
+        if normalized in self.VIRTUAL_MAP:
+            return "file", self.VIRTUAL_MAP[normalized]
+        if ".." in normalized or "\\" in normalized:
+            return "", None
+        if normalized.startswith("/"):
+            candidate = normalized.rsplit("/", 1)[-1]
+        else:
+            candidate = normalized
+        if "/" in candidate:
+            return "", None
+        allowed = AgentFlatContextStore.ALLOWED_CONTEXT_FILES - {AgentFlatContextStore.MANIFEST_FILENAME}
+        if candidate not in allowed:
+            return "", None
+        return "file", candidate
+
+    def list_context(self) -> Dict[str, Any]:
+        """List available context files (ls-equivalent)."""
+        docs = self._manifest_docs()
+        items = []
+        for d in docs:
+            if not isinstance(d, dict):
+                continue
+            items.append(
+                {
+                    "path": d.get("path"),
+                    "type": d.get("type"),
+                    "updated_at": d.get("updated_at"),
+                    "size_bytes": d.get("size_bytes", 0),
+                }
+            )
+        items.sort(key=lambda x: str(x.get("path") or ""))
+        result = {
+            "workspace_hint": "Use this list to see which onboarding steps are complete.",
+            "tip": "Use `search_context` to find specific keywords across all steps.",
+            "virtual_paths": ["/env/summary", *sorted(self.VIRTUAL_MAP.keys())],
+            "files": items,
+            "collaboration": {
+                "scratchpad_dir": str(self._scratchpad_dir()),
+                "activity_log": "scratchpad/activity_log.jsonl",
+            },
+        }
+        logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=list_context files={len(items)}")
+        return result
+
+    @staticmethod
+    def _flatten_strings(data: Any, limit: int = 2000) -> str:
+        pieces: List[str] = []
+
+        def walk(v: Any) -> None:
+            if len(pieces) >= limit:
+                return
+            if isinstance(v, dict):
+                for key, value in v.items():
+                    pieces.append(str(key))
+                    walk(value)
+            elif isinstance(v, list):
+                for item in v:
+                    walk(item)
+            elif isinstance(v, (str, int, float, bool)):
+                pieces.append(str(v))
+
+        walk(data)
+        return " ".join(pieces)
+
+    @staticmethod
+    def _extract_search_fields(doc: Dict[str, Any]) -> Tuple[List[str], Dict[str, Any], str]:
+        summary = doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {}
+        hints = summary.get("retrieval_hints") if isinstance(summary.get("retrieval_hints"), dict) else {}
+        quick_facts = summary.get("quick_facts") if isinstance(summary.get("quick_facts"), dict) else {}
+        high_terms = hints.get("high_signal_terms") if isinstance(hints.get("high_signal_terms"), list) else []
+        body = AgentContextVFS._flatten_strings(doc.get("data") if isinstance(doc.get("data"), dict) else {})
+        return [str(t).lower() for t in high_terms], quick_facts, body.lower()
+
+    def search_context(self, query: str, *, limit: int = 10, path_glob: Optional[str] = None) -> Dict[str, Any]:
+        """Smart grep with coarse-to-fine ranking and parallel stream scans."""
+        normalized = (query or "").strip()
+        if not normalized:
+            return {"query": query, "results": []}
+        self.store._audit_event("vfs_search", normalized, "started")
+        try:
+            variants = self._query_variants(normalized)
+            attempted_queries: List[str] = []
+            scored: List[Dict[str, Any]] = []
+
+            for candidate_query in variants:
+                attempted_queries.append(candidate_query)
+                needle = candidate_query.lower()
+
+                # Pass 1: summary-first ranking (high relevance)
+                docs = self._manifest_docs()
+                variant_scored: List[Dict[str, Any]] = []
+                for item in docs:
+                    if not isinstance(item, dict):
+                        continue
+                    path = str(item.get("path") or "")
+                    if not path:
+                        continue
+                    if path_glob and not fnmatch(path, path_glob):
+                        continue
+                    doc = self.store.load_context_document(path) or {}
+                    high_terms, quick_facts, _ = self._extract_search_fields(doc)
+
+                    high_match = any(needle in term for term in high_terms)
+                    quick_match = any(needle in str(v).lower() for v in quick_facts.values()) if isinstance(quick_facts, dict) else False
+                    if not (high_match or quick_match):
+                        continue
+
+                    score = 100 if high_match else 80
+                    reason = "matched high_signal_terms" if high_match else "matched quick_facts"
+                    variant_scored.append(
+                        {
+                            "path": path,
+                            "line": None,
+                            "snippet": f"{reason}: {candidate_query}"[:100],
+                            "type": item.get("type"),
+                            "updated_at": item.get("updated_at"),
+                            "relevance": "High Relevance",
+                            "reason": reason,
+                            "score": score,
+                        }
+                    )
+
+                # Pass 2: parallelized stream scan over allowlisted workspace files.
+                allowlisted = self._allowlisted_workspace_files()
+                body_matches: List[Dict[str, Any]] = []
+                if allowlisted:
+                    with ThreadPoolExecutor(max_workers=min(8, max(1, len(allowlisted)))) as pool:
+                        future_map = {}
+                        for p in allowlisted:
+                            path_label = p.name
+                            if path_glob and not fnmatch(path_label, path_glob):
+                                continue
+                            future = pool.submit(self.grep_engine.stream_file, p, candidate_query, path_label=path_label)
+                            future_map[future] = path_label
+
+                        for future in as_completed(future_map):
+                            try:
+                                body_matches.extend(future.result() or [])
+                            except Exception:
+                                continue
+
+                variant_scored.extend(body_matches)
+                if variant_scored:
+                    scored = variant_scored
+                    break
+
+            scored = self._cluster_results(scored)
+
+            # Add confidence based on score + freshness + hit density.
+            for r in scored:
+                base = min(1.0, max(0.0, float(r.get("score", 0)) / 100.0))
+                freshness = self._freshness_score(r.get("updated_at"))
+                density = min(1.0, 0.2 + (int(r.get("hit_count", 1)) * 0.1))
+                confidence = round((base * 0.6) + (freshness * 0.25) + (density * 0.15), 3)
+                r["confidence"] = confidence
+
+            scored.sort(key=lambda r: (-int(r.get("score", 0)), str(r.get("path") or "")))
+            matched_files = sorted({str(r.get("path") or "") for r in scored if r.get("path")})
+            capped_results = scored[: max(1, limit)]
+            notice = None
+            if len(matched_files) > 10:
+                notice = f"Found {len(matched_files)} matches. Showing top 10. Use a more specific keyword to narrow down."
+                capped_results = scored[:10]
+
+            # Token/length budgeting (~2000 tokens ~= ~8000 chars).
+            budget_chars = 8000
+            bounded_results = []
+            used = 0
+            for r in capped_results:
+                snippet = str(r.get("snippet") or "")
+                cost = len(snippet) + 120  # account for metadata fields
+                if bounded_results and used + cost > budget_chars:
+                    break
+                bounded_results.append(r)
+                used += cost
+
+            result = {
+                "query": normalized,
+                "attempted_queries": attempted_queries,
+                "matched_files_count": len(matched_files),
+                "results": self._static_triage(bounded_results, normalized),
+                "notice": notice,
+                "char_budget_used": used,
+                "can_answer": bool(bounded_results),
+            }
+            result["triage_top5"] = self._llm_router_stub(result["results"], top_k=5)
+            logger.info(
+                f"[vfs_audit] user={self.store.safe_user_id} action=search_context query={normalized!r} results={len(result['results'])}"
+            )
+            self.store._audit_event("vfs_search", normalized, f"success_{len(result['results'])}_hits")
+            return result
+        except Exception as exc:
+            self.store._audit_event("vfs_search", normalized, f"failed_{exc.__class__.__name__}")
+            return {"query": normalized, "matched_files_count": 0, "results": [], "notice": "Search failed.", "can_answer": False}
+
+    @staticmethod
+    def _strip_technical_metadata(doc: Dict[str, Any]) -> Dict[str, Any]:
+        sanitized = {
+            "context_type": doc.get("context_type"),
+            "updated_at": doc.get("updated_at"),
+            "journey": ((doc.get("document_context") or {}).get("journey") or {}) if isinstance(doc.get("document_context"), dict) else {},
+            "agent_summary": doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {},
+            "data": doc.get("data") if isinstance(doc.get("data"), dict) else {},
+        }
+        return sanitized
+
+    def inspect_file(self, path: str, *, key: Optional[str] = None, small_file_bytes: int = 5 * 1024) -> Dict[str, Any]:
+        """Smart reader (cat/head equivalent) with summary-first behavior."""
+        kind, resolved = self._resolve_path(path)
+        if kind == "virtual_summary":
+            result = {
+                "path": "/env/summary",
+                "mode": "summary",
+                "data": self.store.generate_total_summary(),
+            }
+            logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=read_context_file path=/env/summary mode=summary")
+            return result
+
+        if not resolved:
+            logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=read_context_file path={path!r} status=rejected")
+            return {"error": "File not found", "path": path}
+
+        # JSON context doc path
+        doc = self.store.load_context_document(resolved)
+        if doc:
+            view = self._strip_technical_metadata(doc)
+            data = view.get("data") if isinstance(view.get("data"), dict) else {}
+            raw_size = self.store.estimate_size_bytes(view)
+
+            if key:
+                if key in data:
+                    result = {
+                        "path": resolved,
+                        "mode": "key",
+                        "key": key,
+                        "agent_summary": view.get("agent_summary"),
+                        "data": data.get(key),
+                    }
+                    logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=key")
+                    return result
+                logger.info(
+                    f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=key_missing key={key}"
+                )
+                return {
+                    "path": resolved,
+                    "mode": "key_missing",
+                    "key": key,
+                    "available_keys": sorted(list(data.keys())),
+                    "message": "Requested key not found. Choose one of available_keys.",
+                }
+
+            if raw_size <= small_file_bytes:
+                result = {
+                    "path": resolved,
+                    "mode": "full",
+                    "data": view,
+                }
+                logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=full")
+                return result
+
+            result = {
+                "path": resolved,
+                "mode": "summary_plus_keys",
+                "size_bytes": raw_size,
+                "agent_summary": view.get("agent_summary"),
+                "keys": sorted(list(data.keys())),
+                "message": "File is large. Re-run with key to inspect a specific section.",
+            }
+            logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=summary_plus_keys")
+            return result
+
+        logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} status=not_found")
+        return {"error": "File not found", "path": path, "resolved": resolved}
+
+    def read_context_file(self, path: str, *, subkey: Optional[str] = None) -> Dict[str, Any]:
+        """Backward-compatible alias for inspect_file."""
+        return self.inspect_file(path, key=subkey)
+
+    def write_context_file(self, *_args: Any, **_kwargs: Any) -> None:
+        """Disallow writes from the agent-facing VFS."""
+        raise OSError("EROFS: read-only file system")
+
+    # Backward-compat function name requested in design docs.
+    inspect = inspect_file
+
+    def write_shared_note(self, note: str, *, agent_id: str = "agent", filename: str = "collaboration.md") -> Dict[str, Any]:
+        """Append a shared project note with advisory locking in scratchpad."""
+        safe_name = Path(filename).name
+        if safe_name != filename or ".." in filename or "/" in filename or "\\" in filename:
+            self.store._audit_event("write_shared_note", filename, "rejected_filename")
+            return {"ok": False, "error": "Invalid filename"}
+
+        scratch = self._scratchpad_dir()
+        target = (scratch / safe_name).resolve()
+        if scratch.resolve() not in target.parents:
+            self.store._audit_event("write_shared_note", filename, "rejected_path")
+            return {"ok": False, "error": "Unsafe path"}
+
+        lock_path = scratch / f".{safe_name}.lock"
+        ts = datetime.now(timezone.utc).isoformat()
+        header = f"\n## {ts} | {self._safe_slug(agent_id, 'agent')}\n"
+        payload = header + str(note).rstrip() + "\n"
+
+        try:
+            with open(lock_path, "w", encoding="utf-8") as lf:
+                fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
+                with open(target, "a", encoding="utf-8") as tf:
+                    tf.write(payload)
+                    tf.flush()
+                    os.fsync(tf.fileno())
+                os.chmod(target, 0o600)
+                fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
+            self.store._audit_event("write_shared_note", safe_name, "success")
+            self.append_activity_log(
+                event_type="shared_note_written",
+                actor=agent_id,
+                details={"file": safe_name, "bytes": len(payload)},
+            )
+            return {"ok": True, "file": safe_name, "bytes_written": len(payload)}
+        except Exception as exc:
+            self.store._audit_event("write_shared_note", safe_name, f"failed_{exc.__class__.__name__}")
+            return {"ok": False, "error": str(exc)}
+
+    def append_activity_log(self, *, event_type: str, actor: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
+        """Write append-only project activity log entry in JSONL format."""
+        scratch = self._scratchpad_dir()
+        target = (scratch / "activity_log.jsonl").resolve()
+        lock_path = scratch / ".activity_log.jsonl.lock"
+        entry = {
+            "timestamp": datetime.now(timezone.utc).isoformat(),
+            "event_type": str(event_type),
+            "actor": self._safe_slug(actor, "agent"),
+            "project_id": self._safe_slug(self.project_id, "none") if self.project_id else None,
+            "details": details or {},
+        }
+        line = json.dumps(entry, ensure_ascii=False) + "\n"
+        try:
+            with open(lock_path, "w", encoding="utf-8") as lf:
+                fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
+                with open(target, "a", encoding="utf-8") as tf:
+                    tf.write(line)
+                    tf.flush()
+                    os.fsync(tf.fileno())
+                os.chmod(target, 0o600)
+                fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
+            return {"ok": True}
+        except Exception as exc:
+            logger.warning(f"Failed to append activity log: {exc}")
+            return {"ok": False, "error": str(exc)}
+
+    def read_struct(self, filename: str, path_query: str) -> Dict[str, Any]:
+        """AST-style structural reader for JSON context files with dependency context injection."""
+        resolved_kind, resolved = self._resolve_path(filename)
+        if resolved_kind == "virtual_summary" or not resolved:
+            return {"ok": False, "error": "Invalid file"}
+
+        doc = self.store.load_context_document(resolved)
+        if not isinstance(doc, dict):
+            return {"ok": False, "error": "File not found"}
+
+        try:
+            extracted = self._resolve_json_path(doc, path_query)
+        except Exception as exc:
+            return {"ok": False, "error": f"path_query resolution failed: {exc}"}
+
+        # Lightweight dependency context: inject brand voice from step2 when reading persona structures.
+ dependency_context: Dict[str, Any] = {} + if "persona" in path_query.lower() or resolved == AgentFlatContextStore.STEP4_FILENAME: + step2 = self.store.load_step2_context_document() or {} + step2_data = step2.get("data") if isinstance(step2.get("data"), dict) else {} + brand = step2_data.get("brand_analysis") if isinstance(step2_data.get("brand_analysis"), dict) else {} + dependency_context["brand_voice"] = brand.get("brand_voice") + + return { + "ok": True, + "file": resolved, + "path_query": path_query, + "data": extracted, + "dependency_context": dependency_context, + "context": "Extracted via structural parse to save tokens.", + } + + + +def build_filesystem_header(user_id: str) -> str: + """Generate compact prompt header with available files and priority hints.""" + try: + store = AgentFlatContextStore(user_id) + manifest = store.load_context_manifest() or {"documents": []} + docs = manifest.get("documents") if isinstance(manifest.get("documents"), list) else [] + available = [str(d.get("path")) for d in docs if isinstance(d, dict) and d.get("path")] + files = ", ".join(sorted(available)) if available else "none" + return ( + "Workspace Context: You have access to a local flat-file store. " + f"Available Files: {files}. " + "Instructions: For style guidelines, prioritize step4_persona_data.json. " + "For technical site data, prioritize step2_website_analysis.json." + ) + except Exception as exc: + logger.warning(f"Failed to build filesystem header for user {user_id}: {exc}") + return "Workspace Context: local flat-file store unavailable." 
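Reviewer note on `read_struct`: it delegates to `self._resolve_json_path(doc, path_query)`, which is not shown in this part of the diff. The new test calls it with a dotted query (`data.core_persona.name`), so the helper presumably walks nested dictionaries one segment at a time. The sketch below is a minimal illustration of that idea, not the helper's actual code. The function name `resolve_json_path`, the list-index handling, and the exception types are all assumptions.

```python
from typing import Any


def resolve_json_path(doc: Any, path_query: str) -> Any:
    """Walk a dotted path such as 'data.core_persona.name' through nested JSON.

    Hypothetical stand-in for the `_resolve_json_path` helper, which this
    diff does not show; the real helper may accept a different syntax.
    """
    current = doc
    for segment in path_query.split("."):
        if not segment:
            raise KeyError("empty path segment")
        if isinstance(current, dict):
            if segment not in current:
                raise KeyError(f"key not found: {segment}")
            current = current[segment]
        elif isinstance(current, list):
            # Assumption: numeric segments index into lists ('selected_platforms.0').
            index = int(segment)      # ValueError for a non-numeric segment
            current = current[index]  # IndexError when out of range
        else:
            raise KeyError(f"cannot descend into scalar at: {segment}")
    return current


# Sample document shaped like the step 4 payload used in the tests.
doc = {
    "data": {
        "core_persona": {"name": "Ops Leader", "goal": "Scale ops"},
        "selected_platforms": ["linkedin"],
    }
}
name = resolve_json_path(doc, "data.core_persona.name")
platform = resolve_json_path(doc, "data.selected_platforms.0")
```

Raising on a missing segment fits the way `read_struct` wraps the call in `try/except` and returns `{"ok": False, ...}`, so the agent gets a structured error rather than a silent `None`.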
diff --git a/backend/services/intelligence/agent_flat_context.py b/backend/services/intelligence/agent_flat_context.py index ad81a91b..a42430d1 100644 --- a/backend/services/intelligence/agent_flat_context.py +++ b/backend/services/intelligence/agent_flat_context.py @@ -9,6 +9,8 @@ from __future__ import annotations import json import os import tempfile +import hmac +import hashlib from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional, Tuple @@ -25,6 +27,14 @@ class AgentFlatContextStore: STEP4_FILENAME = "step4_persona_data.json" STEP5_FILENAME = "step5_integrations.json" MANIFEST_FILENAME = "context_manifest.json" + WORKSPACE_README = "README.md" + ALLOWED_CONTEXT_FILES = { + STEP2_FILENAME, + STEP3_FILENAME, + STEP4_FILENAME, + STEP5_FILENAME, + MANIFEST_FILENAME, + } SCHEMA_VERSION = "1.3" DEFAULT_MAX_BYTES = 300_000 @@ -33,12 +43,53 @@ class AgentFlatContextStore: def __init__(self, user_id: str): self.user_id = user_id self.safe_user_id = self._sanitize_user_id(user_id) + self._ensure_workspace_permissions() + + def _ensure_workspace_permissions(self) -> None: + """Ensure workspace and context directories exist with owner-only permissions.""" + workspace_dir = self._workspace_dir() + context_dir = workspace_dir / self.CONTEXT_DIRNAME + workspace_dir.mkdir(parents=True, exist_ok=True) + context_dir.mkdir(parents=True, exist_ok=True) + os.chmod(workspace_dir, 0o700) + os.chmod(context_dir, 0o700) + + @staticmethod + def _safe_resolve_under(base_dir: Path, requested_path: str) -> Path: + """Resolve path and ensure it remains inside base_dir (path sandboxing).""" + base_real = base_dir.resolve() + candidate = (base_dir / requested_path).resolve() + if candidate == base_real or base_real in candidate.parents: + return candidate + raise ValueError("Unsafe path access attempt outside sandbox") @staticmethod def _sanitize_user_id(user_id: str) -> str: safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_")) 
return safe or "unknown_user" + def _master_salt(self) -> str: + return os.getenv("FILE_ENCRYPTION_SALT", "") + + def derive_user_secret(self) -> bytes: + """Derive deterministic per-user secret from env salt + safe user id.""" + salt = self._master_salt() + if not salt: + return b"" + return hmac.new(salt.encode("utf-8"), self.safe_user_id.encode("utf-8"), hashlib.sha256).digest() + + def user_secret_fingerprint(self) -> str: + """Short fingerprint used for diagnostics/audit only (not a key).""" + secret = self.derive_user_secret() + if not secret: + return "salt_not_configured" + return hashlib.sha256(secret).hexdigest()[:16] + + def _audit_event(self, action: str, target: str, status: str) -> None: + logger.info( + f"[flat_context_audit] user={self.safe_user_id} action={action} target={target} status={status}" + ) + def _workspace_dir(self) -> Path: root_dir = Path(__file__).resolve().parents[3] return root_dir / "workspace" / f"workspace_{self.safe_user_id}" @@ -47,7 +98,10 @@ class AgentFlatContextStore: return self._workspace_dir() / self.CONTEXT_DIRNAME def _context_file(self, filename: str) -> Path: - return self._context_dir() / filename + return self._safe_resolve_under(self._context_dir(), str(filename)) + + def _workspace_file(self, filename: str) -> Path: + return self._safe_resolve_under(self._workspace_dir(), str(filename)) @staticmethod def _estimate_size_bytes(value: Any) -> int: @@ -56,6 +110,10 @@ class AgentFlatContextStore: except Exception: return 0 + def estimate_size_bytes(self, value: Any) -> int: + """Public size estimate helper for adapter layers.""" + return self._estimate_size_bytes(value) + @staticmethod def _to_context_list(value: Any) -> Any: if value is None: @@ -143,6 +201,12 @@ class AgentFlatContextStore: "preferred": "flat_file", "fallback_order": fallback_order, }, + "security": { + "path_sandboxing": True, + "file_permissions": "0600", + "directory_permissions": "0700", + "user_secret_fingerprint": 
self.user_secret_fingerprint(), + }, "context_window_guidance": { "max_raw_bytes": self.DEFAULT_MAX_BYTES, "total_bytes": total_size, @@ -343,6 +407,7 @@ class AgentFlatContextStore: def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None: target_file.parent.mkdir(parents=True, exist_ok=True) + os.chmod(target_file.parent, 0o700) fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp") try: with os.fdopen(fd, "w", encoding="utf-8") as f: @@ -361,6 +426,108 @@ class AgentFlatContextStore: pass raise + def _atomic_write_text(self, target_file: Path, content: str) -> None: + target_file.parent.mkdir(parents=True, exist_ok=True) + os.chmod(target_file.parent, 0o700) + fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp") + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(content) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp_path, target_file) + try: + os.chmod(target_file, 0o600) + except Exception: + pass + except Exception: + try: + os.unlink(tmp_path) + except Exception: + pass + raise + + @staticmethod + def _collect_signal_terms(doc: Dict[str, Any], limit: int = 6) -> list: + summary = doc.get("agent_summary") if isinstance(doc, dict) else {} + hints = summary.get("retrieval_hints") if isinstance(summary, dict) else {} + terms = hints.get("high_signal_terms") if isinstance(hints, dict) else [] + if not isinstance(terms, list): + return [] + normalized = [str(t).strip() for t in terms if str(t).strip()] + return normalized[:limit] + + @staticmethod + def _extract_journey_stage(doc: Dict[str, Any]) -> str: + dctx = doc.get("document_context") if isinstance(doc, dict) else {} + journey = dctx.get("journey") if isinstance(dctx, dict) else {} + stage = journey.get("stage") if isinstance(journey, dict) else "" + return str(stage or "").strip() + + @staticmethod + def _context_description(filename: str) -> str: + 
descriptions = { + AgentFlatContextStore.STEP2_FILENAME: "Primary SEO and site structure context", + AgentFlatContextStore.STEP3_FILENAME: "Research depth, competitors, and content preferences", + AgentFlatContextStore.STEP4_FILENAME: "Persona profiles, voice adaptation, and platform strategy", + AgentFlatContextStore.STEP5_FILENAME: "Connected integrations and provider readiness", + } + return descriptions.get(filename, "Context document") + + def _generate_workspace_readme(self, manifest: Dict[str, Any]) -> str: + docs = manifest.get("documents") if isinstance(manifest, dict) and isinstance(manifest.get("documents"), list) else [] + + lines = [ + "# Agent Workspace Map", + "", + "You are in a restricted read-only VFS. Use `list_context`, `read_context_file`, and `search_context` to navigate.", + "", + "## Core Context Files", + ] + + for item in sorted(docs, key=lambda d: str((d or {}).get("path", ""))): + if not isinstance(item, dict): + continue + path = item.get("path") or "" + if not path: + continue + doc = self._load_context_document(path) or {} + signals = self._collect_signal_terms(doc) + journey_stage = self._extract_journey_stage(doc) + updated_at = str(item.get("updated_at") or "") + lines.append(f"- `{path}`: {self._context_description(path)}.") + if signals: + lines.append(f" - **Key Signals:** {', '.join(signals)}") + if journey_stage: + lines.append(f" - **Journey Stage:** {journey_stage}") + if updated_at: + lines.append(f" - **Updated:** {updated_at}") + + lines.extend( + [ + "", + "## Retrieval Strategy", + "1. Run `list_context` to check which onboarding steps are available.", + "2. Run `search_context` for targeted terms (for example: \"competitor\", \"tone\", \"integrations\").", + "3. 
Run `read_context_file` and ingest `agent_summary` before expanding full `data`.", + "", + "## Virtual Paths", + "- `/env/summary` -> consolidated summary generated from all available context docs", + f"- `/steps/website` -> `{self.STEP2_FILENAME}`", + f"- `/steps/research` -> `{self.STEP3_FILENAME}`", + f"- `/steps/persona` -> `{self.STEP4_FILENAME}`", + f"- `/steps/integrations` -> `{self.STEP5_FILENAME}`", + ] + ) + return "\n".join(lines) + "\n" + + def _update_workspace_readme(self, manifest: Dict[str, Any]) -> None: + try: + content = self._generate_workspace_readme(manifest) + self._atomic_write_text(self._workspace_file(self.WORKSPACE_README), content) + except Exception as exc: + logger.warning(f"Failed to update workspace README for user {self.user_id}: {exc}") + def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None: manifest_file = self._context_file(self.MANIFEST_FILENAME) existing = {} @@ -390,6 +557,7 @@ class AgentFlatContextStore: "documents": items, } self._atomic_write_json(manifest_file, manifest) + self._update_workspace_readme(manifest) def _save_context_document( self, @@ -436,9 +604,11 @@ class AgentFlatContextStore: self._atomic_write_json(target_file, context_doc) self._update_manifest(context_type, filename, context_doc) + self._audit_event("write_context", filename, "success") return True except Exception as exc: logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}") + self._audit_event("write_context", filename, "error") return False def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool: @@ -483,19 +653,31 @@ class AgentFlatContextStore: def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]: try: + if str(filename) not in self.ALLOWED_CONTEXT_FILES: + logger.warning(f"Rejected non-allowed context filename for user {self.user_id}: {filename}") + self._audit_event("read_context", str(filename), 
"rejected_filename") + return None target_file = self._context_file(filename) if not target_file.exists(): + self._audit_event("read_context", str(filename), "not_found") return None with open(target_file, "r", encoding="utf-8") as f: doc = json.load(f) if isinstance(doc, dict) and str(doc.get("user_id")) != str(self.user_id): logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})") + self._audit_event("read_context", str(filename), "user_mismatch") return None + self._audit_event("read_context", str(filename), "success") return doc if isinstance(doc, dict) else None except Exception as exc: logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}") + self._audit_event("read_context", str(filename), "error") return None + def load_context_document(self, filename: str) -> Optional[Dict[str, Any]]: + """Public loader for a named context document file.""" + return self._load_context_document(filename) + def load_context_manifest(self) -> Optional[Dict[str, Any]]: return self._load_context_document(self.MANIFEST_FILENAME) @@ -526,3 +708,35 @@ class AgentFlatContextStore: def load_step5_integrations(self) -> Optional[Dict[str, Any]]: doc = self.load_step5_context_document() return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None + + def generate_total_summary(self) -> Dict[str, Any]: + """Build a lightweight consolidated summary across available context documents.""" + manifest = self.load_context_manifest() or {"documents": []} + docs = manifest.get("documents") if isinstance(manifest.get("documents"), list) else [] + overview = [] + for item in docs: + if not isinstance(item, dict): + continue + path = str(item.get("path") or "") + if not path: + continue + doc = self._load_context_document(path) or {} + summary = doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {} + quick_facts = summary.get("quick_facts") if 
isinstance(summary.get("quick_facts"), dict) else {} + hints = summary.get("retrieval_hints") if isinstance(summary.get("retrieval_hints"), dict) else {} + overview.append( + { + "path": path, + "context_type": doc.get("context_type"), + "updated_at": doc.get("updated_at") or item.get("updated_at"), + "journey_stage": self._extract_journey_stage(doc), + "high_signal_terms": hints.get("high_signal_terms") if isinstance(hints.get("high_signal_terms"), list) else [], + "quick_facts": quick_facts, + } + ) + return { + "user_id": str(self.user_id), + "generated_at": datetime.utcnow().isoformat(), + "document_count": len(overview), + "documents": overview, + } diff --git a/backend/services/intelligence/sif_integration.py b/backend/services/intelligence/sif_integration.py index bcb2d39e..ddab6781 100644 --- a/backend/services/intelligence/sif_integration.py +++ b/backend/services/intelligence/sif_integration.py @@ -340,6 +340,46 @@ class SIFIntegrationService: logger.warning(f"Failed to load flat context manifest for user {self.user_id}: {e}") return {"source": "none", "data": {"documents": []}} + async def get_merged_flat_context(self) -> Dict[str, Any]: + """Return merged onboarding context from all available flat context documents. + + This is an aggregation helper; step-specific APIs still return one-by-one files. 
+ """ + store = AgentFlatContextStore(self.user_id) + manifest = store.load_context_manifest() or {"documents": []} + docs = manifest.get("documents") if isinstance(manifest.get("documents"), list) else [] + + merged: Dict[str, Any] = { + "source": "flat_file", + "user_id": self.user_id, + "manifest_updated_at": manifest.get("updated_at"), + "steps": {}, + "agent_summaries": {}, + "documents": [], + } + + for item in docs: + if not isinstance(item, dict): + continue + path = item.get("path") + if not path: + continue + doc = store.load_context_document(str(path)) or {} + context_type = str(doc.get("context_type") or item.get("type") or path) + merged["documents"].append( + { + "path": path, + "context_type": context_type, + "updated_at": doc.get("updated_at") or item.get("updated_at"), + "size_bytes": item.get("size_bytes"), + } + ) + merged["steps"][context_type] = doc.get("data") if isinstance(doc.get("data"), dict) else {} + merged["agent_summaries"][context_type] = doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {} + + merged["document_count"] = len(merged["documents"]) + return merged + async def index_market_trends_run(self, trends_result: Dict[str, Any], run_id: str) -> bool: try: latest_id = f"market_trends_latest:{self.user_id}" diff --git a/backend/tests/test_agent_context_vfs.py b/backend/tests/test_agent_context_vfs.py new file mode 100644 index 00000000..333c14d6 --- /dev/null +++ b/backend/tests/test_agent_context_vfs.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import json +import sys +import types +import importlib.util +from pathlib import Path + +# Lightweight fallback for environments missing loguru. 
+if "loguru" not in sys.modules: + stub = types.ModuleType("loguru") + stub.logger = types.SimpleNamespace( + info=lambda *a, **k: None, + warning=lambda *a, **k: None, + error=lambda *a, **k: None, + debug=lambda *a, **k: None, + ) + sys.modules["loguru"] = stub + +def _load_module(name: str, rel_path: str): + base = Path(__file__).resolve().parents[1] + path = base / rel_path + spec = importlib.util.spec_from_file_location(name, path) + module = importlib.util.module_from_spec(spec) + assert spec and spec.loader + spec.loader.exec_module(module) + return module + + +flat_mod = _load_module("agent_flat_context_under_test", "services/intelligence/agent_flat_context.py") +sys.modules.setdefault("services.intelligence.agent_flat_context", flat_mod) +vfs_mod = _load_module("agent_context_vfs_under_test", "services/intelligence/agent_context_vfs.py") + +AgentFlatContextStore = flat_mod.AgentFlatContextStore +AgentContextVFS = vfs_mod.AgentContextVFS + + +def _cleanup_workspace(user_id: str, project_id: str | None = None) -> None: + safe_user = ''.join(c for c in str(user_id) if c.isalnum() or c in ('-', '_')) or 'unknown_user' + root = Path(__file__).resolve().parents[2] / 'workspace' + user_dir = root / f'workspace_{safe_user}' + if user_dir.exists(): + import shutil + shutil.rmtree(user_dir, ignore_errors=True) + + if project_id: + safe_project = ''.join(c for c in str(project_id) if c.isalnum() or c in ('-', '_')) or 'default_project' + project_dir = root / f'project_{safe_project}' + if project_dir.exists(): + import shutil + shutil.rmtree(project_dir, ignore_errors=True) + + +def test_search_context_query_variants_and_can_answer(): + user_id = 'pytest_vfs_user' + _cleanup_workspace(user_id) + + store = AgentFlatContextStore(user_id) + payload = { + 'website_url': 'https://example.com', + 'brand_analysis': {'brand_voice': 'Authoritative'}, + 'recommended_settings': {'writing_tone': 'Conversational'}, + 'content_type': {'primary_type': 'Blog'}, + 'target_audience': 
{'primary_audience': 'Founders'}, + } + assert store.save_step2_website_analysis(payload) + + vfs = AgentContextVFS(user_id) + result = vfs.search_context('tone') + + assert result['query'] == 'tone' + assert 'attempted_queries' in result + assert result['attempted_queries'][0] == 'tone' + assert result['can_answer'] is True + assert len(result['results']) >= 1 + assert 'triage_top5' in result + assert len(result['triage_top5']) >= 1 + assert 'low_probability' in result['results'][0] + + +def test_inspect_file_large_document_summary_plus_keys(): + user_id = 'pytest_vfs_large' + _cleanup_workspace(user_id) + + store = AgentFlatContextStore(user_id) + large_blob = 'x' * 9000 + payload = { + 'website_url': 'https://big.example.com', + 'brand_analysis': {'brand_voice': 'Bold'}, + 'recommended_settings': {'writing_tone': 'Direct'}, + 'target_audience': {'primary_audience': 'Teams'}, + 'crawl_result': {'raw': large_blob}, + } + assert store.save_step2_website_analysis(payload) + + vfs = AgentContextVFS(user_id) + out = vfs.inspect_file('step2_website_analysis.json') + + assert out['mode'] == 'summary_plus_keys' + assert 'agent_summary' in out + assert 'keys' in out + assert 'crawl_result' in out['keys'] + + +def test_write_shared_note_and_activity_log_created(): + user_id = 'pytest_collab_user' + project_id = 'proj_abc' + _cleanup_workspace(user_id, project_id) + + vfs = AgentContextVFS(user_id, project_id=project_id) + write_res = vfs.write_shared_note('Draft collaboration note', agent_id='agent_one') + + assert write_res['ok'] is True + assert write_res['file'] == 'collaboration.md' + + collab = vfs.list_context()['collaboration'] + scratchpad = Path(collab['scratchpad_dir']) + note_file = scratchpad / 'collaboration.md' + log_file = scratchpad / 'activity_log.jsonl' + + assert note_file.exists() + assert log_file.exists() + + content = note_file.read_text(encoding='utf-8') + assert 'agent_one' in content + assert 'Draft collaboration note' in content + + lines = 
[json.loads(l) for l in log_file.read_text(encoding='utf-8').splitlines() if l.strip()] + assert any(entry.get('event_type') == 'shared_note_written' for entry in lines) + + +def test_read_struct_path_resolution_and_dependency_context(): + user_id = 'pytest_struct_user' + _cleanup_workspace(user_id) + + store = AgentFlatContextStore(user_id) + assert store.save_step2_website_analysis( + { + 'website_url': 'https://struct.example.com', + 'brand_analysis': {'brand_voice': 'Pragmatic'}, + 'recommended_settings': {'writing_tone': 'Clear'}, + } + ) + assert store.save_step4_persona_data( + { + 'core_persona': {'name': 'Ops Leader', 'goal': 'Scale ops'}, + 'selected_platforms': ['linkedin'], + } + ) + + vfs = AgentContextVFS(user_id) + out = vfs.read_struct('step4_persona_data.json', 'data.core_persona.name') + + assert out['ok'] is True + assert out['data'] == 'Ops Leader' + assert out['dependency_context']['brand_voice'] == 'Pragmatic'
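Reviewer note on test coverage: the four tests above cover search, inspection, shared notes, and structural reads, but none of them exercises the path sandboxing added in `_safe_resolve_under`. The sketch below copies that containment check into a standalone function and runs it against a temporary directory, so it can be tried without importing the service modules. The names `safe_resolve_under` and `check_sandbox` are illustrative and are not part of this change.

```python
import tempfile
from pathlib import Path


def safe_resolve_under(base_dir: Path, requested_path: str) -> Path:
    """Resolve requested_path and reject anything that escapes base_dir.

    Standalone copy of the containment check in `_safe_resolve_under`.
    """
    base_real = base_dir.resolve()
    candidate = (base_dir / requested_path).resolve()
    if candidate == base_real or base_real in candidate.parents:
        return candidate
    raise ValueError("Unsafe path access attempt outside sandbox")


def check_sandbox() -> dict:
    """Exercise one accepted and two rejected paths in a throwaway directory."""
    outcomes = {}
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp) / "agent_context"
        base.mkdir()
        # A plain filename must resolve to a path directly inside the sandbox.
        inside = safe_resolve_under(base, "step2_website_analysis.json")
        outcomes["inside"] = inside.parent == base.resolve()
        # Relative traversal and absolute paths must both be rejected.
        for label, attempt in (("traversal", "../secrets.json"), ("absolute", "/etc/passwd")):
            try:
                safe_resolve_under(base, attempt)
                outcomes[label] = False
            except ValueError:
                outcomes[label] = True
    return outcomes


outcomes = check_sandbox()
```

A pytest version in this file would probably call `store._context_file("../x.json")` on a real `AgentFlatContextStore` and assert that it raises `ValueError`. That variant is left out here because it depends on the workspace layout.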