#!/usr/bin/env python3
"""Run a SQL migration file against the MySQL database described by a .env file.

Usage:
    script.py --sql path/to/migration.sql [--env backend/.env]

Exit codes:
    0  migration committed and all post-migration checks ran
    1  bad input (missing file, no statements, driver/connection problem)
    2  a statement failed with a non-tolerated error (transaction rolled back)
    3  migration committed but a post-migration check query failed
"""
import argparse
import re
import sys
from pathlib import Path
from typing import Dict, List


def parse_env_file(env_path: Path) -> Dict[str, str]:
    """Parse a dotenv-style file into a ``{key: value}`` dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are stripped of surrounding whitespace, an
    optional leading ``export `` on the key is dropped, and one pair of
    matching single or double quotes around the value is removed.

    Raises:
        OSError: if the file cannot be read.
    """
    data = {}
    # 'utf-8-sig' transparently drops a BOM, which would otherwise end up
    # glued to the first key of the file.
    for raw in env_path.read_text(encoding='utf-8-sig').splitlines():
        line = raw.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        key = key.strip()
        if key.startswith('export '):
            key = key[len('export '):].strip()
        value = value.strip()
        # DB_PASSWORD="secret" must yield  secret, not  "secret".
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        data[key] = value
    return data


def split_sql_statements(sql_text: str) -> List[str]:
    """Split a SQL script into individual statements on top-level ``;``.

    Rules applied while scanning:

    * ``;`` inside ``'...'``, ``"..."`` or backtick-quoted identifiers does
      not split. Inside ``'``/``"`` a backslash escapes the next character
      (this assumes the server does not run with NO_BACKSLASH_ESCAPES);
      doubled quotes (``''``) also work.
    * ``# ...`` and ``-- ...`` comments are removed up to the end of line.
      ``--`` only starts a comment when followed by whitespace or end of
      input, so ``5--1`` stays an arithmetic expression.
    * ``/* ... */`` comments are replaced by a single space so neighbouring
      tokens are not joined. Executable comments (``/*! ... */``) and
      optimizer hints (``/*+ ... */``) are kept verbatim because the server
      interprets them.

    ``DELIMITER`` directives are not supported. Empty statements are
    dropped; each returned statement is stripped and has no trailing ``;``.
    """
    statements = []
    buf = []
    quote = ''  # the quote character currently open, or '' when outside
    in_line_comment = False
    in_block_comment = False
    keep_block_comment = False  # True for /*! ... */ and /*+ ... */
    length = len(sql_text)
    i = 0

    while i < length:
        ch = sql_text[i]
        nxt = sql_text[i + 1] if i + 1 < length else ''

        if in_line_comment:
            if ch == '\n':
                in_line_comment = False
                buf.append(ch)  # keep the newline as a token separator
            i += 1
            continue

        if in_block_comment:
            if ch == '*' and nxt == '/':
                in_block_comment = False
                buf.append('*/' if keep_block_comment else ' ')
                i += 2
            else:
                if keep_block_comment:
                    buf.append(ch)
                i += 1
            continue

        if quote:
            # Backslash escapes apply to string literals, not to backticks.
            if ch == '\\' and quote != '`' and nxt:
                buf.append(ch)
                buf.append(nxt)
                i += 2
                continue
            if ch == quote:
                quote = ''
            buf.append(ch)
            i += 1
            continue

        # From here on we are outside any quote or comment.
        if ch == '-' and nxt == '-':
            after = sql_text[i + 2] if i + 2 < length else ''
            if after == '' or after.isspace():
                in_line_comment = True
                i += 2
                continue
        elif ch == '#':
            in_line_comment = True
            i += 1
            continue
        elif ch == '/' and nxt == '*':
            marker = sql_text[i + 2] if i + 2 < length else ''
            in_block_comment = True
            keep_block_comment = marker in ('!', '+')
            if keep_block_comment:
                buf.append('/*')
            i += 2
            continue
        elif ch in ("'", '"', '`'):
            quote = ch
        elif ch == ';':
            stmt = ''.join(buf).strip()
            if stmt:
                statements.append(stmt)
            buf = []
            i += 1
            continue

        buf.append(ch)
        i += 1

    tail = ''.join(buf).strip()
    if tail:
        statements.append(tail)
    return statements


def main() -> int:
    """Parse CLI arguments, run the migration and return the exit code."""
    parser = argparse.ArgumentParser(description='Run SQL migration from file')
    parser.add_argument('--env', default='backend/.env', help='Path to .env file')
    parser.add_argument('--sql', required=True, help='Path to SQL file')
    args = parser.parse_args()

    env_path = Path(args.env)
    if not env_path.is_file():
        print(f'[ERROR] Env file not found: {env_path}')
        return 1
    env = parse_env_file(env_path)

    sql_path = Path(args.sql)
    if not sql_path.is_file():
        print(f'[ERROR] SQL file not found: {sql_path}')
        return 1

    # 'utf-8-sig' so a BOM does not become part of the first statement.
    sql_text = sql_path.read_text(encoding='utf-8-sig')
    statements = split_sql_statements(sql_text)
    if not statements:
        print('[ERROR] No SQL statements found')
        return 1

    # Imported here rather than at module level so that --help and the pure
    # parsing helpers above work without the database driver installed.
    try:
        import pymysql
    except ImportError:
        print('[ERROR] pymysql is not installed')
        return 1

    try:
        conn = pymysql.connect(
            host=env.get('DB_HOST', '127.0.0.1'),
            # "DB_PORT=" (present but empty) falls back to the default too.
            port=int(env.get('DB_PORT') or '3306'),
            user=env.get('DB_USER', 'root'),
            password=env.get('DB_PASSWORD', ''),
            database=env.get('DB_NAME', ''),
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=False,
        )
    except ValueError as e:
        print(f'[ERROR] Invalid DB_PORT: {e}')
        return 1
    except pymysql.MySQLError as e:
        print(f'[ERROR] Could not connect to database: {e}')
        return 1

    # Duplicate column (1060) / duplicate key name (1061) / duplicate index
    # (1831) are tolerated so the migration can be re-run idempotently.
    tolerated_errnos = {1060, 1061, 1831}

    try:
        with conn.cursor() as cur:
            print(f'[INFO] Running {len(statements)} statements from {sql_path}')
            for idx, stmt in enumerate(statements, start=1):
                normalized = re.sub(r'\s+', ' ', stmt).strip()
                head = normalized[:120]  # short preview for the log
                try:
                    cur.execute(stmt)
                except pymysql.MySQLError as e:
                    code = e.args[0] if e.args else '?'
                    if code in tolerated_errnos:
                        detail = e.args[1] if len(e.args) > 1 else ''
                        print(f'[SKIP] {idx:03d}: errno={code} {detail} | {head}')
                        continue
                    # NOTE: MySQL implicitly commits around DDL statements,
                    # so this only undoes DML that is still uncommitted.
                    conn.rollback()
                    print(f'[FAIL] {idx:03d}: errno={code} {e}')
                    print(f'[STMT] {head}')
                    return 2
                else:
                    print(f'[OK] {idx:03d}: {head}')

            conn.commit()
            print('[INFO] Migration committed successfully')

            checks = [
                "SELECT COUNT(*) AS cnt FROM menus",
                "SELECT COUNT(*) AS cnt FROM role_menu_permissions",
                "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='menus' AND COLUMN_NAME IN ('menu_level','tree_path','is_visible')",
                "SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='role_menu_permissions' AND COLUMN_NAME IN ('granted_by','granted_at')",
            ]
            checks_ok = True
            for q in checks:
                # The migration is already committed at this point, so a
                # failing check is reported instead of raising a traceback.
                try:
                    cur.execute(q)
                    row = cur.fetchone()
                except pymysql.MySQLError as e:
                    print(f'[CHECK-FAIL] {q} => {e}')
                    checks_ok = False
                    continue
                print(f'[CHECK] {q} => {row}')
            if not checks_ok:
                return 3
    finally:
        conn.close()

    return 0


if __name__ == '__main__':
    sys.exit(main())