# dashboard-nanobot/scripts/audit_backend_integrity.py
#!/usr/bin/env python3
from __future__ import annotations
import ast
import builtins
import dis
import importlib
import inspect
import pathlib
import pkgutil
import sys
from dataclasses import dataclass
from typing import Iterable, List, Sequence
# Repository root: this script lives at <root>/scripts/, so one parent up.
PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
# All audited code lives under backend/.
BACKEND_ROOT = PROJECT_ROOT / "backend"
# Soft line-count budgets enforced by _check_file_sizes (WARN, not ERROR).
ROUTER_MAX_LINES = 400
SERVICE_MAX_LINES = 500
@dataclass
class Finding:
    """A single audit result emitted by one of the _check_* functions."""

    # "ERROR" (fails the audit, exit code 1) or "WARN" (informational only).
    severity: str
    # Stable machine-readable identifier, e.g. "import-star".
    code: str
    # Path relative to the project root (runtime checks use a module-derived path).
    path: str
    # Human-readable explanation of the problem.
    detail: str
def _iter_backend_files(pattern: str) -> Iterable[pathlib.Path]:
    """Yield backend files matching *pattern*, sorted, skipping any venv tree.

    Shared implementation for the .py / .pyc iterators below (the two
    originals duplicated this loop verbatim).
    """
    for path in sorted(BACKEND_ROOT.rglob(pattern)):
        if "venv" in path.parts:
            continue
        yield path


def _iter_backend_py_files() -> Iterable[pathlib.Path]:
    """All backend .py sources outside virtualenvs, in deterministic order."""
    return _iter_backend_files("*.py")


def _iter_backend_pyc_files() -> Iterable[pathlib.Path]:
    """All backend compiled .pyc artifacts outside virtualenvs."""
    return _iter_backend_files("*.pyc")
def _module_name_from_path(path: pathlib.Path) -> str:
    """Translate backend/<pkg>/<mod>.py into its dotted module name."""
    relative = path.relative_to(BACKEND_ROOT).with_suffix("")
    return ".".join(relative.parts)
def _parse_ast(path: pathlib.Path) -> ast.AST | None:
try:
return ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
except Exception as exc:
return ast.parse("", filename=f"{path} [parse failed: {exc}]")
def _check_import_all(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Report every ``from X import *`` found in the given source files."""

    def _is_star_import(node: ast.AST) -> bool:
        # An ImportFrom whose alias list contains the wildcard name.
        return isinstance(node, ast.ImportFrom) and any(
            alias.name == "*" for alias in node.names
        )

    findings: List[Finding] = []
    for source in paths:
        tree = _parse_ast(source)
        for node in ast.walk(tree):
            if not _is_star_import(node):
                continue
            findings.append(
                Finding(
                    severity="ERROR",
                    code="import-star",
                    path=str(source.relative_to(PROJECT_ROOT)),
                    detail=f"wildcard import from '{node.module or ''}'",
                )
            )
    return findings
def _check_settings_imports(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Cross-check ``from core.settings import X`` lines against the module.

    Imports core.settings once and flags any imported name the module does
    not actually expose.
    """
    backend = str(BACKEND_ROOT)
    if backend not in sys.path:  # avoid growing sys.path on every call
        sys.path.insert(0, backend)
    try:
        settings = importlib.import_module("core.settings")
    except Exception as exc:
        # Previously an import failure here crashed the whole audit with a
        # traceback; report it as a finding so the other checks still run.
        return [
            Finding(
                severity="ERROR",
                code="settings-missing-symbol",
                path="backend/core/settings.py",
                detail=f"core.settings failed to import: {exc.__class__.__name__}: {exc}",
            )
        ]
    available = set(dir(settings))
    findings: List[Finding] = []
    for path in paths:
        tree = _parse_ast(path)
        for node in ast.walk(tree):
            if not isinstance(node, ast.ImportFrom) or node.module != "core.settings":
                continue
            for alias in node.names:
                # Star imports are reported separately by _check_import_all.
                if alias.name == "*":
                    continue
                if alias.name not in available:
                    findings.append(
                        Finding(
                            severity="ERROR",
                            code="settings-missing-symbol",
                            path=str(path.relative_to(PROJECT_ROOT)),
                            detail=f"imports missing symbol '{alias.name}' from core.settings",
                        )
                    )
    return findings
def _check_importability(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Attempt to import every backend module, recording any failure.

    Package ``__init__.py`` files are skipped; they are exercised implicitly
    when their submodules are imported.
    """
    findings: List[Finding] = []
    backend = str(BACKEND_ROOT)
    if backend not in sys.path:  # do not accumulate duplicate sys.path entries
        sys.path.insert(0, backend)
    for path in paths:
        if path.name == "__init__.py":
            continue
        module_name = _module_name_from_path(path)
        try:
            importlib.import_module(module_name)
        except Exception as exc:
            findings.append(
                Finding(
                    severity="ERROR",
                    code="module-import-failed",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{module_name}: {exc.__class__.__name__}: {exc}",
                )
            )
    return findings
def _check_runtime_global_refs(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Bytecode-scan every importable backend module for references to
    globals that do not exist in the module's namespace — the classic
    symptom of a half-finished refactor that only fails at call time.
    """
    findings: List[Finding] = []
    sys.path.insert(0, str(BACKEND_ROOT))
    builtin_names = set(dir(builtins))
    module_names = {
        _module_name_from_path(path)
        for path in paths
        if path.name != "__init__.py"
    }
    for module_name in sorted(module_names):
        try:
            module = importlib.import_module(module_name)
        except Exception:
            # Unimportable modules are already reported by _check_importability.
            continue
        module_globals = module.__dict__

        def _check_function(obj: object, qualname: str) -> None:
            # NOTE(review): model __init__ methods are excluded — presumably
            # because ORM/validator frameworks generate them dynamically with
            # synthetic names; confirm against the models package.
            if module_name.startswith("models.") and qualname.endswith(".__init__"):
                return
            try:
                instructions = list(dis.get_instructions(obj))
            except TypeError:
                # Object has no disassemblable code (builtin/C-level callable).
                return
            for ins in instructions:
                if ins.opname not in {"LOAD_GLOBAL", "LOAD_NAME"}:
                    continue
                ref_name = str(ins.argval or "").strip()
                # A name is fine if it is a builtin or already bound in the module.
                if not ref_name or ref_name in builtin_names or ref_name in module_globals:
                    continue
                findings.append(
                    Finding(
                        severity="ERROR",
                        code="runtime-missing-global",
                        path=module_name.replace(".", "/") + ".py",
                        detail=f"{qualname} uses missing global '{ref_name}' ({ins.opname})",
                    )
                )

        # Scan module-level functions and the methods of locally-defined
        # classes; __module__ filtering skips re-exported imports.
        for _attr_name, obj in vars(module).items():
            if inspect.isfunction(obj) and obj.__module__ == module.__name__:
                _check_function(obj, obj.__qualname__)
            elif inspect.isclass(obj) and obj.__module__ == module.__name__:
                for _method_name, method in vars(obj).items():
                    if inspect.isfunction(method):
                        _check_function(method, f"{obj.__qualname__}.{method.__name__}")
    # The same missing name may be recorded once per opcode occurrence;
    # collapse identical (code, path, detail) triples.
    deduped: dict[tuple[str, str, str], Finding] = {}
    for finding in findings:
        key = (finding.code, finding.path, finding.detail)
        deduped[key] = finding
    return list(deduped.values())
def _check_app_factory() -> List[Finding]:
    """Import the app factory, build the app, and scan for duplicate routes.

    Returns early with a single finding when the factory module cannot be
    imported or ``create_app()`` raises.
    """
    backend = str(BACKEND_ROOT)
    if backend not in sys.path:  # keep sys.path from accumulating duplicates
        sys.path.insert(0, backend)
    try:
        from app_factory import create_app
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="app-factory-import-failed",
                path="backend/app_factory.py",
                detail=f"{exc.__class__.__name__}: {exc}",
            )
        ]
    try:
        app = create_app()
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="app-factory-create-failed",
                path="backend/app_factory.py",
                detail=f"{exc.__class__.__name__}: {exc}",
            )
        ]
    findings: List[Finding] = []
    route_keys: set[tuple[str, str]] = set()
    duplicates: set[tuple[str, str]] = set()
    for route in getattr(app, "routes", []):
        path = str(getattr(route, "path", "") or "").strip()
        methods = sorted(getattr(route, "methods", []) or [])
        for method in methods:
            # HEAD/OPTIONS are ignored — presumably auto-registered by the
            # framework and would produce false duplicate pairs.
            if method in {"HEAD", "OPTIONS"}:
                continue
            key = (method, path)
            if key in route_keys:
                duplicates.add(key)
            else:
                route_keys.add(key)
    for method, path in sorted(duplicates):
        findings.append(
            Finding(
                severity="ERROR",
                code="duplicate-route",
                path="backend/app_factory.py",
                detail=f"duplicate route registered for {method} {path}",
            )
        )
    return findings
def _source_path_for_pyc(path: pathlib.Path) -> pathlib.Path | None:
if path.name == "__init__.cpython-312.pyc":
return path.parent.parent / "__init__.py"
if path.parent.name != "__pycache__":
return None
stem = path.name.split(".cpython-", 1)[0]
return path.parent.parent / f"{stem}.py"
def _check_pyc_without_source(pyc_paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Flag compiled modules whose source file no longer exists."""
    results: List[Finding] = []
    for pyc in pyc_paths:
        expected = _source_path_for_pyc(pyc)
        # Skip unmapped locations and pyc files whose source is present.
        if expected is None or expected.exists():
            continue
        results.append(
            Finding(
                severity="ERROR",
                code="pyc-without-source",
                path=str(pyc.relative_to(PROJECT_ROOT)),
                detail=f"compiled module has no source file at {expected.relative_to(PROJECT_ROOT)}",
            )
        )
    return results
def _check_file_sizes(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Warn when an api/ router or services/ module exceeds its line budget."""
    findings: List[Finding] = []
    for path in paths:
        rel = path.relative_to(BACKEND_ROOT)
        # Context manager closes the handle deterministically (the original
        # left an open file object to the garbage collector).
        with path.open("r", encoding="utf-8") as handle:
            line_count = sum(1 for _ in handle)
        if rel.parts[:1] == ("api",) and line_count > ROUTER_MAX_LINES:
            findings.append(
                Finding(
                    severity="WARN",
                    code="router-too-large",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{line_count} lines exceeds router limit {ROUTER_MAX_LINES}",
                )
            )
        if rel.parts[:1] == ("services",) and line_count > SERVICE_MAX_LINES:
            findings.append(
                Finding(
                    severity="WARN",
                    code="service-too-large",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{line_count} lines exceeds service limit {SERVICE_MAX_LINES}",
                )
            )
    return findings
def _print_findings(findings: Sequence[Finding]) -> None:
if not findings:
print("PASS backend integrity audit")
return
for finding in findings:
print(f"[{finding.severity}] {finding.code} {finding.path} :: {finding.detail}")
def main() -> int:
    """Run every audit check and return 1 when any ERROR finding exists."""
    sources = list(_iter_backend_py_files())
    compiled = list(_iter_backend_pyc_files())
    batches = [
        _check_import_all(sources),
        _check_settings_imports(sources),
        _check_importability(sources),
        _check_runtime_global_refs(sources),
        _check_app_factory(),
        _check_pyc_without_source(compiled),
        _check_file_sizes(sources),
    ]
    all_findings: List[Finding] = [item for batch in batches for item in batch]
    # Errors first, then stable ordering by code and path.
    all_findings.sort(key=lambda item: (item.severity != "ERROR", item.code, item.path))
    _print_findings(all_findings)
    has_error = any(item.severity == "ERROR" for item in all_findings)
    return 1 if has_error else 0
# Script entry point: exit status 0 when clean, 1 on any ERROR finding.
if __name__ == "__main__":
    raise SystemExit(main())