#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import builtins
|
|
import dis
|
|
import importlib
|
|
import inspect
|
|
import pathlib
|
|
import pkgutil
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, List, Sequence
|
|
|
|
|
|
PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
|
|
BACKEND_ROOT = PROJECT_ROOT / "backend"
|
|
|
|
ROUTER_MAX_LINES = 400
|
|
SERVICE_MAX_LINES = 500
|
|
|
|
|
|
@dataclass
class Finding:
    """One audit result.

    Attributes:
        severity: "ERROR" (fails the audit via main's exit code) or
            "WARN" (reported only; used by the file-size checks).
        code: stable machine-readable identifier, e.g. "import-star".
        path: file path, relative to the project root where possible.
        detail: human-readable description of the problem.
    """

    severity: str
    code: str
    path: str
    detail: str
def _iter_backend_py_files() -> Iterable[pathlib.Path]:
    """Yield every backend .py file in sorted order, skipping any venv tree."""
    all_sources = sorted(BACKEND_ROOT.rglob("*.py"))
    yield from (p for p in all_sources if "venv" not in p.parts)
def _iter_backend_pyc_files() -> Iterable[pathlib.Path]:
    """Yield every backend .pyc file in sorted order, skipping any venv tree."""
    yield from (
        compiled
        for compiled in sorted(BACKEND_ROOT.rglob("*.pyc"))
        if "venv" not in compiled.parts
    )
def _module_name_from_path(path: pathlib.Path) -> str:
    """Translate a backend source path into its dotted module name."""
    relative = path.relative_to(BACKEND_ROOT)
    return ".".join(relative.with_suffix("").parts)
def _parse_ast(path: pathlib.Path) -> ast.AST | None:
|
|
try:
|
|
return ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
|
|
except Exception as exc:
|
|
return ast.parse("", filename=f"{path} [parse failed: {exc}]")
|
|
|
|
|
|
def _check_import_all(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Flag every wildcard `from X import *` in the given source files."""
    results: List[Finding] = []
    for source in paths:
        rel = str(source.relative_to(PROJECT_ROOT))
        for node in ast.walk(_parse_ast(source)):
            if not isinstance(node, ast.ImportFrom):
                continue
            if not any(alias.name == "*" for alias in node.names):
                continue
            results.append(
                Finding(
                    severity="ERROR",
                    code="import-star",
                    path=rel,
                    detail=f"wildcard import from '{node.module or ''}'",
                )
            )
    return results
def _check_settings_imports(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Verify each `from core.settings import X` names a real settings symbol.

    Returns ERROR findings for imported names absent from core.settings.
    If core.settings itself cannot be imported, a single ERROR finding is
    returned instead of letting the exception abort the whole audit (the
    previous version crashed here).
    """
    findings: List[Finding] = []
    backend = str(BACKEND_ROOT)
    # Guarded insert: the old unconditional insert stacked a duplicate
    # sys.path entry every time a check ran.
    if backend not in sys.path:
        sys.path.insert(0, backend)
    try:
        settings = importlib.import_module("core.settings")
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="settings-import-failed",
                path="backend/core/settings.py",
                detail=f"core.settings failed to import: {exc.__class__.__name__}: {exc}",
            )
        ]
    available = set(dir(settings))
    for path in paths:
        tree = _parse_ast(path)
        for node in ast.walk(tree):
            if not isinstance(node, ast.ImportFrom) or node.module != "core.settings":
                continue
            for alias in node.names:
                # `import *` is reported separately by _check_import_all.
                if alias.name == "*" or alias.name in available:
                    continue
                findings.append(
                    Finding(
                        severity="ERROR",
                        code="settings-missing-symbol",
                        path=str(path.relative_to(PROJECT_ROOT)),
                        detail=f"imports missing symbol '{alias.name}' from core.settings",
                    )
                )
    return findings
def _check_importability(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Import every backend module and report those that raise on import.

    Package __init__.py files are skipped; they are exercised implicitly
    when their submodules are imported.
    """
    findings: List[Finding] = []
    backend = str(BACKEND_ROOT)
    # Guarded insert: the old unconditional insert stacked a duplicate
    # sys.path entry every time a check ran.
    if backend not in sys.path:
        sys.path.insert(0, backend)
    for path in paths:
        if path.name == "__init__.py":
            continue
        module_name = _module_name_from_path(path)
        try:
            importlib.import_module(module_name)
        except Exception as exc:  # any failure mode counts: report, don't crash
            findings.append(
                Finding(
                    severity="ERROR",
                    code="module-import-failed",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{module_name}: {exc.__class__.__name__}: {exc}",
                )
            )
    return findings
def _check_runtime_global_refs(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Disassemble each importable backend module and flag LOAD_GLOBAL /
    LOAD_NAME targets that resolve to neither a builtin nor a name present
    in the module's globals at audit time.

    Modules that fail to import are silently skipped here; those failures
    are already reported by _check_importability.
    """
    findings: List[Finding] = []
    sys.path.insert(0, str(BACKEND_ROOT))
    builtin_names = set(dir(builtins))

    # One dotted name per source file, excluding package __init__ modules.
    module_names = {
        _module_name_from_path(path)
        for path in paths
        if path.name != "__init__.py"
    }

    for module_name in sorted(module_names):
        try:
            module = importlib.import_module(module_name)
        except Exception:
            # Import failure: covered by _check_importability, skip here.
            continue

        module_globals = module.__dict__

        def _check_function(obj: object, qualname: str) -> None:
            # models.* __init__ methods are exempt — presumably their globals
            # are injected dynamically (ORM-style); TODO confirm with owners.
            if module_name.startswith("models.") and qualname.endswith(".__init__"):
                return
            try:
                instructions = list(dis.get_instructions(obj))
            except TypeError:
                # Object has no disassemblable code (e.g. C-implemented).
                return
            for ins in instructions:
                if ins.opname not in {"LOAD_GLOBAL", "LOAD_NAME"}:
                    continue
                ref_name = str(ins.argval or "").strip()
                # Known builtin or present in module globals: resolvable.
                if not ref_name or ref_name in builtin_names or ref_name in module_globals:
                    continue
                findings.append(
                    Finding(
                        severity="ERROR",
                        code="runtime-missing-global",
                        path=module_name.replace(".", "/") + ".py",
                        detail=f"{qualname} uses missing global '{ref_name}' ({ins.opname})",
                    )
                )

        # Only inspect objects actually defined in this module (skip re-exports);
        # for classes, scan methods declared directly on the class.
        for _attr_name, obj in vars(module).items():
            if inspect.isfunction(obj) and obj.__module__ == module.__name__:
                _check_function(obj, obj.__qualname__)
            elif inspect.isclass(obj) and obj.__module__ == module.__name__:
                for _method_name, method in vars(obj).items():
                    if inspect.isfunction(method):
                        _check_function(method, f"{obj.__qualname__}.{method.__name__}")

    # The same missing name may be hit by several instructions; keep the
    # last finding per (code, path, detail) triple.
    deduped: dict[tuple[str, str, str], Finding] = {}
    for finding in findings:
        key = (finding.code, finding.path, finding.detail)
        deduped[key] = finding
    return list(deduped.values())
def _check_app_factory() -> List[Finding]:
    """Build the application via app_factory.create_app() and report any
    (method, path) pair registered more than once.

    Import or construction failures short-circuit into a single ERROR
    finding instead of crashing the audit.
    """
    findings: List[Finding] = []
    sys.path.insert(0, str(BACKEND_ROOT))
    try:
        from app_factory import create_app
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="app-factory-import-failed",
                path="backend/app_factory.py",
                detail=f"{exc.__class__.__name__}: {exc}",
            )
        ]

    try:
        app = create_app()
    except Exception as exc:
        return [
            Finding(
                severity="ERROR",
                code="app-factory-create-failed",
                path="backend/app_factory.py",
                detail=f"{exc.__class__.__name__}: {exc}",
            )
        ]

    # First registration of a key claims a slot in route_keys; any repeat
    # lands in `duplicates`, so each duplicated pair is reported once.
    route_keys: set[tuple[str, str]] = set()
    duplicates: set[tuple[str, str]] = set()
    for route in getattr(app, "routes", []):
        path = str(getattr(route, "path", "") or "").strip()
        methods = sorted(getattr(route, "methods", []) or [])
        for method in methods:
            # HEAD/OPTIONS are skipped — presumably auto-added by the web
            # framework for every route; TODO confirm against app_factory.
            if method in {"HEAD", "OPTIONS"}:
                continue
            key = (method, path)
            if key in route_keys:
                duplicates.add(key)
            else:
                route_keys.add(key)

    for method, path in sorted(duplicates):
        findings.append(
            Finding(
                severity="ERROR",
                code="duplicate-route",
                path="backend/app_factory.py",
                detail=f"duplicate route registered for {method} {path}",
            )
        )
    return findings
def _source_path_for_pyc(path: pathlib.Path) -> pathlib.Path | None:
|
|
if path.name == "__init__.cpython-312.pyc":
|
|
return path.parent.parent / "__init__.py"
|
|
if path.parent.name != "__pycache__":
|
|
return None
|
|
stem = path.name.split(".cpython-", 1)[0]
|
|
return path.parent.parent / f"{stem}.py"
|
|
|
|
|
|
def _check_pyc_without_source(pyc_paths: Sequence[pathlib.Path]) -> List[Finding]:
    """Report compiled modules whose corresponding .py source is missing."""
    orphaned: List[Finding] = []
    for compiled in pyc_paths:
        expected = _source_path_for_pyc(compiled)
        if expected is None or expected.exists():
            continue
        orphaned.append(
            Finding(
                severity="ERROR",
                code="pyc-without-source",
                path=str(compiled.relative_to(PROJECT_ROOT)),
                detail=f"compiled module has no source file at {expected.relative_to(PROJECT_ROOT)}",
            )
        )
    return orphaned
def _check_file_sizes(paths: Sequence[pathlib.Path]) -> List[Finding]:
    """WARN when backend/api or backend/services modules exceed line budgets."""
    findings: List[Finding] = []
    for path in paths:
        rel = path.relative_to(BACKEND_ROOT)
        # Count lines under an explicit context manager; the previous
        # generator-expression form left the file handle for GC to close.
        with path.open("r", encoding="utf-8") as handle:
            line_count = sum(1 for _ in handle)
        if rel.parts[:1] == ("api",) and line_count > ROUTER_MAX_LINES:
            findings.append(
                Finding(
                    severity="WARN",
                    code="router-too-large",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{line_count} lines exceeds router limit {ROUTER_MAX_LINES}",
                )
            )
        if rel.parts[:1] == ("services",) and line_count > SERVICE_MAX_LINES:
            findings.append(
                Finding(
                    severity="WARN",
                    code="service-too-large",
                    path=str(path.relative_to(PROJECT_ROOT)),
                    detail=f"{line_count} lines exceeds service limit {SERVICE_MAX_LINES}",
                )
            )
    return findings
def _print_findings(findings: Sequence[Finding]) -> None:
    """Print one formatted line per finding, or a PASS banner if none."""
    if findings:
        for item in findings:
            print(f"[{item.severity}] {item.code} {item.path} :: {item.detail}")
    else:
        print("PASS backend integrity audit")
def main() -> int:
    """Run all audit checks; return 1 if any ERROR finding exists, else 0."""
    py_files = list(_iter_backend_py_files())
    pyc_files = list(_iter_backend_pyc_files())

    findings: List[Finding] = []
    findings += _check_import_all(py_files)
    findings += _check_settings_imports(py_files)
    findings += _check_importability(py_files)
    findings += _check_runtime_global_refs(py_files)
    findings += _check_app_factory()
    findings += _check_pyc_without_source(pyc_files)
    findings += _check_file_sizes(py_files)

    # ERRORs sort before WARNs; ties break on code then path (stable sort).
    findings.sort(key=lambda item: (item.severity != "ERROR", item.code, item.path))
    _print_findings(findings)
    has_errors = any(item.severity == "ERROR" for item in findings)
    return 1 if has_errors else 0
if __name__ == "__main__":
    sys.exit(main())