feat: add static triage and structural reader with tests

This commit is contained in:
ي
2026-04-10 21:03:39 +05:30
parent e2726805f3
commit 79f26c815b
3 changed files with 192 additions and 3 deletions

View File

@@ -100,6 +100,7 @@ class AgentContextVFS:
"/steps/persona": AgentFlatContextStore.STEP4_FILENAME,
"/steps/integrations": AgentFlatContextStore.STEP5_FILENAME,
}
HIGH_SIGNAL_MARKERS = ("agent_summary", "high_signal_terms", "quick_facts", "context_type")
def __init__(self, user_id: str, project_id: Optional[str] = None):
self.user_id = user_id
@@ -248,6 +249,93 @@ class AgentContextVFS:
clustered.sort(key=lambda r: (-int(r.get("score", 0)), str(r.get("path") or "")))
return clustered
def _keyword_density(self, snippet: str, query: str) -> float:
if not snippet or not query:
return 0.0
query_tokens = [t for t in query.lower().split() if t]
if not query_tokens:
return 0.0
text = snippet.lower()
hits = sum(text.count(tok) for tok in query_tokens)
words = max(1, len(text.split()))
return hits / words
def _static_triage(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
"""Semgrep-style static heuristic triage before main agent consumption."""
triaged: List[Dict[str, Any]] = []
for r in results:
snippet = str(r.get("snippet") or "")
density = self._keyword_density(snippet, query)
marker_hit = any(marker in snippet.lower() for marker in self.HIGH_SIGNAL_MARKERS)
low_probability = bool(density < 0.01 and not marker_hit)
item = dict(r)
item["keyword_density"] = round(density, 4)
item["low_probability"] = low_probability
triaged.append(item)
triaged.sort(
key=lambda x: (
bool(x.get("low_probability")),
-float(x.get("confidence", 0)),
-int(x.get("score", 0)),
)
)
return triaged
@staticmethod
def _llm_router_stub(results: List[Dict[str, Any]], top_k: int = 5) -> List[Dict[str, Any]]:
"""Fast local triage stub (drop low-probability first; keep strongest candidates)."""
ranked = sorted(
results,
key=lambda x: (
bool(x.get("low_probability")),
-float(x.get("confidence", 0)),
-int(x.get("score", 0)),
),
)
return ranked[: max(1, top_k)]
@staticmethod
def _resolve_json_path(data: Any, path_query: str) -> Any:
"""Resolve dot/bracket JSON path such as 'data.seo_audit.recommendations[0]'."""
if not path_query:
return data
current = data
query = path_query.strip()
parts: List[str] = []
buf = ""
in_brackets = False
for ch in query:
if ch == "." and not in_brackets:
if buf:
parts.append(buf)
buf = ""
continue
if ch == "[":
in_brackets = True
elif ch == "]":
in_brackets = False
buf += ch
if buf:
parts.append(buf)
for part in parts:
if "[" in part and part.endswith("]"):
key, idx_raw = part.split("[", 1)
idx = int(idx_raw[:-1])
if key:
if not isinstance(current, dict):
raise KeyError(key)
current = current[key]
if not isinstance(current, list):
raise IndexError(idx)
current = current[idx]
else:
if not isinstance(current, dict):
raise KeyError(part)
current = current[part]
return current
def _resolve_path(self, path: str) -> Tuple[str, Optional[str]]:
normalized = (path or "").strip()
if not normalized:
@@ -434,11 +522,12 @@ class AgentContextVFS:
"query": normalized,
"attempted_queries": attempted_queries,
"matched_files_count": len(matched_files),
"results": bounded_results,
"results": self._static_triage(bounded_results, normalized),
"notice": notice,
"char_budget_used": used,
"can_answer": bool(bounded_results),
}
result["triage_top5"] = self._llm_router_stub(result["results"], top_k=5)
logger.info(
f"[vfs_audit] user={self.store.safe_user_id} action=search_context query={normalized!r} results={len(result['results'])}"
)
@@ -603,6 +692,38 @@ class AgentContextVFS:
logger.warning(f"Failed to append activity log: {exc}")
return {"ok": False, "error": str(exc)}
def read_struct(self, filename: str, path_query: str) -> Dict[str, Any]:
"""AST-style structural reader for JSON context files with dependency context injection."""
resolved_kind, resolved = self._resolve_path(filename)
if resolved_kind == "virtual_summary" or not resolved:
return {"ok": False, "error": "Invalid file"}
doc = self.store.load_context_document(resolved)
if not isinstance(doc, dict):
return {"ok": False, "error": "File not found"}
try:
extracted = self._resolve_json_path(doc, path_query)
except Exception as exc:
return {"ok": False, "error": f"path_query resolution failed: {exc}"}
# Lightweight dependency context: inject brand voice from step2 when reading persona structures.
dependency_context: Dict[str, Any] = {}
if "persona" in path_query.lower() or resolved == AgentFlatContextStore.STEP4_FILENAME:
step2 = self.store.load_step2_context_document() or {}
step2_data = step2.get("data") if isinstance(step2.get("data"), dict) else {}
brand = step2_data.get("brand_analysis") if isinstance(step2_data.get("brand_analysis"), dict) else {}
dependency_context["brand_voice"] = brand.get("brand_voice")
return {
"ok": True,
"file": resolved,
"path_query": path_query,
"data": extracted,
"dependency_context": dependency_context,
"context": "Extracted via structural parse to save tokens.",
}
def build_filesystem_header(user_id: str) -> str: