Harden user-scoped subscription route access checks
This commit is contained in:
82
backend/api/subscription/routes/route_access_audit.py
Normal file
82
backend/api/subscription/routes/route_access_audit.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Quick route-level audit to enforce user-scoped access checks."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
ROUTES_DIR = Path(__file__).resolve().parent
|
||||
|
||||
|
||||
def _decorator_path(decorator: ast.AST) -> str | None:
|
||||
"""Extract route path from decorators like @router.get("/usage/{user_id}")."""
|
||||
if not isinstance(decorator, ast.Call):
|
||||
return None
|
||||
if not isinstance(decorator.func, ast.Attribute):
|
||||
return None
|
||||
if not decorator.args:
|
||||
return None
|
||||
|
||||
first_arg = decorator.args[0]
|
||||
if isinstance(first_arg, ast.Constant) and isinstance(first_arg.value, str):
|
||||
return first_arg.value
|
||||
return None
|
||||
|
||||
|
||||
def _has_current_user_dependency(fn_node: ast.AsyncFunctionDef) -> bool:
|
||||
for arg in fn_node.args.args:
|
||||
if arg.arg != "current_user":
|
||||
continue
|
||||
default_index = fn_node.args.args.index(arg) - (len(fn_node.args.args) - len(fn_node.args.defaults))
|
||||
if default_index < 0:
|
||||
continue
|
||||
default_node = fn_node.args.defaults[default_index]
|
||||
if not isinstance(default_node, ast.Call):
|
||||
continue
|
||||
if isinstance(default_node.func, ast.Name) and default_node.func.id == "Depends":
|
||||
if default_node.args and isinstance(default_node.args[0], ast.Name):
|
||||
return default_node.args[0].id == "get_current_user"
|
||||
return False
|
||||
|
||||
|
||||
def _has_verify_user_access_call(fn_node: ast.AsyncFunctionDef) -> bool:
|
||||
for node in ast.walk(fn_node):
|
||||
if not isinstance(node, ast.Call):
|
||||
continue
|
||||
if isinstance(node.func, ast.Name) and node.func.id == "verify_user_access":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def run_access_audit() -> List[Tuple[str, str]]:
|
||||
"""Return (file, function) pairs for user-scoped routes missing auth checks."""
|
||||
failures: List[Tuple[str, str]] = []
|
||||
|
||||
for route_file in ROUTES_DIR.glob("*.py"):
|
||||
if route_file.name in {"__init__.py", Path(__file__).name}:
|
||||
continue
|
||||
|
||||
tree = ast.parse(route_file.read_text(), filename=str(route_file))
|
||||
for node in tree.body:
|
||||
if not isinstance(node, ast.AsyncFunctionDef):
|
||||
continue
|
||||
|
||||
route_paths = [p for d in node.decorator_list if (p := _decorator_path(d))]
|
||||
if not any("{user_id}" in p for p in route_paths):
|
||||
continue
|
||||
|
||||
if not _has_current_user_dependency(node) or not _has_verify_user_access_call(node):
|
||||
failures.append((route_file.name, node.name))
|
||||
|
||||
return failures
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
issues = run_access_audit()
|
||||
if issues:
|
||||
for file_name, fn_name in issues:
|
||||
print(f"FAIL: {file_name}:{fn_name} missing get_current_user/verify_user_access pattern")
|
||||
raise SystemExit(1)
|
||||
|
||||
print("PASS: all user-scoped routes include get_current_user and verify_user_access")
|
||||
Reference in New Issue
Block a user