#!/usr/bin/env python3
"""Backend integrity audit.

Walks the ``backend`` directory that sits next to this script's parent
directory and runs a series of static and import-time checks (wildcard
imports, missing ``core.settings`` symbols, module importability, unresolved
global names in function bytecode, duplicate routes on the application
object, orphaned ``.pyc`` files, oversized router/service files).

Findings are printed one per line; the process exit status is 1 when at
least one finding has severity ``ERROR`` and 0 otherwise.
"""
from __future__ import annotations

import ast
import builtins
import dis
import importlib
import inspect
import pathlib
import pkgutil
import sys
from dataclasses import dataclass
from typing import Iterable, List, Sequence

PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
BACKEND_ROOT = PROJECT_ROOT / "backend"
ROUTER_MAX_LINES = 400
SERVICE_MAX_LINES = 500


@dataclass
class Finding:
    """One audit result.

    Attributes:
        severity: ``"ERROR"`` or ``"WARN"``; only ``"ERROR"`` fails the audit.
        code: Short machine-readable identifier of the check that fired.
        path: File the finding refers to, relative to the project root.
        detail: Human-readable explanation.
    """

    severity: str
    code: str
    path: str
    detail: str


def _ensure_backend_on_path() -> None:
    """Make ``BACKEND_ROOT`` the first ``sys.path`` entry without stacking duplicates."""
    root = str(BACKEND_ROOT)
    if not sys.path or sys.path[0] != root:
        sys.path.insert(0, root)


def _iter_backend_files(pattern: str) -> Iterable[pathlib.Path]:
    """Yield backend files matching *pattern* in sorted order, skipping ``venv`` trees."""
    for path in sorted(BACKEND_ROOT.rglob(pattern)):
        # Test only the components below BACKEND_ROOT: checking the absolute
        # path would drop every file when the checkout itself lives under a
        # directory that happens to be called "venv".
        if "venv" in path.relative_to(BACKEND_ROOT).parts:
            continue
        yield path


def _iter_backend_py_files() -> Iterable[pathlib.Path]:
    """Yield every ``*.py`` file under the backend root (excluding ``venv``)."""
    return _iter_backend_files("*.py")


def _iter_backend_pyc_files() -> Iterable[pathlib.Path]:
    """Yield every ``*.pyc`` file under the backend root (excluding ``venv``)."""
    return _iter_backend_files("*.pyc")


def _module_name_from_path(path: pathlib.Path) -> str:
    """Return the dotted module name for *path*, relative to ``BACKEND_ROOT``."""
    rel = path.relative_to(BACKEND_ROOT).with_suffix("")
    return ".".join(rel.parts)


def _parse_ast(path: pathlib.Path) -> ast.AST:
    """Parse *path* and return its AST.

    Never raises and never returns ``None``: when the file cannot be read or
    parsed, an empty module is returned so callers can ``ast.walk`` the
    result unconditionally.
    """
    try:
        return ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
    except Exception as exc:  # deliberate best-effort: unreadable/unparsable file
        return ast.parse("", filename=f"{path} [parse failed: {exc}]")


def _check_import_all(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Report every ``from x import *`` statement found in *paths*."""
    findings: List[Finding] = []
    for path in paths:
        for node in ast.walk(_parse_ast(path)):
            if not isinstance(node, ast.ImportFrom):
                continue
            if not any(alias.name == "*" for alias in node.names):
                continue
            findings.append(
                Finding(
                    severity="ERROR",
                    code="import-star",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"wildcard import from '{node.module or ''}'",
                )
            )
    return findings


def _check_settings_imports(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Report names imported from ``core.settings`` that the module does not define.

    If ``core.settings`` itself cannot be imported, a single
    ``settings-import-failed`` finding is returned instead of letting the
    exception abort the whole audit.
    """
    _ensure_backend_on_path()
    try:
        settings = importlib.import_module("core.settings")
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="settings-import-failed",
                # NOTE(review): assumes core.settings is a plain module file;
                # adjust if it is a package.
                path="backend/core/settings.py",
                detail=f"{exc.__class__.__name__}: {exc}",
            )
        ]

    available = set(dir(settings))
    findings: List[Finding] = []
    for path in paths:
        for node in ast.walk(_parse_ast(path)):
            if not isinstance(node, ast.ImportFrom) or node.module != "core.settings":
                continue
            for alias in node.names:
                # Wildcards are handled by the import-star check.
                if alias.name == "*" or alias.name in available:
                    continue
                findings.append(
                    Finding(
                        severity="ERROR",
                        code="settings-missing-symbol",
                        path=str(path.relative_to(PROJECT_ROOT)),
                        detail=f"imports missing symbol '{alias.name}' from core.settings",
                    )
                )
    return findings


def _check_importability(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Import every non-``__init__`` module in *paths* and report failures."""
    _ensure_backend_on_path()
    findings: List[Finding] = []
    for path in paths:
        if path.name == "__init__.py":
            continue
        module_name = _module_name_from_path(path)
        try:
            importlib.import_module(module_name)
        except Exception as exc:
            findings.append(
                Finding(
                    severity="ERROR",
                    code="module-import-failed",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{module_name}: {exc.__class__.__name__}: {exc}",
                )
            )
    return findings


def _missing_global_findings(
    func: object,
    qualname: str,
    module_name: str,
    display_path: str,
    module_globals: dict,
    builtin_names: set,
) -> List[Finding]:
    """Return a finding for each global name *func* loads that resolves nowhere.

    A name counts as resolved when it is a builtin or a key of
    *module_globals*. ``__init__`` methods of modules under ``models.`` are
    exempt from the check.
    """
    if module_name.startswith("models.") and qualname.endswith(".__init__"):
        return []
    try:
        instructions = list(dis.get_instructions(func))
    except TypeError:
        # Object has no disassemblable code.
        return []

    findings: List[Finding] = []
    for ins in instructions:
        if ins.opname not in {"LOAD_GLOBAL", "LOAD_NAME"}:
            continue
        ref_name = str(ins.argval or "").strip()
        if not ref_name or ref_name in builtin_names or ref_name in module_globals:
            continue
        findings.append(
            Finding(
                severity="ERROR",
                code="runtime-missing-global",
                path=display_path,
                detail=f"{qualname} uses missing global '{ref_name}' ({ins.opname})",
            )
        )
    return findings


def _check_runtime_global_refs(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Report functions and methods whose bytecode loads an undefined global.

    Only functions and classes defined in the module itself are inspected
    (``__module__`` must match). Duplicate findings are collapsed.
    """
    _ensure_backend_on_path()
    builtin_names = set(dir(builtins))
    # Keep the originating path so findings use the same project-relative
    # form ("backend/...") as every other check.
    module_paths = {
        _module_name_from_path(path): path
        for path in paths
        if path.name != "__init__.py"
    }

    findings: List[Finding] = []
    for module_name in sorted(module_paths):
        try:
            module = importlib.import_module(module_name)
        except Exception:
            # Import failures are reported by _check_importability.
            continue
        display_path = str(module_paths[module_name].relative_to(PROJECT_ROOT))
        module_globals = module.__dict__

        for obj in vars(module).values():
            if inspect.isfunction(obj) and obj.__module__ == module.__name__:
                findings.extend(
                    _missing_global_findings(
                        obj,
                        obj.__qualname__,
                        module_name,
                        display_path,
                        module_globals,
                        builtin_names,
                    )
                )
            elif inspect.isclass(obj) and obj.__module__ == module.__name__:
                for method in vars(obj).values():
                    if not inspect.isfunction(method):
                        continue
                    findings.extend(
                        _missing_global_findings(
                            method,
                            f"{obj.__qualname__}.{method.__name__}",
                            module_name,
                            display_path,
                            module_globals,
                            builtin_names,
                        )
                    )

    # The same name may be loaded several times in one function; keep one
    # finding per (code, path, detail), in first-seen order.
    deduped: dict[tuple[str, str, str], Finding] = {}
    for finding in findings:
        deduped[(finding.code, finding.path, finding.detail)] = finding
    return list(deduped.values())


def _check_app_factory() -> List[Finding]:
    """Build the app via ``app_factory.create_app`` and report duplicate routes.

    Returns a single finding when the factory cannot be imported or raises;
    otherwise one ``duplicate-route`` finding per (method, path) pair that is
    registered more than once. ``HEAD`` and ``OPTIONS`` are ignored.
    """
    _ensure_backend_on_path()
    try:
        from app_factory import create_app
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="app-factory-import-failed",
                path="backend/app_factory.py",
                detail=f"{exc.__class__.__name__}: {exc}",
            )
        ]

    try:
        app = create_app()
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="app-factory-create-failed",
                path="backend/app_factory.py",
                detail=f"{exc.__class__.__name__}: {exc}",
            )
        ]

    route_keys: set[tuple[str, str]] = set()
    duplicates: set[tuple[str, str]] = set()
    for route in getattr(app, "routes", []):
        path = str(getattr(route, "path", "") or "").strip()
        # Routes without a ``methods`` attribute (or with None) contribute nothing.
        for method in sorted(getattr(route, "methods", []) or []):
            if method in {"HEAD", "OPTIONS"}:
                continue
            key = (method, path)
            if key in route_keys:
                duplicates.add(key)
            else:
                route_keys.add(key)

    return [
        Finding(
            severity="ERROR",
            code="duplicate-route",
            path="backend/app_factory.py",
            detail=f"duplicate route registered for {method} {path}",
        )
        for method, path in sorted(duplicates)
    ]


def _source_path_for_pyc(path: pathlib.Path) -> pathlib.Path | None:
    """Return the ``.py`` path a ``__pycache__`` bytecode file was compiled from.

    Returns ``None`` for ``.pyc`` files that do not live directly inside a
    ``__pycache__`` directory. The module name is everything before the first
    dot of the file name, so any cache tag (``cpython-311``, ``pypy310``, ...)
    and optional ``opt-N`` suffix is handled.
    """
    if path.parent.name != "__pycache__":
        return None
    stem = path.name.partition(".")[0]
    return path.parent.parent / f"{stem}.py"


def _check_pyc_without_source(pyc_paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Report cached bytecode files whose source file no longer exists."""
    findings: List[Finding] = []
    for path in pyc_paths:
        source = _source_path_for_pyc(path)
        if source is None or source.exists():
            continue
        findings.append(
            Finding(
                severity="ERROR",
                code="pyc-without-source",
                path=str(path.relative_to(PROJECT_ROOT)),
                detail=f"compiled module has no source file at {source.relative_to(PROJECT_ROOT)}",
            )
        )
    return findings


def _check_file_sizes(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Warn about files under ``api/`` or ``services/`` that exceed the line limits."""
    findings: List[Finding] = []
    for path in paths:
        top_level = path.relative_to(BACKEND_ROOT).parts[:1]
        # Close the handle deterministically; undecodable bytes must not
        # abort the audit since only the number of lines matters here.
        with path.open("r", encoding="utf-8", errors="replace") as handle:
            line_count = sum(1 for _ in handle)

        if top_level == ("api",) and line_count > ROUTER_MAX_LINES:
            findings.append(
                Finding(
                    severity="WARN",
                    code="router-too-large",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{line_count} lines exceeds router limit {ROUTER_MAX_LINES}",
                )
            )
        if top_level == ("services",) and line_count > SERVICE_MAX_LINES:
            findings.append(
                Finding(
                    severity="WARN",
                    code="service-too-large",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{line_count} lines exceeds service limit {SERVICE_MAX_LINES}",
                )
            )
    return findings


def _print_findings(findings: Sequence[Finding]) -> None:
    """Print one line per finding, or a PASS line when there are none."""
    if not findings:
        print("PASS backend integrity audit")
        return
    for finding in findings:
        print(f"[{finding.severity}] {finding.code} {finding.path} :: {finding.detail}")


def main() -> int:
    """Run every check, print the findings and return the process exit code.

    Returns:
        1 if any finding has severity ``"ERROR"``, otherwise 0.
    """
    py_files = list(_iter_backend_py_files())
    pyc_files = list(_iter_backend_pyc_files())

    findings: List[Finding] = []
    findings.extend(_check_import_all(py_files))
    findings.extend(_check_settings_imports(py_files))
    findings.extend(_check_importability(py_files))
    findings.extend(_check_runtime_global_refs(py_files))
    findings.extend(_check_app_factory())
    findings.extend(_check_pyc_without_source(pyc_files))
    findings.extend(_check_file_sizes(py_files))

    # Errors first, then by check code and path.
    findings.sort(key=lambda item: (item.severity != "ERROR", item.code, item.path))
    _print_findings(findings)
    return 1 if any(item.severity == "ERROR" for item in findings) else 0


if __name__ == "__main__":
    raise SystemExit(main())