Merge remote-tracking branch 'origin/codex/review-flat-file-context-system-implementation'
This commit is contained in:
197
backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md
Normal file
197
backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
# Agent Flat-File Context System Review
|
||||||
|
|
||||||
|
## Scope
|
||||||
|
This review documents the **current implementation** of ALwrity's onboarding flat-file context system and compares it to the proposed **Direct-to-File Virtual Shell (VFS)** model.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 1) Present Implementation (What Exists Today)
|
||||||
|
|
||||||
|
### 1.1 Storage model
|
||||||
|
- Context is stored per user under:
|
||||||
|
- `backend/workspace/workspace_<safe_user_id>/agent_context/`
|
||||||
|
- Files are JSON documents, one per onboarding domain:
|
||||||
|
- `step2_website_analysis.json`
|
||||||
|
- `step3_research_preferences.json`
|
||||||
|
- `step4_persona_data.json`
|
||||||
|
- `step5_integrations.json`
|
||||||
|
- `context_manifest.json`
|
||||||
|
|
||||||
|
### 1.2 Writer and reader
|
||||||
|
- `AgentFlatContextStore` is the core component that:
|
||||||
|
- sanitizes user IDs for path safety,
|
||||||
|
- writes documents atomically (`tempfile` + `os.replace`),
|
||||||
|
- sets restrictive file permissions (`0600` best effort),
|
||||||
|
- generates structured `agent_summary` objects,
|
||||||
|
- updates a manifest index of available documents.
|
||||||
|
- Data is loaded by direct file reads from the same class (`load_stepX_context_document`).
|
||||||
|
|
||||||
|
### 1.3 Read-path fallback chain
|
||||||
|
`SIFIntegrationService` uses a strict fallback sequence for onboarding context retrieval:
|
||||||
|
1. **flat file** (`AgentFlatContextStore`)
|
||||||
|
2. **database** (`WebsiteAnalysis`, `ResearchPreferences`, `PersonaData`, etc.)
|
||||||
|
3. **SIF semantic index** (`TxtaiIntelligenceService.search`)
|
||||||
|
|
||||||
|
Step 5 uses `flat_file -> sif_semantic`.
|
||||||
|
|
||||||
|
### 1.4 Producer flow (onboarding persistence)
|
||||||
|
`StepManagementService` persists canonical snapshots to flat context when onboarding steps are saved:
|
||||||
|
- Step 2 website analysis
|
||||||
|
- Step 3 research preferences (and later competitor-enriched refresh)
|
||||||
|
- Step 4 persona data
|
||||||
|
- Step 5 integrations
|
||||||
|
|
||||||
|
### 1.5 Context optimization currently implemented
|
||||||
|
- Sensitive-key redaction in nested payloads (`api_key`, `token`, `secret`, etc.).
|
||||||
|
- Size budgeting with trimming (`DEFAULT_MAX_BYTES = 300_000`) and trim metadata.
|
||||||
|
- Generated summaries include:
|
||||||
|
- quick facts,
|
||||||
|
- retrieval hints (high-signal terms and suggested agent queries),
|
||||||
|
- domain-specific focus blocks.
|
||||||
|
- Document context includes audience, retrieval contract, journey stage, related documents, and context-window guidance.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2) Comparison vs Proposed Direct-to-File VFS
|
||||||
|
|
||||||
|
## Strong alignment
|
||||||
|
The current system already matches the proposal in important ways:
|
||||||
|
- **Direct-to-file persistence** instead of DB-backed retrieval for fast reads.
|
||||||
|
- **Manifest/index concept** (`context_manifest.json`) that can act like a precomputed path map.
|
||||||
|
- **Agent-first retrieval semantics** (summary-first contract and fallback policy).
|
||||||
|
- **Operational safety controls** (atomic writes, redaction, path sanitization).
|
||||||
|
|
||||||
|
## Gaps vs full virtual shell abstraction
|
||||||
|
The following pieces are not fully implemented as described in your proposed architecture:
|
||||||
|
- No explicit **virtual shell provider** (`IFileSystem`) exposing `ls/cat/grep/find` commands.
|
||||||
|
- No always-live, process-level **in-memory `Map<virtualPath, absolutePath>`** for path lookups.
|
||||||
|
- No native glob/query command layer for agent shell UX.
|
||||||
|
- Not currently **read-only enforced at API surface** (writes are intentionally allowed by onboarding services to refresh context).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3) Practical Recommendation: Incremental VFS Evolution
|
||||||
|
|
||||||
|
1. **Introduce a read-only VFS facade for agents**
|
||||||
|
- Keep `AgentFlatContextStore` as the write path for trusted onboarding services.
|
||||||
|
- Add `AgentContextVFS` read adapter exposing:
|
||||||
|
- `ls(path)` from manifest,
|
||||||
|
- `cat(path)` mapped to underlying JSON,
|
||||||
|
- `find(glob)` on virtual keys,
|
||||||
|
- `grep(query)` with path prefilter + stream scan.
|
||||||
|
|
||||||
|
2. **Promote manifest to a first-class path map**
|
||||||
|
- Build and cache an in-memory map on service startup or first access.
|
||||||
|
- Refresh map when manifest `updated_at` changes.
|
||||||
|
|
||||||
|
3. **Add explicit write policy boundaries**
|
||||||
|
- Agent-facing interface: hard read-only (`EROFS`).
|
||||||
|
- Internal system service interface: allow writes for onboarding synchronization.
|
||||||
|
|
||||||
|
4. **Metadata strategy for grep ranking**
|
||||||
|
- Prioritize in order:
|
||||||
|
1) `agent_summary.quick_facts`
|
||||||
|
2) `agent_summary.retrieval_hints.high_signal_terms`
|
||||||
|
3) `document_context.context_type` and `journey.stage`
|
||||||
|
4) full `data` body
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 4) Response to the Metadata Header Question
|
||||||
|
|
||||||
|
> "Does your current `.txt` optimization include specific metadata headers (like YAML frontmatter) that the grep tool should prioritize?"
|
||||||
|
|
||||||
|
For this implementation, context is currently persisted as structured JSON (not `.txt` with YAML frontmatter). Equivalent high-value metadata already exists and should be prioritized for search/ranking:
|
||||||
|
- `context_type`
|
||||||
|
- `updated_at`
|
||||||
|
- `agent_summary.quick_facts`
|
||||||
|
- `agent_summary.retrieval_hints.high_signal_terms`
|
||||||
|
- `document_context.journey.stage`
|
||||||
|
- `document_context.related_documents`
|
||||||
|
|
||||||
|
If you later move to `.txt` transport files, mirror these as frontmatter fields to preserve retrieval quality.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 5) Bottom line
|
||||||
|
Your current onboarding flat-file context implementation is already a strong "shim" architecture and close to the proposed model. The biggest missing piece is a dedicated virtual-shell read interface (`ls/cat/grep/find`) backed by a persistent path-map cache and a clear read-only contract for agent execution contexts.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 6) Implemented Follow-up (VFS Adapter + Workspace Guide)
|
||||||
|
|
||||||
|
The following enhancements are now implemented:
|
||||||
|
|
||||||
|
1. **Auto-generated workspace map**
|
||||||
|
- The system now generates `workspace_<user>/README.md` whenever `context_manifest.json` is updated.
|
||||||
|
- The README includes:
|
||||||
|
- available context files,
|
||||||
|
- key signal hints from `agent_summary.retrieval_hints.high_signal_terms`,
|
||||||
|
- journey-stage hints,
|
||||||
|
- virtual path mappings and retrieval strategy guidance.
|
||||||
|
|
||||||
|
2. **Read-only VFS facade**
|
||||||
|
- Added `AgentContextVFS` with:
|
||||||
|
- `list_context()` (`ls` equivalent),
|
||||||
|
- `search_context()` (`grep` equivalent; prioritizes `high_signal_terms` and `quick_facts`),
|
||||||
|
- `read_context_file()` (`cat` equivalent; large-file summary mode + subkey drilldown),
|
||||||
|
- explicit write rejection (`EROFS`).
|
||||||
|
|
||||||
|
3. **Virtual path support**
|
||||||
|
- `/env/summary` maps to `AgentFlatContextStore.generate_total_summary()`.
|
||||||
|
- `/steps/website`, `/steps/research`, `/steps/persona`, `/steps/integrations` map to step documents.
|
||||||
|
|
||||||
|
4. **System-prompt helper**
|
||||||
|
- Added `build_filesystem_header(user_id)` to inject a compact file availability + priority hint block into agent startup prompts.
|
||||||
|
|
||||||
|
5. **Merged context helper in SIF integration**
|
||||||
|
- `SIFIntegrationService.get_merged_flat_context()` now provides a unified view across all available flat files while preserving existing per-step retrieval methods.
|
||||||
|
|
||||||
|
6. **Basic file-level security hardening**
|
||||||
|
- Workspace and context directories are now explicitly forced to `0700`.
|
||||||
|
- Context and workspace files are written with strict `0600`.
|
||||||
|
- Added path sandboxing to ensure requested paths cannot escape user workspace roots.
|
||||||
|
- Restricted context-file loading to an allowlist of known onboarding context documents.
|
||||||
|
- Added deterministic per-user secret derivation from `.env` (`FILE_ENCRYPTION_SALT` + `safe_user_id`) with non-sensitive fingerprints for audit/debug and future encryption-at-rest rollout.
|
||||||
|
|
||||||
|
7. **Tool-logic enhancement (coarse-to-fine search)**
|
||||||
|
- `search_context` now performs a two-pass retrieval:
|
||||||
|
1) high-relevance summary match pass (`high_signal_terms`, `quick_facts`),
|
||||||
|
2) parallelized stream scan pass over sandboxed allowlisted files for supporting details.
|
||||||
|
- Results include relevance labels, snippets, and line numbers for body matches.
|
||||||
|
- Large-result behavior now reports truncation guidance (show top 10 and suggest narrower keywords).
|
||||||
|
- `inspect_file` now provides token-saving behavior: full return for small files, or `agent_summary` + top-level keys for larger files, with key-level zoom-in support.
|
||||||
|
|
||||||
|
8. **Retrieval robustness roadmap (next hardening phase)**
|
||||||
|
- **Query normalization:** Add synonym expansion and typo-tolerant matching (e.g., `tone` ≈ `brand voice`) before coarse/fine passes.
|
||||||
|
- **Confidence scoring:** Return confidence tiers that blend source freshness (`updated_at`), summary-match strength, and match density.
|
||||||
|
- **Field-aware boosting:** Weight matches by field priority (`high_signal_terms` > `quick_facts` > `data`) and document recency.
|
||||||
|
- **Deduplicated evidence:** Collapse repeated hits from the same file/key into one clustered result with a single best snippet and hit count.
|
||||||
|
- **Fallback query reformulation:** If zero hits, automatically retry with narrow/expanded variants and return attempted queries.
|
||||||
|
- **Answerability contract:** Add a lightweight `can_answer` signal in search responses so orchestrators can decide whether to ask follow-up questions or fetch more context.
|
||||||
|
- **Evaluation harness:** Track retrieval metrics over golden queries (`precision@k`, `MRR`, zero-hit rate, stale-hit rate) in CI to prevent relevance regressions.
|
||||||
|
|
||||||
|
9. **Collaborative VFS namespace (shared memory mode)**
|
||||||
|
- Added optional `project_id` support to `AgentContextVFS` with isolated root: `workspace/project_<project_id>/`.
|
||||||
|
- Introduced `scratchpad/` for collaborative writes while keeping onboarding `agent_context` read-first.
|
||||||
|
- Added `write_shared_note(...)` with advisory locking (`flock`) and strict filename/path validation.
|
||||||
|
- Added append-only `activity_log.jsonl` via `append_activity_log(...)` for watchdog/event-driven coordination.
|
||||||
|
- Maintains owner-only permissions (`0700` scratchpad dir, `0600` files) and audit trails for shared writes.
|
||||||
|
|
||||||
|
10. **Testing readiness upgrades**
|
||||||
|
- Added automated tests for:
|
||||||
|
- query reformulation + `can_answer` behavior in `search_context`,
|
||||||
|
- large-file progressive disclosure behavior in `inspect_file`,
|
||||||
|
- collaborative write path (`write_shared_note`) and append-only activity logging.
|
||||||
|
- Test module: `backend/tests/test_agent_context_vfs.py`.
|
||||||
|
- These tests provide a baseline regression harness for VFS retrieval quality and shared-memory safety.
|
||||||
|
|
||||||
|
11. **Static + Structural retrieval hardening**
|
||||||
|
- Added a **static triage layer** in `search_context`:
|
||||||
|
- keyword-density scoring,
|
||||||
|
- `low_probability` flags for likely-noisy hits,
|
||||||
|
- `triage_top5` shortlist for router-style pre-filtering.
|
||||||
|
- Added `read_struct(filename, path_query)`:
|
||||||
|
- resolves dot/bracket JSON paths to return node-level data only,
|
||||||
|
- includes lightweight dependency injection (e.g., Step 4 persona reads include Step 2 brand voice context when available),
|
||||||
|
- keeps output token-efficient for downstream agents.
|
||||||
745
backend/services/intelligence/agent_context_vfs.py
Normal file
745
backend/services/intelligence/agent_context_vfs.py
Normal file
@@ -0,0 +1,745 @@
|
|||||||
|
"""Read-only virtual filesystem facade for agent flat context documents.
|
||||||
|
|
||||||
|
This adapter provides shell-like primitives (`list_context`, `search_context`,
|
||||||
|
`read_context_file`) over the JSON documents managed by AgentFlatContextStore.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
import fcntl
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
from collections import deque
|
||||||
|
from fnmatch import fnmatch
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
from services.intelligence.agent_flat_context import AgentFlatContextStore
|
||||||
|
|
||||||
|
|
||||||
|
class SmartGrepEngine:
|
||||||
|
"""Streaming grep engine with regex fallback and contextual snippets."""
|
||||||
|
|
||||||
|
def __init__(self, context_window: int = 1):
|
||||||
|
self.context_window = max(0, int(context_window))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _compile_pattern(pattern: str) -> re.Pattern:
|
||||||
|
try:
|
||||||
|
return re.compile(pattern, re.IGNORECASE)
|
||||||
|
except re.error:
|
||||||
|
return re.compile(re.escape(pattern), re.IGNORECASE)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _truncate(text: str, limit: int = 180) -> str:
|
||||||
|
text = " ".join(text.split())
|
||||||
|
if len(text) <= limit:
|
||||||
|
return text
|
||||||
|
return text[:limit] + "..."
|
||||||
|
|
||||||
|
def stream_file(self, file_path: Path, pattern: str, *, path_label: str) -> List[Dict[str, Any]]:
|
||||||
|
regex = self._compile_pattern(pattern)
|
||||||
|
matches: List[Dict[str, Any]] = []
|
||||||
|
prev = deque(maxlen=self.context_window)
|
||||||
|
active: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
|
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
|
||||||
|
for line_no, line in enumerate(f, start=1):
|
||||||
|
# Fill trailing context for active matches.
|
||||||
|
for item in active:
|
||||||
|
if item["remaining_after"] > 0:
|
||||||
|
item["after"].append(line.rstrip("\n"))
|
||||||
|
item["remaining_after"] -= 1
|
||||||
|
|
||||||
|
# Detect a new match on current line.
|
||||||
|
if regex.search(line):
|
||||||
|
current = line.rstrip("\n")
|
||||||
|
record = {
|
||||||
|
"path": path_label,
|
||||||
|
"line": line_no,
|
||||||
|
"before": list(prev),
|
||||||
|
"match_line": current,
|
||||||
|
"after": [],
|
||||||
|
"remaining_after": self.context_window,
|
||||||
|
}
|
||||||
|
active.append(record)
|
||||||
|
matches.append(record)
|
||||||
|
|
||||||
|
prev.append(line.rstrip("\n"))
|
||||||
|
|
||||||
|
formatted: List[Dict[str, Any]] = []
|
||||||
|
for m in matches:
|
||||||
|
snippet_parts = [*m["before"], m["match_line"], *m["after"]]
|
||||||
|
snippet = self._truncate(" | ".join([p for p in snippet_parts if p is not None]))
|
||||||
|
line_l = m["match_line"].lower()
|
||||||
|
is_high_signal = any(k in line_l for k in ("agent_summary", "high_signal_terms", "quick_facts"))
|
||||||
|
formatted.append(
|
||||||
|
{
|
||||||
|
"path": m["path"],
|
||||||
|
"line": m["line"],
|
||||||
|
"snippet": snippet,
|
||||||
|
"relevance": "High Relevance" if is_high_signal else "Supporting Detail",
|
||||||
|
"reason": "matched summary field in stream" if is_high_signal else "matched streamed body line",
|
||||||
|
"score": 70 if is_high_signal else 50,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return formatted
|
||||||
|
|
||||||
|
|
||||||
|
class AgentContextVFS:
|
||||||
|
"""Read-only adapter that maps virtual paths to flat context documents."""
|
||||||
|
|
||||||
|
VIRTUAL_MAP = {
|
||||||
|
"/steps/website": AgentFlatContextStore.STEP2_FILENAME,
|
||||||
|
"/steps/research": AgentFlatContextStore.STEP3_FILENAME,
|
||||||
|
"/steps/persona": AgentFlatContextStore.STEP4_FILENAME,
|
||||||
|
"/steps/integrations": AgentFlatContextStore.STEP5_FILENAME,
|
||||||
|
}
|
||||||
|
HIGH_SIGNAL_MARKERS = ("agent_summary", "high_signal_terms", "quick_facts", "context_type")
|
||||||
|
|
||||||
|
def __init__(self, user_id: str, project_id: Optional[str] = None):
|
||||||
|
self.user_id = user_id
|
||||||
|
self.project_id = project_id
|
||||||
|
self.store = AgentFlatContextStore(user_id)
|
||||||
|
self.grep_engine = SmartGrepEngine(context_window=1)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _safe_slug(value: Optional[str], fallback: str) -> str:
|
||||||
|
raw = str(value or "").strip()
|
||||||
|
safe = "".join(c for c in raw if c.isalnum() or c in ("-", "_"))
|
||||||
|
return safe or fallback
|
||||||
|
|
||||||
|
def _manifest_docs(self) -> List[Dict[str, Any]]:
|
||||||
|
manifest = self.store.load_context_manifest() or {"documents": []}
|
||||||
|
docs = manifest.get("documents")
|
||||||
|
return docs if isinstance(docs, list) else []
|
||||||
|
|
||||||
|
def _workspace_root(self) -> Path:
|
||||||
|
if self.project_id:
|
||||||
|
root_dir = Path(__file__).resolve().parents[3]
|
||||||
|
safe_project = self._safe_slug(self.project_id, "default_project")
|
||||||
|
project_root = root_dir / "workspace" / f"project_{safe_project}"
|
||||||
|
project_root.mkdir(parents=True, exist_ok=True)
|
||||||
|
os.chmod(project_root, 0o700)
|
||||||
|
return project_root
|
||||||
|
return self.store._workspace_dir()
|
||||||
|
|
||||||
|
def _scratchpad_dir(self) -> Path:
|
||||||
|
scratch = self._workspace_root() / "scratchpad"
|
||||||
|
scratch.mkdir(parents=True, exist_ok=True)
|
||||||
|
os.chmod(scratch, 0o700)
|
||||||
|
return scratch
|
||||||
|
|
||||||
|
def _allowlisted_workspace_files(self) -> List[Path]:
|
||||||
|
"""Return sandboxed files eligible for streaming search."""
|
||||||
|
files: List[Path] = []
|
||||||
|
workspace = self._workspace_root()
|
||||||
|
context_dir = self.store._context_dir()
|
||||||
|
|
||||||
|
# 1) manifest-backed onboarding context files
|
||||||
|
for item in self._manifest_docs():
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
rel = str(item.get("path") or "")
|
||||||
|
if not rel:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
candidate = self.store._safe_resolve_under(context_dir, rel)
|
||||||
|
if candidate.exists() and candidate.is_file():
|
||||||
|
files.append(candidate)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 2) workspace text artifacts (README, operator notes, etc.)
|
||||||
|
for candidate in workspace.glob("*.txt"):
|
||||||
|
if candidate.is_file():
|
||||||
|
files.append(candidate.resolve())
|
||||||
|
readme = workspace / "README.md"
|
||||||
|
if readme.exists() and readme.is_file():
|
||||||
|
files.append(readme.resolve())
|
||||||
|
|
||||||
|
# dedupe
|
||||||
|
seen = set()
|
||||||
|
unique: List[Path] = []
|
||||||
|
for p in files:
|
||||||
|
rp = str(p)
|
||||||
|
if rp in seen:
|
||||||
|
continue
|
||||||
|
seen.add(rp)
|
||||||
|
unique.append(p)
|
||||||
|
return unique
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _query_variants(query: str) -> List[str]:
|
||||||
|
"""Generate normalized and synonym-expanded query variants."""
|
||||||
|
base = (query or "").strip().lower()
|
||||||
|
if not base:
|
||||||
|
return []
|
||||||
|
synonyms = {
|
||||||
|
"tone": ["brand voice", "writing tone"],
|
||||||
|
"voice": ["brand voice", "writing style"],
|
||||||
|
"competitor": ["competition", "rival"],
|
||||||
|
"seo": ["search", "metadata"],
|
||||||
|
"persona": ["audience profile", "target audience"],
|
||||||
|
}
|
||||||
|
variants = [base]
|
||||||
|
tokens = base.split()
|
||||||
|
for idx, tok in enumerate(tokens):
|
||||||
|
if tok in synonyms:
|
||||||
|
for repl in synonyms[tok]:
|
||||||
|
new_tokens = tokens.copy()
|
||||||
|
new_tokens[idx] = repl
|
||||||
|
variants.append(" ".join(new_tokens))
|
||||||
|
variants.extend([base.replace("-", " "), base.replace("_", " ")])
|
||||||
|
# dedupe, preserve order
|
||||||
|
seen = set()
|
||||||
|
out: List[str] = []
|
||||||
|
for v in variants:
|
||||||
|
vv = v.strip()
|
||||||
|
if not vv or vv in seen:
|
||||||
|
continue
|
||||||
|
seen.add(vv)
|
||||||
|
out.append(vv)
|
||||||
|
return out
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _freshness_score(updated_at: Optional[str]) -> float:
|
||||||
|
if not updated_at:
|
||||||
|
return 0.3
|
||||||
|
try:
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
ts = datetime.fromisoformat(str(updated_at).replace("Z", "+00:00"))
|
||||||
|
if ts.tzinfo is None:
|
||||||
|
ts = ts.replace(tzinfo=timezone.utc)
|
||||||
|
days = max(0.0, (datetime.now(timezone.utc) - ts).total_seconds() / 86400.0)
|
||||||
|
if days <= 1:
|
||||||
|
return 1.0
|
||||||
|
if days <= 7:
|
||||||
|
return 0.9
|
||||||
|
if days <= 30:
|
||||||
|
return 0.75
|
||||||
|
if days <= 90:
|
||||||
|
return 0.6
|
||||||
|
return 0.4
|
||||||
|
except Exception:
|
||||||
|
return 0.3
|
||||||
|
|
||||||
|
def _cluster_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||||
|
"""Deduplicate repeated hits by file + reason and keep strongest evidence."""
|
||||||
|
buckets: Dict[Tuple[str, str], Dict[str, Any]] = {}
|
||||||
|
for r in results:
|
||||||
|
path = str(r.get("path") or "")
|
||||||
|
reason = str(r.get("reason") or "")
|
||||||
|
key = (path, reason)
|
||||||
|
existing = buckets.get(key)
|
||||||
|
if not existing:
|
||||||
|
buckets[key] = {**r, "hit_count": 1}
|
||||||
|
continue
|
||||||
|
existing["hit_count"] = int(existing.get("hit_count", 1)) + 1
|
||||||
|
if int(r.get("score", 0)) > int(existing.get("score", 0)):
|
||||||
|
existing.update({k: v for k, v in r.items() if k != "hit_count"})
|
||||||
|
existing["hit_count"] = int(existing.get("hit_count", 1))
|
||||||
|
clustered = list(buckets.values())
|
||||||
|
clustered.sort(key=lambda r: (-int(r.get("score", 0)), str(r.get("path") or "")))
|
||||||
|
return clustered
|
||||||
|
|
||||||
|
def _keyword_density(self, snippet: str, query: str) -> float:
|
||||||
|
if not snippet or not query:
|
||||||
|
return 0.0
|
||||||
|
query_tokens = [t for t in query.lower().split() if t]
|
||||||
|
if not query_tokens:
|
||||||
|
return 0.0
|
||||||
|
text = snippet.lower()
|
||||||
|
hits = sum(text.count(tok) for tok in query_tokens)
|
||||||
|
words = max(1, len(text.split()))
|
||||||
|
return hits / words
|
||||||
|
|
||||||
|
def _static_triage(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
|
||||||
|
"""Semgrep-style static heuristic triage before main agent consumption."""
|
||||||
|
triaged: List[Dict[str, Any]] = []
|
||||||
|
for r in results:
|
||||||
|
snippet = str(r.get("snippet") or "")
|
||||||
|
density = self._keyword_density(snippet, query)
|
||||||
|
marker_hit = any(marker in snippet.lower() for marker in self.HIGH_SIGNAL_MARKERS)
|
||||||
|
low_probability = bool(density < 0.01 and not marker_hit)
|
||||||
|
item = dict(r)
|
||||||
|
item["keyword_density"] = round(density, 4)
|
||||||
|
item["low_probability"] = low_probability
|
||||||
|
triaged.append(item)
|
||||||
|
triaged.sort(
|
||||||
|
key=lambda x: (
|
||||||
|
bool(x.get("low_probability")),
|
||||||
|
-float(x.get("confidence", 0)),
|
||||||
|
-int(x.get("score", 0)),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return triaged
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _llm_router_stub(results: List[Dict[str, Any]], top_k: int = 5) -> List[Dict[str, Any]]:
|
||||||
|
"""Fast local triage stub (drop low-probability first; keep strongest candidates)."""
|
||||||
|
ranked = sorted(
|
||||||
|
results,
|
||||||
|
key=lambda x: (
|
||||||
|
bool(x.get("low_probability")),
|
||||||
|
-float(x.get("confidence", 0)),
|
||||||
|
-int(x.get("score", 0)),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return ranked[: max(1, top_k)]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _resolve_json_path(data: Any, path_query: str) -> Any:
|
||||||
|
"""Resolve dot/bracket JSON path such as 'data.seo_audit.recommendations[0]'."""
|
||||||
|
if not path_query:
|
||||||
|
return data
|
||||||
|
|
||||||
|
current = data
|
||||||
|
query = path_query.strip()
|
||||||
|
parts: List[str] = []
|
||||||
|
buf = ""
|
||||||
|
in_brackets = False
|
||||||
|
for ch in query:
|
||||||
|
if ch == "." and not in_brackets:
|
||||||
|
if buf:
|
||||||
|
parts.append(buf)
|
||||||
|
buf = ""
|
||||||
|
continue
|
||||||
|
if ch == "[":
|
||||||
|
in_brackets = True
|
||||||
|
elif ch == "]":
|
||||||
|
in_brackets = False
|
||||||
|
buf += ch
|
||||||
|
if buf:
|
||||||
|
parts.append(buf)
|
||||||
|
|
||||||
|
for part in parts:
|
||||||
|
if "[" in part and part.endswith("]"):
|
||||||
|
key, idx_raw = part.split("[", 1)
|
||||||
|
idx = int(idx_raw[:-1])
|
||||||
|
if key:
|
||||||
|
if not isinstance(current, dict):
|
||||||
|
raise KeyError(key)
|
||||||
|
current = current[key]
|
||||||
|
if not isinstance(current, list):
|
||||||
|
raise IndexError(idx)
|
||||||
|
current = current[idx]
|
||||||
|
else:
|
||||||
|
if not isinstance(current, dict):
|
||||||
|
raise KeyError(part)
|
||||||
|
current = current[part]
|
||||||
|
return current
|
||||||
|
|
||||||
|
def _resolve_path(self, path: str) -> Tuple[str, Optional[str]]:
|
||||||
|
normalized = (path or "").strip()
|
||||||
|
if not normalized:
|
||||||
|
return "", None
|
||||||
|
if normalized == "/env/summary":
|
||||||
|
return "virtual_summary", None
|
||||||
|
if normalized in self.VIRTUAL_MAP:
|
||||||
|
return "file", self.VIRTUAL_MAP[normalized]
|
||||||
|
if ".." in normalized or "\\" in normalized:
|
||||||
|
return "", None
|
||||||
|
if normalized.startswith("/"):
|
||||||
|
candidate = normalized.rsplit("/", 1)[-1]
|
||||||
|
else:
|
||||||
|
candidate = normalized
|
||||||
|
if "/" in candidate:
|
||||||
|
return "", None
|
||||||
|
allowed = AgentFlatContextStore.ALLOWED_CONTEXT_FILES - {AgentFlatContextStore.MANIFEST_FILENAME}
|
||||||
|
if candidate not in allowed:
|
||||||
|
return "", None
|
||||||
|
return "file", candidate
|
||||||
|
|
||||||
|
def list_context(self) -> Dict[str, Any]:
|
||||||
|
"""List available context files (ls-equivalent)."""
|
||||||
|
docs = self._manifest_docs()
|
||||||
|
items = []
|
||||||
|
for d in docs:
|
||||||
|
if not isinstance(d, dict):
|
||||||
|
continue
|
||||||
|
items.append(
|
||||||
|
{
|
||||||
|
"path": d.get("path"),
|
||||||
|
"type": d.get("type"),
|
||||||
|
"updated_at": d.get("updated_at"),
|
||||||
|
"size_bytes": d.get("size_bytes", 0),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
items.sort(key=lambda x: str(x.get("path") or ""))
|
||||||
|
result = {
|
||||||
|
"workspace_hint": "Use this list to see which onboarding steps are complete.",
|
||||||
|
"tip": "Use `search_context` to find specific keywords across all steps.",
|
||||||
|
"virtual_paths": ["/env/summary", *sorted(self.VIRTUAL_MAP.keys())],
|
||||||
|
"files": items,
|
||||||
|
"collaboration": {
|
||||||
|
"scratchpad_dir": str(self._scratchpad_dir()),
|
||||||
|
"activity_log": "scratchpad/activity_log.jsonl",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=list_context files={len(items)}")
|
||||||
|
return result
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _flatten_strings(data: Any, limit: int = 2000) -> str:
|
||||||
|
pieces: List[str] = []
|
||||||
|
|
||||||
|
def walk(v: Any) -> None:
|
||||||
|
if len(pieces) >= limit:
|
||||||
|
return
|
||||||
|
if isinstance(v, dict):
|
||||||
|
for key, value in v.items():
|
||||||
|
pieces.append(str(key))
|
||||||
|
walk(value)
|
||||||
|
elif isinstance(v, list):
|
||||||
|
for item in v:
|
||||||
|
walk(item)
|
||||||
|
elif isinstance(v, (str, int, float, bool)):
|
||||||
|
pieces.append(str(v))
|
||||||
|
|
||||||
|
walk(data)
|
||||||
|
return " ".join(pieces)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _extract_search_fields(doc: Dict[str, Any]) -> Tuple[List[str], Dict[str, Any], str]:
|
||||||
|
summary = doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {}
|
||||||
|
hints = summary.get("retrieval_hints") if isinstance(summary.get("retrieval_hints"), dict) else {}
|
||||||
|
quick_facts = summary.get("quick_facts") if isinstance(summary.get("quick_facts"), dict) else {}
|
||||||
|
high_terms = hints.get("high_signal_terms") if isinstance(hints.get("high_signal_terms"), list) else []
|
||||||
|
body = AgentContextVFS._flatten_strings(doc.get("data") if isinstance(doc.get("data"), dict) else {})
|
||||||
|
return [str(t).lower() for t in high_terms], quick_facts, body.lower()
|
||||||
|
|
||||||
|
def search_context(self, query: str, *, limit: int = 10, path_glob: Optional[str] = None) -> Dict[str, Any]:
|
||||||
|
"""Smart grep with coarse-to-fine ranking and parallel stream scans."""
|
||||||
|
normalized = (query or "").strip()
|
||||||
|
if not normalized:
|
||||||
|
return {"query": query, "results": []}
|
||||||
|
self.store._audit_event("vfs_search", normalized, "started")
|
||||||
|
try:
|
||||||
|
variants = self._query_variants(normalized)
|
||||||
|
attempted_queries: List[str] = []
|
||||||
|
scored: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
|
for candidate_query in variants:
|
||||||
|
attempted_queries.append(candidate_query)
|
||||||
|
needle = candidate_query.lower()
|
||||||
|
|
||||||
|
# Pass 1: summary-first ranking (high relevance)
|
||||||
|
docs = self._manifest_docs()
|
||||||
|
variant_scored: List[Dict[str, Any]] = []
|
||||||
|
for item in docs:
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
path = str(item.get("path") or "")
|
||||||
|
if not path:
|
||||||
|
continue
|
||||||
|
if path_glob and not fnmatch(path, path_glob):
|
||||||
|
continue
|
||||||
|
doc = self.store.load_context_document(path) or {}
|
||||||
|
high_terms, quick_facts, _ = self._extract_search_fields(doc)
|
||||||
|
|
||||||
|
high_match = any(needle in term for term in high_terms)
|
||||||
|
quick_match = any(needle in str(v).lower() for v in quick_facts.values()) if isinstance(quick_facts, dict) else False
|
||||||
|
if not (high_match or quick_match):
|
||||||
|
continue
|
||||||
|
|
||||||
|
score = 100 if high_match else 80
|
||||||
|
reason = "matched high_signal_terms" if high_match else "matched quick_facts"
|
||||||
|
variant_scored.append(
|
||||||
|
{
|
||||||
|
"path": path,
|
||||||
|
"line": None,
|
||||||
|
"snippet": f"{reason}: {candidate_query}"[:100],
|
||||||
|
"type": item.get("type"),
|
||||||
|
"updated_at": item.get("updated_at"),
|
||||||
|
"relevance": "High Relevance",
|
||||||
|
"reason": reason,
|
||||||
|
"score": score,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pass 2: parallelized stream scan over allowlisted workspace files.
|
||||||
|
allowlisted = self._allowlisted_workspace_files()
|
||||||
|
body_matches: List[Dict[str, Any]] = []
|
||||||
|
if allowlisted:
|
||||||
|
with ThreadPoolExecutor(max_workers=min(8, max(1, len(allowlisted)))) as pool:
|
||||||
|
future_map = {}
|
||||||
|
for p in allowlisted:
|
||||||
|
path_label = p.name
|
||||||
|
if path_glob and not fnmatch(path_label, path_glob):
|
||||||
|
continue
|
||||||
|
future = pool.submit(self.grep_engine.stream_file, p, candidate_query, path_label=path_label)
|
||||||
|
future_map[future] = path_label
|
||||||
|
|
||||||
|
for future in as_completed(future_map):
|
||||||
|
try:
|
||||||
|
body_matches.extend(future.result() or [])
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
|
variant_scored.extend(body_matches)
|
||||||
|
if variant_scored:
|
||||||
|
scored = variant_scored
|
||||||
|
break
|
||||||
|
|
||||||
|
scored = self._cluster_results(scored)
|
||||||
|
|
||||||
|
# Add confidence based on score + freshness + hit density.
|
||||||
|
for r in scored:
|
||||||
|
base = min(1.0, max(0.0, float(r.get("score", 0)) / 100.0))
|
||||||
|
freshness = self._freshness_score(r.get("updated_at"))
|
||||||
|
density = min(1.0, 0.2 + (int(r.get("hit_count", 1)) * 0.1))
|
||||||
|
confidence = round((base * 0.6) + (freshness * 0.25) + (density * 0.15), 3)
|
||||||
|
r["confidence"] = confidence
|
||||||
|
|
||||||
|
scored.sort(key=lambda r: (-int(r.get("score", 0)), str(r.get("path") or "")))
|
||||||
|
matched_files = sorted({str(r.get("path") or "") for r in scored if r.get("path")})
|
||||||
|
capped_results = scored[: max(1, limit)]
|
||||||
|
notice = None
|
||||||
|
if len(matched_files) > 10:
|
||||||
|
notice = f"Found {len(matched_files)} matches. Showing top 10. Use a more specific keyword to narrow down."
|
||||||
|
capped_results = scored[:10]
|
||||||
|
|
||||||
|
# Token/length budgeting (~2000 tokens ~= ~8000 chars).
|
||||||
|
budget_chars = 8000
|
||||||
|
bounded_results = []
|
||||||
|
used = 0
|
||||||
|
for r in capped_results:
|
||||||
|
snippet = str(r.get("snippet") or "")
|
||||||
|
cost = len(snippet) + 120 # account for metadata fields
|
||||||
|
if bounded_results and used + cost > budget_chars:
|
||||||
|
break
|
||||||
|
bounded_results.append(r)
|
||||||
|
used += cost
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"query": normalized,
|
||||||
|
"attempted_queries": attempted_queries,
|
||||||
|
"matched_files_count": len(matched_files),
|
||||||
|
"results": self._static_triage(bounded_results, normalized),
|
||||||
|
"notice": notice,
|
||||||
|
"char_budget_used": used,
|
||||||
|
"can_answer": bool(bounded_results),
|
||||||
|
}
|
||||||
|
result["triage_top5"] = self._llm_router_stub(result["results"], top_k=5)
|
||||||
|
logger.info(
|
||||||
|
f"[vfs_audit] user={self.store.safe_user_id} action=search_context query={normalized!r} results={len(result['results'])}"
|
||||||
|
)
|
||||||
|
self.store._audit_event("vfs_search", normalized, f"success_{len(result['results'])}_hits")
|
||||||
|
return result
|
||||||
|
except Exception as exc:
|
||||||
|
self.store._audit_event("vfs_search", normalized, f"failed_{exc.__class__.__name__}")
|
||||||
|
return {"query": normalized, "matched_files_count": 0, "results": [], "notice": "Search failed.", "can_answer": False}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _strip_technical_metadata(doc: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
sanitized = {
|
||||||
|
"context_type": doc.get("context_type"),
|
||||||
|
"updated_at": doc.get("updated_at"),
|
||||||
|
"journey": ((doc.get("document_context") or {}).get("journey") or {}) if isinstance(doc.get("document_context"), dict) else {},
|
||||||
|
"agent_summary": doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {},
|
||||||
|
"data": doc.get("data") if isinstance(doc.get("data"), dict) else {},
|
||||||
|
}
|
||||||
|
return sanitized
|
||||||
|
|
||||||
|
def inspect_file(self, path: str, *, key: Optional[str] = None, small_file_bytes: int = 5 * 1024) -> Dict[str, Any]:
|
||||||
|
"""Smart reader (cat/head equivalent) with summary-first behavior."""
|
||||||
|
kind, resolved = self._resolve_path(path)
|
||||||
|
if kind == "virtual_summary":
|
||||||
|
result = {
|
||||||
|
"path": "/env/summary",
|
||||||
|
"mode": "summary",
|
||||||
|
"data": self.store.generate_total_summary(),
|
||||||
|
}
|
||||||
|
logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=read_context_file path=/env/summary mode=summary")
|
||||||
|
return result
|
||||||
|
|
||||||
|
if not resolved:
|
||||||
|
logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=read_context_file path={path!r} status=rejected")
|
||||||
|
return {"error": "File not found", "path": path}
|
||||||
|
|
||||||
|
# JSON context doc path
|
||||||
|
doc = self.store.load_context_document(resolved)
|
||||||
|
if doc:
|
||||||
|
view = self._strip_technical_metadata(doc)
|
||||||
|
data = view.get("data") if isinstance(view.get("data"), dict) else {}
|
||||||
|
raw_size = self.store.estimate_size_bytes(view)
|
||||||
|
|
||||||
|
if key:
|
||||||
|
if key in data:
|
||||||
|
result = {
|
||||||
|
"path": resolved,
|
||||||
|
"mode": "key",
|
||||||
|
"key": key,
|
||||||
|
"agent_summary": view.get("agent_summary"),
|
||||||
|
"data": data.get(key),
|
||||||
|
}
|
||||||
|
logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=key")
|
||||||
|
return result
|
||||||
|
logger.info(
|
||||||
|
f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=key_missing key={key}"
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"path": resolved,
|
||||||
|
"mode": "key_missing",
|
||||||
|
"key": key,
|
||||||
|
"available_keys": sorted(list(data.keys())),
|
||||||
|
"message": "Requested key not found. Choose one of available_keys.",
|
||||||
|
}
|
||||||
|
|
||||||
|
if raw_size <= small_file_bytes:
|
||||||
|
result = {
|
||||||
|
"path": resolved,
|
||||||
|
"mode": "full",
|
||||||
|
"data": view,
|
||||||
|
}
|
||||||
|
logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=full")
|
||||||
|
return result
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"path": resolved,
|
||||||
|
"mode": "summary_plus_keys",
|
||||||
|
"size_bytes": raw_size,
|
||||||
|
"agent_summary": view.get("agent_summary"),
|
||||||
|
"keys": sorted(list(data.keys())),
|
||||||
|
"message": "File is large. Re-run with key to inspect a specific section.",
|
||||||
|
}
|
||||||
|
logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} mode=summary_plus_keys")
|
||||||
|
return result
|
||||||
|
|
||||||
|
logger.info(f"[vfs_audit] user={self.store.safe_user_id} action=inspect_file path={resolved} status=not_found")
|
||||||
|
return {"error": "File not found", "path": path, "resolved": resolved}
|
||||||
|
|
||||||
|
def read_context_file(self, path: str, *, subkey: Optional[str] = None) -> Dict[str, Any]:
|
||||||
|
"""Backward-compatible alias for inspect_file."""
|
||||||
|
return self.inspect_file(path, key=subkey)
|
||||||
|
|
||||||
|
def write_context_file(self, *_args: Any, **_kwargs: Any) -> None:
|
||||||
|
"""Disallow writes from the agent-facing VFS."""
|
||||||
|
raise OSError("EROFS: read-only file system")
|
||||||
|
|
||||||
|
# Backward-compat function name requested in design docs.
|
||||||
|
inspect = inspect_file
|
||||||
|
|
||||||
|
def write_shared_note(self, note: str, *, agent_id: str = "agent", filename: str = "collaboration.md") -> Dict[str, Any]:
|
||||||
|
"""Append a shared project note with advisory locking in scratchpad."""
|
||||||
|
safe_name = Path(filename).name
|
||||||
|
if safe_name != filename or ".." in filename or "/" in filename or "\\" in filename:
|
||||||
|
self.store._audit_event("write_shared_note", filename, "rejected_filename")
|
||||||
|
return {"ok": False, "error": "Invalid filename"}
|
||||||
|
|
||||||
|
scratch = self._scratchpad_dir()
|
||||||
|
target = (scratch / safe_name).resolve()
|
||||||
|
if scratch.resolve() not in target.parents:
|
||||||
|
self.store._audit_event("write_shared_note", filename, "rejected_path")
|
||||||
|
return {"ok": False, "error": "Unsafe path"}
|
||||||
|
|
||||||
|
lock_path = scratch / f".{safe_name}.lock"
|
||||||
|
ts = datetime.now(timezone.utc).isoformat()
|
||||||
|
header = f"\n## {ts} | {self._safe_slug(agent_id, 'agent')}\n"
|
||||||
|
payload = header + str(note).rstrip() + "\n"
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(lock_path, "w", encoding="utf-8") as lf:
|
||||||
|
fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
|
||||||
|
with open(target, "a", encoding="utf-8") as tf:
|
||||||
|
tf.write(payload)
|
||||||
|
tf.flush()
|
||||||
|
os.fsync(tf.fileno())
|
||||||
|
os.chmod(target, 0o600)
|
||||||
|
fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
|
||||||
|
self.store._audit_event("write_shared_note", safe_name, "success")
|
||||||
|
self.append_activity_log(
|
||||||
|
event_type="shared_note_written",
|
||||||
|
actor=agent_id,
|
||||||
|
details={"file": safe_name, "bytes": len(payload)},
|
||||||
|
)
|
||||||
|
return {"ok": True, "file": safe_name, "bytes_written": len(payload)}
|
||||||
|
except Exception as exc:
|
||||||
|
self.store._audit_event("write_shared_note", safe_name, f"failed_{exc.__class__.__name__}")
|
||||||
|
return {"ok": False, "error": str(exc)}
|
||||||
|
|
||||||
|
def append_activity_log(self, *, event_type: str, actor: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
||||||
|
"""Write append-only project activity log entry in JSONL format."""
|
||||||
|
scratch = self._scratchpad_dir()
|
||||||
|
target = (scratch / "activity_log.jsonl").resolve()
|
||||||
|
lock_path = scratch / ".activity_log.jsonl.lock"
|
||||||
|
entry = {
|
||||||
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"event_type": str(event_type),
|
||||||
|
"actor": self._safe_slug(actor, "agent"),
|
||||||
|
"project_id": self._safe_slug(self.project_id, "none") if self.project_id else None,
|
||||||
|
"details": details or {},
|
||||||
|
}
|
||||||
|
line = json.dumps(entry, ensure_ascii=False) + "\n"
|
||||||
|
try:
|
||||||
|
with open(lock_path, "w", encoding="utf-8") as lf:
|
||||||
|
fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
|
||||||
|
with open(target, "a", encoding="utf-8") as tf:
|
||||||
|
tf.write(line)
|
||||||
|
tf.flush()
|
||||||
|
os.fsync(tf.fileno())
|
||||||
|
os.chmod(target, 0o600)
|
||||||
|
fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
|
||||||
|
return {"ok": True}
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to append activity log: {exc}")
|
||||||
|
return {"ok": False, "error": str(exc)}
|
||||||
|
|
||||||
|
def read_struct(self, filename: str, path_query: str) -> Dict[str, Any]:
|
||||||
|
"""AST-style structural reader for JSON context files with dependency context injection."""
|
||||||
|
resolved_kind, resolved = self._resolve_path(filename)
|
||||||
|
if resolved_kind == "virtual_summary" or not resolved:
|
||||||
|
return {"ok": False, "error": "Invalid file"}
|
||||||
|
|
||||||
|
doc = self.store.load_context_document(resolved)
|
||||||
|
if not isinstance(doc, dict):
|
||||||
|
return {"ok": False, "error": "File not found"}
|
||||||
|
|
||||||
|
try:
|
||||||
|
extracted = self._resolve_json_path(doc, path_query)
|
||||||
|
except Exception as exc:
|
||||||
|
return {"ok": False, "error": f"path_query resolution failed: {exc}"}
|
||||||
|
|
||||||
|
# Lightweight dependency context: inject brand voice from step2 when reading persona structures.
|
||||||
|
dependency_context: Dict[str, Any] = {}
|
||||||
|
if "persona" in path_query.lower() or resolved == AgentFlatContextStore.STEP4_FILENAME:
|
||||||
|
step2 = self.store.load_step2_context_document() or {}
|
||||||
|
step2_data = step2.get("data") if isinstance(step2.get("data"), dict) else {}
|
||||||
|
brand = step2_data.get("brand_analysis") if isinstance(step2_data.get("brand_analysis"), dict) else {}
|
||||||
|
dependency_context["brand_voice"] = brand.get("brand_voice")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"ok": True,
|
||||||
|
"file": resolved,
|
||||||
|
"path_query": path_query,
|
||||||
|
"data": extracted,
|
||||||
|
"dependency_context": dependency_context,
|
||||||
|
"context": "Extracted via structural parse to save tokens.",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def build_filesystem_header(user_id: str) -> str:
|
||||||
|
"""Generate compact prompt header with available files and priority hints."""
|
||||||
|
try:
|
||||||
|
store = AgentFlatContextStore(user_id)
|
||||||
|
manifest = store.load_context_manifest() or {"documents": []}
|
||||||
|
docs = manifest.get("documents") if isinstance(manifest.get("documents"), list) else []
|
||||||
|
available = [str(d.get("path")) for d in docs if isinstance(d, dict) and d.get("path")]
|
||||||
|
files = ", ".join(sorted(available)) if available else "none"
|
||||||
|
return (
|
||||||
|
"Workspace Context: You have access to a local flat-file store. "
|
||||||
|
f"Available Files: {files}. "
|
||||||
|
"Instructions: For style guidelines, prioritize step4_persona_data.json. "
|
||||||
|
"For technical site data, prioritize step2_website_analysis.json."
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to build filesystem header for user {user_id}: {exc}")
|
||||||
|
return "Workspace Context: local flat-file store unavailable."
|
||||||
@@ -9,6 +9,8 @@ from __future__ import annotations
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import hmac
|
||||||
|
import hashlib
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Dict, Optional, Tuple
|
from typing import Any, Dict, Optional, Tuple
|
||||||
@@ -25,6 +27,14 @@ class AgentFlatContextStore:
|
|||||||
STEP4_FILENAME = "step4_persona_data.json"
|
STEP4_FILENAME = "step4_persona_data.json"
|
||||||
STEP5_FILENAME = "step5_integrations.json"
|
STEP5_FILENAME = "step5_integrations.json"
|
||||||
MANIFEST_FILENAME = "context_manifest.json"
|
MANIFEST_FILENAME = "context_manifest.json"
|
||||||
|
WORKSPACE_README = "README.md"
|
||||||
|
ALLOWED_CONTEXT_FILES = {
|
||||||
|
STEP2_FILENAME,
|
||||||
|
STEP3_FILENAME,
|
||||||
|
STEP4_FILENAME,
|
||||||
|
STEP5_FILENAME,
|
||||||
|
MANIFEST_FILENAME,
|
||||||
|
}
|
||||||
|
|
||||||
SCHEMA_VERSION = "1.3"
|
SCHEMA_VERSION = "1.3"
|
||||||
DEFAULT_MAX_BYTES = 300_000
|
DEFAULT_MAX_BYTES = 300_000
|
||||||
@@ -33,12 +43,53 @@ class AgentFlatContextStore:
|
|||||||
def __init__(self, user_id: str):
|
def __init__(self, user_id: str):
|
||||||
self.user_id = user_id
|
self.user_id = user_id
|
||||||
self.safe_user_id = self._sanitize_user_id(user_id)
|
self.safe_user_id = self._sanitize_user_id(user_id)
|
||||||
|
self._ensure_workspace_permissions()
|
||||||
|
|
||||||
|
def _ensure_workspace_permissions(self) -> None:
|
||||||
|
"""Ensure workspace and context directories exist with owner-only permissions."""
|
||||||
|
workspace_dir = self._workspace_dir()
|
||||||
|
context_dir = workspace_dir / self.CONTEXT_DIRNAME
|
||||||
|
workspace_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
context_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
os.chmod(workspace_dir, 0o700)
|
||||||
|
os.chmod(context_dir, 0o700)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _safe_resolve_under(base_dir: Path, requested_path: str) -> Path:
|
||||||
|
"""Resolve path and ensure it remains inside base_dir (path sandboxing)."""
|
||||||
|
base_real = base_dir.resolve()
|
||||||
|
candidate = (base_dir / requested_path).resolve()
|
||||||
|
if candidate == base_real or base_real in candidate.parents:
|
||||||
|
return candidate
|
||||||
|
raise ValueError("Unsafe path access attempt outside sandbox")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _sanitize_user_id(user_id: str) -> str:
|
def _sanitize_user_id(user_id: str) -> str:
|
||||||
safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_"))
|
safe = "".join(c for c in str(user_id) if c.isalnum() or c in ("-", "_"))
|
||||||
return safe or "unknown_user"
|
return safe or "unknown_user"
|
||||||
|
|
||||||
|
def _master_salt(self) -> str:
|
||||||
|
return os.getenv("FILE_ENCRYPTION_SALT", "")
|
||||||
|
|
||||||
|
def derive_user_secret(self) -> bytes:
|
||||||
|
"""Derive deterministic per-user secret from env salt + safe user id."""
|
||||||
|
salt = self._master_salt()
|
||||||
|
if not salt:
|
||||||
|
return b""
|
||||||
|
return hmac.new(salt.encode("utf-8"), self.safe_user_id.encode("utf-8"), hashlib.sha256).digest()
|
||||||
|
|
||||||
|
def user_secret_fingerprint(self) -> str:
|
||||||
|
"""Short fingerprint used for diagnostics/audit only (not a key)."""
|
||||||
|
secret = self.derive_user_secret()
|
||||||
|
if not secret:
|
||||||
|
return "salt_not_configured"
|
||||||
|
return hashlib.sha256(secret).hexdigest()[:16]
|
||||||
|
|
||||||
|
def _audit_event(self, action: str, target: str, status: str) -> None:
|
||||||
|
logger.info(
|
||||||
|
f"[flat_context_audit] user={self.safe_user_id} action={action} target={target} status={status}"
|
||||||
|
)
|
||||||
|
|
||||||
def _workspace_dir(self) -> Path:
|
def _workspace_dir(self) -> Path:
|
||||||
root_dir = Path(__file__).resolve().parents[3]
|
root_dir = Path(__file__).resolve().parents[3]
|
||||||
return root_dir / "workspace" / f"workspace_{self.safe_user_id}"
|
return root_dir / "workspace" / f"workspace_{self.safe_user_id}"
|
||||||
@@ -47,7 +98,10 @@ class AgentFlatContextStore:
|
|||||||
return self._workspace_dir() / self.CONTEXT_DIRNAME
|
return self._workspace_dir() / self.CONTEXT_DIRNAME
|
||||||
|
|
||||||
def _context_file(self, filename: str) -> Path:
|
def _context_file(self, filename: str) -> Path:
|
||||||
return self._context_dir() / filename
|
return self._safe_resolve_under(self._context_dir(), str(filename))
|
||||||
|
|
||||||
|
def _workspace_file(self, filename: str) -> Path:
|
||||||
|
return self._safe_resolve_under(self._workspace_dir(), str(filename))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _estimate_size_bytes(value: Any) -> int:
|
def _estimate_size_bytes(value: Any) -> int:
|
||||||
@@ -56,6 +110,10 @@ class AgentFlatContextStore:
|
|||||||
except Exception:
|
except Exception:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
def estimate_size_bytes(self, value: Any) -> int:
|
||||||
|
"""Public size estimate helper for adapter layers."""
|
||||||
|
return self._estimate_size_bytes(value)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _to_context_list(value: Any) -> Any:
|
def _to_context_list(value: Any) -> Any:
|
||||||
if value is None:
|
if value is None:
|
||||||
@@ -143,6 +201,12 @@ class AgentFlatContextStore:
|
|||||||
"preferred": "flat_file",
|
"preferred": "flat_file",
|
||||||
"fallback_order": fallback_order,
|
"fallback_order": fallback_order,
|
||||||
},
|
},
|
||||||
|
"security": {
|
||||||
|
"path_sandboxing": True,
|
||||||
|
"file_permissions": "0600",
|
||||||
|
"directory_permissions": "0700",
|
||||||
|
"user_secret_fingerprint": self.user_secret_fingerprint(),
|
||||||
|
},
|
||||||
"context_window_guidance": {
|
"context_window_guidance": {
|
||||||
"max_raw_bytes": self.DEFAULT_MAX_BYTES,
|
"max_raw_bytes": self.DEFAULT_MAX_BYTES,
|
||||||
"total_bytes": total_size,
|
"total_bytes": total_size,
|
||||||
@@ -343,6 +407,7 @@ class AgentFlatContextStore:
|
|||||||
|
|
||||||
def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None:
|
def _atomic_write_json(self, target_file: Path, data: Dict[str, Any]) -> None:
|
||||||
target_file.parent.mkdir(parents=True, exist_ok=True)
|
target_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
os.chmod(target_file.parent, 0o700)
|
||||||
fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
|
fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
|
||||||
try:
|
try:
|
||||||
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
||||||
@@ -361,6 +426,108 @@ class AgentFlatContextStore:
|
|||||||
pass
|
pass
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def _atomic_write_text(self, target_file: Path, content: str) -> None:
|
||||||
|
target_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
os.chmod(target_file.parent, 0o700)
|
||||||
|
fd, tmp_path = tempfile.mkstemp(dir=str(target_file.parent), prefix=f".{target_file.name}.", suffix=".tmp")
|
||||||
|
try:
|
||||||
|
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
||||||
|
f.write(content)
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
os.replace(tmp_path, target_file)
|
||||||
|
try:
|
||||||
|
os.chmod(target_file, 0o600)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
os.unlink(tmp_path)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _collect_signal_terms(doc: Dict[str, Any], limit: int = 6) -> list:
|
||||||
|
summary = doc.get("agent_summary") if isinstance(doc, dict) else {}
|
||||||
|
hints = summary.get("retrieval_hints") if isinstance(summary, dict) else {}
|
||||||
|
terms = hints.get("high_signal_terms") if isinstance(hints, dict) else []
|
||||||
|
if not isinstance(terms, list):
|
||||||
|
return []
|
||||||
|
normalized = [str(t).strip() for t in terms if str(t).strip()]
|
||||||
|
return normalized[:limit]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _extract_journey_stage(doc: Dict[str, Any]) -> str:
|
||||||
|
dctx = doc.get("document_context") if isinstance(doc, dict) else {}
|
||||||
|
journey = dctx.get("journey") if isinstance(dctx, dict) else {}
|
||||||
|
stage = journey.get("stage") if isinstance(journey, dict) else ""
|
||||||
|
return str(stage or "").strip()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _context_description(filename: str) -> str:
|
||||||
|
descriptions = {
|
||||||
|
AgentFlatContextStore.STEP2_FILENAME: "Primary SEO and site structure context",
|
||||||
|
AgentFlatContextStore.STEP3_FILENAME: "Research depth, competitors, and content preferences",
|
||||||
|
AgentFlatContextStore.STEP4_FILENAME: "Persona profiles, voice adaptation, and platform strategy",
|
||||||
|
AgentFlatContextStore.STEP5_FILENAME: "Connected integrations and provider readiness",
|
||||||
|
}
|
||||||
|
return descriptions.get(filename, "Context document")
|
||||||
|
|
||||||
|
def _generate_workspace_readme(self, manifest: Dict[str, Any]) -> str:
|
||||||
|
docs = manifest.get("documents") if isinstance(manifest, dict) and isinstance(manifest.get("documents"), list) else []
|
||||||
|
|
||||||
|
lines = [
|
||||||
|
"# Agent Workspace Map",
|
||||||
|
"",
|
||||||
|
"You are in a restricted read-only VFS. Use `list_context`, `read_context_file`, and `search_context` to navigate.",
|
||||||
|
"",
|
||||||
|
"## Core Context Files",
|
||||||
|
]
|
||||||
|
|
||||||
|
for item in sorted(docs, key=lambda d: str((d or {}).get("path", ""))):
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
path = item.get("path") or ""
|
||||||
|
if not path:
|
||||||
|
continue
|
||||||
|
doc = self._load_context_document(path) or {}
|
||||||
|
signals = self._collect_signal_terms(doc)
|
||||||
|
journey_stage = self._extract_journey_stage(doc)
|
||||||
|
updated_at = str(item.get("updated_at") or "")
|
||||||
|
lines.append(f"- `{path}`: {self._context_description(path)}.")
|
||||||
|
if signals:
|
||||||
|
lines.append(f" - **Key Signals:** {', '.join(signals)}")
|
||||||
|
if journey_stage:
|
||||||
|
lines.append(f" - **Journey Stage:** {journey_stage}")
|
||||||
|
if updated_at:
|
||||||
|
lines.append(f" - **Updated:** {updated_at}")
|
||||||
|
|
||||||
|
lines.extend(
|
||||||
|
[
|
||||||
|
"",
|
||||||
|
"## Retrieval Strategy",
|
||||||
|
"1. Run `list_context` to check which onboarding steps are available.",
|
||||||
|
"2. Run `search_context` for targeted terms (for example: \"competitor\", \"tone\", \"integrations\").",
|
||||||
|
"3. Run `read_context_file` and ingest `agent_summary` before expanding full `data`.",
|
||||||
|
"",
|
||||||
|
"## Virtual Paths",
|
||||||
|
"- `/env/summary` -> consolidated summary generated from all available context docs",
|
||||||
|
f"- `/steps/website` -> `{self.STEP2_FILENAME}`",
|
||||||
|
f"- `/steps/research` -> `{self.STEP3_FILENAME}`",
|
||||||
|
f"- `/steps/persona` -> `{self.STEP4_FILENAME}`",
|
||||||
|
f"- `/steps/integrations` -> `{self.STEP5_FILENAME}`",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
return "\n".join(lines) + "\n"
|
||||||
|
|
||||||
|
def _update_workspace_readme(self, manifest: Dict[str, Any]) -> None:
|
||||||
|
try:
|
||||||
|
content = self._generate_workspace_readme(manifest)
|
||||||
|
self._atomic_write_text(self._workspace_file(self.WORKSPACE_README), content)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"Failed to update workspace README for user {self.user_id}: {exc}")
|
||||||
|
|
||||||
def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None:
|
def _update_manifest(self, context_type: str, filename: str, doc: Dict[str, Any]) -> None:
|
||||||
manifest_file = self._context_file(self.MANIFEST_FILENAME)
|
manifest_file = self._context_file(self.MANIFEST_FILENAME)
|
||||||
existing = {}
|
existing = {}
|
||||||
@@ -390,6 +557,7 @@ class AgentFlatContextStore:
|
|||||||
"documents": items,
|
"documents": items,
|
||||||
}
|
}
|
||||||
self._atomic_write_json(manifest_file, manifest)
|
self._atomic_write_json(manifest_file, manifest)
|
||||||
|
self._update_workspace_readme(manifest)
|
||||||
|
|
||||||
def _save_context_document(
|
def _save_context_document(
|
||||||
self,
|
self,
|
||||||
@@ -436,9 +604,11 @@ class AgentFlatContextStore:
|
|||||||
|
|
||||||
self._atomic_write_json(target_file, context_doc)
|
self._atomic_write_json(target_file, context_doc)
|
||||||
self._update_manifest(context_type, filename, context_doc)
|
self._update_manifest(context_type, filename, context_doc)
|
||||||
|
self._audit_event("write_context", filename, "success")
|
||||||
return True
|
return True
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}")
|
logger.error(f"Failed to save context for user {self.user_id} ({context_type}): {exc}")
|
||||||
|
self._audit_event("write_context", filename, "error")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool:
|
def save_step2_website_analysis(self, payload: Dict[str, Any], *, source: str = "onboarding_step2") -> bool:
|
||||||
@@ -483,19 +653,31 @@ class AgentFlatContextStore:
|
|||||||
|
|
||||||
def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
|
def _load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
|
||||||
try:
|
try:
|
||||||
|
if str(filename) not in self.ALLOWED_CONTEXT_FILES:
|
||||||
|
logger.warning(f"Rejected non-allowed context filename for user {self.user_id}: {filename}")
|
||||||
|
self._audit_event("read_context", str(filename), "rejected_filename")
|
||||||
|
return None
|
||||||
target_file = self._context_file(filename)
|
target_file = self._context_file(filename)
|
||||||
if not target_file.exists():
|
if not target_file.exists():
|
||||||
|
self._audit_event("read_context", str(filename), "not_found")
|
||||||
return None
|
return None
|
||||||
with open(target_file, "r", encoding="utf-8") as f:
|
with open(target_file, "r", encoding="utf-8") as f:
|
||||||
doc = json.load(f)
|
doc = json.load(f)
|
||||||
if isinstance(doc, dict) and str(doc.get("user_id")) != str(self.user_id):
|
if isinstance(doc, dict) and str(doc.get("user_id")) != str(self.user_id):
|
||||||
logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})")
|
logger.warning(f"Context user mismatch for {filename} (expected {self.user_id})")
|
||||||
|
self._audit_event("read_context", str(filename), "user_mismatch")
|
||||||
return None
|
return None
|
||||||
|
self._audit_event("read_context", str(filename), "success")
|
||||||
return doc if isinstance(doc, dict) else None
|
return doc if isinstance(doc, dict) else None
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}")
|
logger.warning(f"Failed to load context document for user {self.user_id} ({filename}): {exc}")
|
||||||
|
self._audit_event("read_context", str(filename), "error")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def load_context_document(self, filename: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Public loader for a named context document file."""
|
||||||
|
return self._load_context_document(filename)
|
||||||
|
|
||||||
def load_context_manifest(self) -> Optional[Dict[str, Any]]:
|
def load_context_manifest(self) -> Optional[Dict[str, Any]]:
|
||||||
return self._load_context_document(self.MANIFEST_FILENAME)
|
return self._load_context_document(self.MANIFEST_FILENAME)
|
||||||
|
|
||||||
@@ -526,3 +708,35 @@ class AgentFlatContextStore:
|
|||||||
def load_step5_integrations(self) -> Optional[Dict[str, Any]]:
|
def load_step5_integrations(self) -> Optional[Dict[str, Any]]:
|
||||||
doc = self.load_step5_context_document()
|
doc = self.load_step5_context_document()
|
||||||
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
|
return doc.get("data") if isinstance(doc, dict) and isinstance(doc.get("data"), dict) else None
|
||||||
|
|
||||||
|
def generate_total_summary(self) -> Dict[str, Any]:
|
||||||
|
"""Build a lightweight consolidated summary across available context documents."""
|
||||||
|
manifest = self.load_context_manifest() or {"documents": []}
|
||||||
|
docs = manifest.get("documents") if isinstance(manifest.get("documents"), list) else []
|
||||||
|
overview = []
|
||||||
|
for item in docs:
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
path = str(item.get("path") or "")
|
||||||
|
if not path:
|
||||||
|
continue
|
||||||
|
doc = self._load_context_document(path) or {}
|
||||||
|
summary = doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {}
|
||||||
|
quick_facts = summary.get("quick_facts") if isinstance(summary.get("quick_facts"), dict) else {}
|
||||||
|
hints = summary.get("retrieval_hints") if isinstance(summary.get("retrieval_hints"), dict) else {}
|
||||||
|
overview.append(
|
||||||
|
{
|
||||||
|
"path": path,
|
||||||
|
"context_type": doc.get("context_type"),
|
||||||
|
"updated_at": doc.get("updated_at") or item.get("updated_at"),
|
||||||
|
"journey_stage": self._extract_journey_stage(doc),
|
||||||
|
"high_signal_terms": hints.get("high_signal_terms") if isinstance(hints.get("high_signal_terms"), list) else [],
|
||||||
|
"quick_facts": quick_facts,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"user_id": str(self.user_id),
|
||||||
|
"generated_at": datetime.utcnow().isoformat(),
|
||||||
|
"document_count": len(overview),
|
||||||
|
"documents": overview,
|
||||||
|
}
|
||||||
|
|||||||
@@ -340,6 +340,46 @@ class SIFIntegrationService:
|
|||||||
logger.warning(f"Failed to load flat context manifest for user {self.user_id}: {e}")
|
logger.warning(f"Failed to load flat context manifest for user {self.user_id}: {e}")
|
||||||
return {"source": "none", "data": {"documents": []}}
|
return {"source": "none", "data": {"documents": []}}
|
||||||
|
|
||||||
|
async def get_merged_flat_context(self) -> Dict[str, Any]:
|
||||||
|
"""Return merged onboarding context from all available flat context documents.
|
||||||
|
|
||||||
|
This is an aggregation helper; step-specific APIs still return one-by-one files.
|
||||||
|
"""
|
||||||
|
store = AgentFlatContextStore(self.user_id)
|
||||||
|
manifest = store.load_context_manifest() or {"documents": []}
|
||||||
|
docs = manifest.get("documents") if isinstance(manifest.get("documents"), list) else []
|
||||||
|
|
||||||
|
merged: Dict[str, Any] = {
|
||||||
|
"source": "flat_file",
|
||||||
|
"user_id": self.user_id,
|
||||||
|
"manifest_updated_at": manifest.get("updated_at"),
|
||||||
|
"steps": {},
|
||||||
|
"agent_summaries": {},
|
||||||
|
"documents": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
for item in docs:
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
path = item.get("path")
|
||||||
|
if not path:
|
||||||
|
continue
|
||||||
|
doc = store.load_context_document(str(path)) or {}
|
||||||
|
context_type = str(doc.get("context_type") or item.get("type") or path)
|
||||||
|
merged["documents"].append(
|
||||||
|
{
|
||||||
|
"path": path,
|
||||||
|
"context_type": context_type,
|
||||||
|
"updated_at": doc.get("updated_at") or item.get("updated_at"),
|
||||||
|
"size_bytes": item.get("size_bytes"),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
merged["steps"][context_type] = doc.get("data") if isinstance(doc.get("data"), dict) else {}
|
||||||
|
merged["agent_summaries"][context_type] = doc.get("agent_summary") if isinstance(doc.get("agent_summary"), dict) else {}
|
||||||
|
|
||||||
|
merged["document_count"] = len(merged["documents"])
|
||||||
|
return merged
|
||||||
|
|
||||||
async def index_market_trends_run(self, trends_result: Dict[str, Any], run_id: str) -> bool:
|
async def index_market_trends_run(self, trends_result: Dict[str, Any], run_id: str) -> bool:
|
||||||
try:
|
try:
|
||||||
latest_id = f"market_trends_latest:{self.user_id}"
|
latest_id = f"market_trends_latest:{self.user_id}"
|
||||||
|
|||||||
156
backend/tests/test_agent_context_vfs.py
Normal file
156
backend/tests/test_agent_context_vfs.py
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import types
|
||||||
|
import importlib.util
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Lightweight fallback for environments missing loguru.
|
||||||
|
if "loguru" not in sys.modules:
|
||||||
|
stub = types.ModuleType("loguru")
|
||||||
|
stub.logger = types.SimpleNamespace(
|
||||||
|
info=lambda *a, **k: None,
|
||||||
|
warning=lambda *a, **k: None,
|
||||||
|
error=lambda *a, **k: None,
|
||||||
|
debug=lambda *a, **k: None,
|
||||||
|
)
|
||||||
|
sys.modules["loguru"] = stub
|
||||||
|
|
||||||
|
def _load_module(name: str, rel_path: str):
|
||||||
|
base = Path(__file__).resolve().parents[1]
|
||||||
|
path = base / rel_path
|
||||||
|
spec = importlib.util.spec_from_file_location(name, path)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
assert spec and spec.loader
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
return module
|
||||||
|
|
||||||
|
|
||||||
|
flat_mod = _load_module("agent_flat_context_under_test", "services/intelligence/agent_flat_context.py")
|
||||||
|
sys.modules.setdefault("services.intelligence.agent_flat_context", flat_mod)
|
||||||
|
vfs_mod = _load_module("agent_context_vfs_under_test", "services/intelligence/agent_context_vfs.py")
|
||||||
|
|
||||||
|
AgentFlatContextStore = flat_mod.AgentFlatContextStore
|
||||||
|
AgentContextVFS = vfs_mod.AgentContextVFS
|
||||||
|
|
||||||
|
|
||||||
|
def _cleanup_workspace(user_id: str, project_id: str | None = None) -> None:
|
||||||
|
safe_user = ''.join(c for c in str(user_id) if c.isalnum() or c in ('-', '_')) or 'unknown_user'
|
||||||
|
root = Path(__file__).resolve().parents[2] / 'workspace'
|
||||||
|
user_dir = root / f'workspace_{safe_user}'
|
||||||
|
if user_dir.exists():
|
||||||
|
import shutil
|
||||||
|
shutil.rmtree(user_dir, ignore_errors=True)
|
||||||
|
|
||||||
|
if project_id:
|
||||||
|
safe_project = ''.join(c for c in str(project_id) if c.isalnum() or c in ('-', '_')) or 'default_project'
|
||||||
|
project_dir = root / f'project_{safe_project}'
|
||||||
|
if project_dir.exists():
|
||||||
|
import shutil
|
||||||
|
shutil.rmtree(project_dir, ignore_errors=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_search_context_query_variants_and_can_answer():
|
||||||
|
user_id = 'pytest_vfs_user'
|
||||||
|
_cleanup_workspace(user_id)
|
||||||
|
|
||||||
|
store = AgentFlatContextStore(user_id)
|
||||||
|
payload = {
|
||||||
|
'website_url': 'https://example.com',
|
||||||
|
'brand_analysis': {'brand_voice': 'Authoritative'},
|
||||||
|
'recommended_settings': {'writing_tone': 'Conversational'},
|
||||||
|
'content_type': {'primary_type': 'Blog'},
|
||||||
|
'target_audience': {'primary_audience': 'Founders'},
|
||||||
|
}
|
||||||
|
assert store.save_step2_website_analysis(payload)
|
||||||
|
|
||||||
|
vfs = AgentContextVFS(user_id)
|
||||||
|
result = vfs.search_context('tone')
|
||||||
|
|
||||||
|
assert result['query'] == 'tone'
|
||||||
|
assert 'attempted_queries' in result
|
||||||
|
assert result['attempted_queries'][0] == 'tone'
|
||||||
|
assert result['can_answer'] is True
|
||||||
|
assert len(result['results']) >= 1
|
||||||
|
assert 'triage_top5' in result
|
||||||
|
assert len(result['triage_top5']) >= 1
|
||||||
|
assert 'low_probability' in result['results'][0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_inspect_file_large_document_summary_plus_keys():
|
||||||
|
user_id = 'pytest_vfs_large'
|
||||||
|
_cleanup_workspace(user_id)
|
||||||
|
|
||||||
|
store = AgentFlatContextStore(user_id)
|
||||||
|
large_blob = 'x' * 9000
|
||||||
|
payload = {
|
||||||
|
'website_url': 'https://big.example.com',
|
||||||
|
'brand_analysis': {'brand_voice': 'Bold'},
|
||||||
|
'recommended_settings': {'writing_tone': 'Direct'},
|
||||||
|
'target_audience': {'primary_audience': 'Teams'},
|
||||||
|
'crawl_result': {'raw': large_blob},
|
||||||
|
}
|
||||||
|
assert store.save_step2_website_analysis(payload)
|
||||||
|
|
||||||
|
vfs = AgentContextVFS(user_id)
|
||||||
|
out = vfs.inspect_file('step2_website_analysis.json')
|
||||||
|
|
||||||
|
assert out['mode'] == 'summary_plus_keys'
|
||||||
|
assert 'agent_summary' in out
|
||||||
|
assert 'keys' in out
|
||||||
|
assert 'crawl_result' in out['keys']
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_shared_note_and_activity_log_created():
|
||||||
|
user_id = 'pytest_collab_user'
|
||||||
|
project_id = 'proj_abc'
|
||||||
|
_cleanup_workspace(user_id, project_id)
|
||||||
|
|
||||||
|
vfs = AgentContextVFS(user_id, project_id=project_id)
|
||||||
|
write_res = vfs.write_shared_note('Draft collaboration note', agent_id='agent_one')
|
||||||
|
|
||||||
|
assert write_res['ok'] is True
|
||||||
|
assert write_res['file'] == 'collaboration.md'
|
||||||
|
|
||||||
|
collab = vfs.list_context()['collaboration']
|
||||||
|
scratchpad = Path(collab['scratchpad_dir'])
|
||||||
|
note_file = scratchpad / 'collaboration.md'
|
||||||
|
log_file = scratchpad / 'activity_log.jsonl'
|
||||||
|
|
||||||
|
assert note_file.exists()
|
||||||
|
assert log_file.exists()
|
||||||
|
|
||||||
|
content = note_file.read_text(encoding='utf-8')
|
||||||
|
assert 'agent_one' in content
|
||||||
|
assert 'Draft collaboration note' in content
|
||||||
|
|
||||||
|
lines = [json.loads(l) for l in log_file.read_text(encoding='utf-8').splitlines() if l.strip()]
|
||||||
|
assert any(entry.get('event_type') == 'shared_note_written' for entry in lines)
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_struct_path_resolution_and_dependency_context():
|
||||||
|
user_id = 'pytest_struct_user'
|
||||||
|
_cleanup_workspace(user_id)
|
||||||
|
|
||||||
|
store = AgentFlatContextStore(user_id)
|
||||||
|
assert store.save_step2_website_analysis(
|
||||||
|
{
|
||||||
|
'website_url': 'https://struct.example.com',
|
||||||
|
'brand_analysis': {'brand_voice': 'Pragmatic'},
|
||||||
|
'recommended_settings': {'writing_tone': 'Clear'},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert store.save_step4_persona_data(
|
||||||
|
{
|
||||||
|
'core_persona': {'name': 'Ops Leader', 'goal': 'Scale ops'},
|
||||||
|
'selected_platforms': ['linkedin'],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
vfs = AgentContextVFS(user_id)
|
||||||
|
out = vfs.read_struct('step4_persona_data.json', 'data.core_persona.name')
|
||||||
|
|
||||||
|
assert out['ok'] is True
|
||||||
|
assert out['data'] == 'Ops Leader'
|
||||||
|
assert out['dependency_context']['brand_voice'] == 'Pragmatic'
|
||||||
Reference in New Issue
Block a user