imetting/backend/scripts/run_sql_migration_connector.py

140 lines
3.6 KiB
Python
Raw Normal View History

2026-03-26 06:55:12 +00:00
#!/usr/bin/env python3
import argparse
import sys
from pathlib import Path
import mysql.connector
def parse_env_file(env_path: Path):
    """Parse a dotenv-style file into a dict of string keys to string values.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Each remaining line is split on the first ``=``; key and value
    are stripped of surrounding whitespace. A leading ``export `` on the key
    is dropped and one pair of matching single or double quotes around the
    value is removed, so ``DB_PASSWORD="secret"`` yields ``secret``.

    Inline ``# comments`` after a value are NOT stripped, because ``#`` may
    legitimately appear inside a password.

    Args:
        env_path: Path of the file to read (decoded as UTF-8).

    Returns:
        Dict mapping each key to its value; later duplicates override
        earlier ones.

    Raises:
        OSError: If the file cannot be read.
    """
    data = {}
    for raw in env_path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        value = value.strip()
        # Accept shell-style "export KEY=value" lines.
        if key.startswith("export "):
            key = key[len("export "):].strip()
        # Drop one pair of matching surrounding quotes; a lone quote
        # character (length 1) is kept as-is.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
            value = value[1:-1]
        data[key] = value
    return data
def split_sql_statements(sql_text: str):
    """Split a SQL script into individual statements on top-level semicolons.

    The scanner walks the text once and tracks whether it is inside a quoted
    region or a comment so that ``;`` is only treated as a terminator at the
    top level:

    * ``'...'`` and ``"..."`` string literals, including backslash escapes
      (``\\'``) and doubled quotes (``''``);
    * `````...````` backtick-quoted identifiers (no backslash escapes);
    * ``-- ...`` and ``# ...`` line comments (dropped, newline kept);
    * ``/* ... */`` block comments (dropped and replaced by one space so the
      tokens on either side do not fuse together).

    All block comments are dropped, including ``/*! ... */`` and
    ``/*+ ... */`` forms. ``DELIMITER`` directives are not interpreted.

    Args:
        sql_text: Full text of the SQL script.

    Returns:
        List of non-empty statements, stripped of surrounding whitespace and
        without the terminating semicolon, in source order.
    """
    statements = []
    buf = []
    quote = None  # The open quote character ("'", '"' or "`"), else None.
    in_line_comment = False
    in_block_comment = False
    i = 0
    length = len(sql_text)
    while i < length:
        ch = sql_text[i]
        nxt = sql_text[i + 1] if i + 1 < length else ""

        if in_line_comment:
            if ch == "\n":
                in_line_comment = False
                # Keep the newline so the surrounding tokens stay separated.
                buf.append(ch)
            i += 1
            continue

        if in_block_comment:
            if ch == "*" and nxt == "/":
                in_block_comment = False
                i += 2
            else:
                i += 1
            continue

        if quote is not None:
            # Inside string literals a backslash escapes the next character,
            # so an escaped quote must not close the literal. Backtick
            # identifiers have no backslash escapes.
            if ch == "\\" and quote != "`" and nxt:
                buf.append(ch)
                buf.append(nxt)
                i += 2
                continue
            if ch == quote:
                # A doubled quote ('') closes and immediately reopens the
                # literal on the next character, which keeps it intact.
                quote = None
            buf.append(ch)
            i += 1
            continue

        if ch == "-" and nxt == "-":
            in_line_comment = True
            i += 2
            continue
        if ch == "#":
            in_line_comment = True
            i += 1
            continue
        if ch == "/" and nxt == "*":
            in_block_comment = True
            # A comment acts as whitespace between tokens.
            buf.append(" ")
            i += 2
            continue
        if ch in ("'", '"', "`"):
            quote = ch
            buf.append(ch)
            i += 1
            continue
        if ch == ";":
            stmt = "".join(buf).strip()
            if stmt:
                statements.append(stmt)
            buf = []
            i += 1
            continue

        buf.append(ch)
        i += 1

    # Whatever follows the last semicolon is a final, unterminated statement.
    tail = "".join(buf).strip()
    if tail:
        statements.append(tail)
    return statements
def main():
    """Run every statement of a SQL file against the MySQL database in a .env file.

    Command-line arguments:
        --env: Path to the .env file (default ``backend/.env``). The keys
            ``DB_HOST``, ``DB_PORT``, ``DB_USER``, ``DB_PASSWORD`` and
            ``DB_NAME`` are read from it.
        --sql: Path to the SQL file to execute (required).

    Statements are executed in order on one connection and committed once at
    the end; on any failure a rollback is attempted.
    NOTE: MySQL implicitly commits DDL statements (CREATE/ALTER/DROP ...), so
    the rollback can only undo DML that has not yet been committed.

    Returns:
        0 on success; 1 when an input is unusable (missing env or SQL file,
        no statements, invalid ``DB_PORT``); 2 when connecting to the
        database or executing a statement fails.
    """
    parser = argparse.ArgumentParser(description="Run SQL migration using mysql-connector")
    parser.add_argument("--env", default="backend/.env", help="Path to .env file")
    parser.add_argument("--sql", required=True, help="Path to SQL file")
    args = parser.parse_args()

    # Report a missing env file the same way as a missing SQL file instead
    # of crashing with a traceback.
    env_path = Path(args.env)
    if not env_path.is_file():
        print(f"[ERROR] Env file not found: {env_path}")
        return 1
    env = parse_env_file(env_path)

    sql_path = Path(args.sql)
    if not sql_path.exists():
        print(f"[ERROR] SQL file not found: {sql_path}")
        return 1

    sql_text = sql_path.read_text(encoding="utf-8")
    statements = split_sql_statements(sql_text)
    if not statements:
        print("[ERROR] No SQL statements found")
        return 1

    # An empty "DB_PORT=" line falls back to the default port.
    port_text = env.get("DB_PORT") or "3306"
    try:
        port = int(port_text)
    except ValueError:
        print(f"[ERROR] Invalid DB_PORT: {port_text!r}")
        return 1

    try:
        conn = mysql.connector.connect(
            host=env.get("DB_HOST", "127.0.0.1"),
            port=port,
            user=env.get("DB_USER", "root"),
            password=env.get("DB_PASSWORD", ""),
            database=env.get("DB_NAME", ""),
        )
    except mysql.connector.Error as e:
        print(f"[FAIL] {e}")
        return 2

    # Label of the statement currently executing, used to report which one
    # failed; None while no statement is in flight.
    failing = None
    try:
        cur = conn.cursor(dictionary=True)
        print(f"[INFO] Running {len(statements)} statements from {sql_path}")
        for idx, stmt in enumerate(statements, start=1):
            # Whitespace-collapsed, truncated preview of the statement for logs.
            head = " ".join(stmt.split())[:120]
            failing = f"{idx:03d}: {head}"
            cur.execute(stmt)
            # Drain any result set so the next execute() does not hit
            # unread results.
            if cur.with_rows:
                cur.fetchall()
            print(f"[OK] {idx:03d}: {head}")
        failing = None
        conn.commit()
        print("[INFO] Migration committed successfully")
    except Exception as e:
        if failing is not None:
            print(f"[FAIL] {failing}")
        print(f"[FAIL] {e}")
        # Report the original error first; a broken connection may make the
        # rollback itself fail, which must not hide the root cause.
        try:
            conn.rollback()
        except mysql.connector.Error as rollback_err:
            print(f"[WARN] Rollback failed: {rollback_err}")
        return 2
    finally:
        conn.close()
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())