From 79f26c815baf47c89421b6276070f1110df904ff Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D9=8A?=
Date: Fri, 10 Apr 2026 21:03:39 +0530
Subject: [PATCH] feat: add static triage and structural reader with tests

---
 backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md     |  10 ++
 .../intelligence/agent_context_vfs.py         | 123 +++++++++++++++++-
 backend/tests/test_agent_context_vfs.py       |  62 ++++++++-
 3 files changed, 192 insertions(+), 3 deletions(-)

diff --git a/backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md b/backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md
index 47b7fc96..9682e4dc 100644
--- a/backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md
+++ b/backend/docs/AGENT_FLAT_CONTEXT_REVIEW.md
@@ -185,3 +185,13 @@ The following enhancements are now implemented:
    - collaborative write path (`write_shared_note`) and append-only activity logging.
    - Test module: `backend/tests/test_agent_context_vfs.py`.
    - These tests provide a baseline regression harness for VFS retrieval quality and shared-memory safety.
+
+11. **Static + Structural retrieval hardening**
+   - Added a **static triage layer** in `search_context`:
+     - keyword-density scoring,
+     - `low_probability` flags for likely-noisy hits,
+     - `triage_top5` shortlist for router-style pre-filtering.
+   - Added `read_struct(filename, path_query)`:
+     - resolves dot/bracket JSON paths to return node-level data only,
+     - includes lightweight dependency injection (e.g., Step 4 persona reads include Step 2 brand voice context when available),
+     - keeps output token-efficient for downstream agents.
diff --git a/backend/services/intelligence/agent_context_vfs.py b/backend/services/intelligence/agent_context_vfs.py
index accb98ea..5da5f8ba 100644
--- a/backend/services/intelligence/agent_context_vfs.py
+++ b/backend/services/intelligence/agent_context_vfs.py
@@ -100,6 +100,7 @@ class AgentContextVFS:
         "/steps/persona": AgentFlatContextStore.STEP4_FILENAME,
         "/steps/integrations": AgentFlatContextStore.STEP5_FILENAME,
     }
+    HIGH_SIGNAL_MARKERS = ("agent_summary", "high_signal_terms", "quick_facts", "context_type")
 
     def __init__(self, user_id: str, project_id: Optional[str] = None):
         self.user_id = user_id
@@ -248,6 +249,93 @@ class AgentContextVFS:
         clustered.sort(key=lambda r: (-int(r.get("score", 0)), str(r.get("path") or "")))
         return clustered
 
+    def _keyword_density(self, snippet: str, query: str) -> float:
+        if not snippet or not query:
+            return 0.0
+        query_tokens = [t for t in query.lower().split() if t]
+        if not query_tokens:
+            return 0.0
+        text = snippet.lower()
+        hits = sum(text.count(tok) for tok in query_tokens)  # substring hits, so the ratio can exceed 1.0
+        words = max(1, len(text.split()))  # never divide by zero
+        return hits / words
+
+    def _static_triage(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
+        """Tag hits with keyword_density/low_probability; sort low-probability hits last."""
+        triaged: List[Dict[str, Any]] = []
+        for r in results:
+            snippet = str(r.get("snippet") or "")
+            density = self._keyword_density(snippet, query)
+            marker_hit = any(marker in snippet.lower() for marker in self.HIGH_SIGNAL_MARKERS)
+            low_probability = bool(density < 0.01 and not marker_hit)
+            item = dict(r)  # shallow copy: the input result dicts are left untouched
+            item["keyword_density"] = round(density, 4)
+            item["low_probability"] = low_probability
+            triaged.append(item)
+        triaged.sort(
+            key=lambda x: (
+                bool(x.get("low_probability")),
+                -float(x.get("confidence") or 0),  # "or 0" also covers an explicit None
+                -int(x.get("score") or 0),
+            )
+        )
+        return triaged
+
+    @staticmethod
+    def _llm_router_stub(results: List[Dict[str, Any]], top_k: int = 5) -> List[Dict[str, Any]]:
+        """Local router stub: return the top_k (at least 1) hits, low-probability ones ranked last."""
+        ranked = sorted(
+            results,
+            key=lambda x: (
+                bool(x.get("low_probability")),
+                -float(x.get("confidence") or 0),  # "or 0" also covers an explicit None
+                -int(x.get("score") or 0),
+            ),
+        )
+        return ranked[: max(1, top_k)]
+
+    @staticmethod
+    def _resolve_json_path(data: Any, path_query: str) -> Any:
+        """Resolve a dot/bracket path, e.g. 'data.seo_audit.recommendations[0]' (one int index per part)."""
+        if not path_query:
+            return data
+
+        current = data
+        query = path_query.strip()
+        parts: List[str] = []
+        buf = ""
+        in_brackets = False
+        for ch in query:
+            if ch == "." and not in_brackets:  # dots inside [...] do not split the path
+                if buf:
+                    parts.append(buf)
+                    buf = ""
+                continue
+            if ch == "[":
+                in_brackets = True
+            elif ch == "]":
+                in_brackets = False
+            buf += ch
+        if buf:
+            parts.append(buf)
+
+        for part in parts:
+            if "[" in part and part.endswith("]"):
+                key, idx_raw = part.split("[", 1)
+                idx = int(idx_raw[:-1])  # ValueError for non-integer or chained indexes
+                if key:
+                    if not isinstance(current, dict):
+                        raise KeyError(key)
+                    current = current[key]
+                if not isinstance(current, list):
+                    raise IndexError(idx)
+                current = current[idx]
+            else:
+                if not isinstance(current, dict):
+                    raise KeyError(part)
+                current = current[part]
+        return current
+
     def _resolve_path(self, path: str) -> Tuple[str, Optional[str]]:
         normalized = (path or "").strip()
         if not normalized:
@@ -434,11 +522,12 @@ class AgentContextVFS:
             "query": normalized,
             "attempted_queries": attempted_queries,
             "matched_files_count": len(matched_files),
-            "results": bounded_results,
+            "results": self._static_triage(bounded_results, normalized),
             "notice": notice,
             "char_budget_used": used,
             "can_answer": bool(bounded_results),
         }
+        result["triage_top5"] = self._llm_router_stub(result["results"], top_k=5)
         logger.info(
             f"[vfs_audit] user={self.store.safe_user_id} action=search_context query={normalized!r} results={len(result['results'])}"
         )
@@ -603,6 +692,38 @@ class AgentContextVFS:
             logger.warning(f"Failed to append activity log: {exc}")
             return {"ok": False, "error": str(exc)}
 
+    def read_struct(self, filename: str, path_query: str) -> Dict[str, Any]:
+        """Read one node of a JSON context file; persona reads also carry step-2 brand_voice."""
+        resolved_kind, resolved = self._resolve_path(filename)
+        if resolved_kind == "virtual_summary" or not resolved:
+            return {"ok": False, "error": "Invalid file"}
+
+        doc = self.store.load_context_document(resolved)
+        if not isinstance(doc, dict):
+            return {"ok": False, "error": "File not found"}
+
+        try:
+            extracted = self._resolve_json_path(doc, path_query)
+        except Exception as exc:
+            return {"ok": False, "error": f"path_query resolution failed: {exc}"}
+
+        # Lightweight dependency context: inject brand voice from step2 when reading persona structures.
+        dependency_context: Dict[str, Any] = {}
+        if "persona" in (path_query or "").lower() or resolved == AgentFlatContextStore.STEP4_FILENAME:
+            step2 = self.store.load_step2_context_document() or {}
+            step2_data = step2.get("data") if isinstance(step2.get("data"), dict) else {}
+            brand = step2_data.get("brand_analysis") if isinstance(step2_data.get("brand_analysis"), dict) else {}
+            dependency_context["brand_voice"] = brand.get("brand_voice")
+
+        return {
+            "ok": True,
+            "file": resolved,
+            "path_query": path_query,
+            "data": extracted,
+            "dependency_context": dependency_context,
+            "context": "Extracted via structural parse to save tokens.",
+        }
+
 
 
 def build_filesystem_header(user_id: str) -> str:
diff --git a/backend/tests/test_agent_context_vfs.py b/backend/tests/test_agent_context_vfs.py
index a3523610..333c14d6 100644
--- a/backend/tests/test_agent_context_vfs.py
+++ b/backend/tests/test_agent_context_vfs.py
@@ -1,10 +1,38 @@
 from __future__ import annotations
 
 import json
+import sys
+import types
+import importlib.util
 from pathlib import Path
 
-from services.intelligence.agent_flat_context import AgentFlatContextStore
-from services.intelligence.agent_context_vfs import AgentContextVFS
+# Lightweight fallback for environments missing loguru.
+if "loguru" not in sys.modules:
+    stub = types.ModuleType("loguru")
+    stub.logger = types.SimpleNamespace(
+        info=lambda *a, **k: None,
+        warning=lambda *a, **k: None,
+        error=lambda *a, **k: None,
+        debug=lambda *a, **k: None,
+    )
+    sys.modules["loguru"] = stub
+
+def _load_module(name: str, rel_path: str):
+    base = Path(__file__).resolve().parents[1]
+    path = base / rel_path
+    spec = importlib.util.spec_from_file_location(name, path)
+    assert spec and spec.loader
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+flat_mod = _load_module("agent_flat_context_under_test", "services/intelligence/agent_flat_context.py")
+sys.modules.setdefault("services.intelligence.agent_flat_context", flat_mod)
+vfs_mod = _load_module("agent_context_vfs_under_test", "services/intelligence/agent_context_vfs.py")
+
+AgentFlatContextStore = flat_mod.AgentFlatContextStore
+AgentContextVFS = vfs_mod.AgentContextVFS
 
 
 def _cleanup_workspace(user_id: str, project_id: str | None = None) -> None:
@@ -45,6 +73,9 @@ def test_search_context_query_variants_and_can_answer():
     assert result['attempted_queries'][0] == 'tone'
     assert result['can_answer'] is True
     assert len(result['results']) >= 1
+    assert 'triage_top5' in result
+    assert len(result['triage_top5']) >= 1
+    assert 'low_probability' in result['results'][0]
 
 
 def test_inspect_file_large_document_summary_plus_keys():
@@ -96,3 +127,30 @@ def test_write_shared_note_and_activity_log_created():
 
     lines = [json.loads(l) for l in log_file.read_text(encoding='utf-8').splitlines() if l.strip()]
     assert any(entry.get('event_type') == 'shared_note_written' for entry in lines)
+
+
+def test_read_struct_path_resolution_and_dependency_context():
+    user_id = 'pytest_struct_user'
+    _cleanup_workspace(user_id)
+
+    store = AgentFlatContextStore(user_id)
+    assert store.save_step2_website_analysis(
+        {
+            'website_url': 'https://struct.example.com',
+            'brand_analysis': {'brand_voice': 'Pragmatic'},
+            'recommended_settings': {'writing_tone': 'Clear'},
+        }
+    )
+    assert store.save_step4_persona_data(
+        {
+            'core_persona': {'name': 'Ops Leader', 'goal': 'Scale ops'},
+            'selected_platforms': ['linkedin'],
+        }
+    )
+
+    vfs = AgentContextVFS(user_id)
+    out = vfs.read_struct('step4_persona_data.json', 'data.core_persona.name')
+
+    assert out['ok'] is True
+    assert out['data'] == 'Ops Leader'
+    assert out['dependency_context']['brand_voice'] == 'Pragmatic'