# dashboard-nanobot/backend/core/database.py
# (file-viewer metadata: 264 lines, 8.8 KiB, Python)

from sqlalchemy import inspect, text
from sqlmodel import SQLModel, Session, create_engine
from core.settings import (
DATABASE_ECHO,
DATABASE_MAX_OVERFLOW,
DATABASE_POOL_RECYCLE,
DATABASE_POOL_SIZE,
DATABASE_POOL_TIMEOUT,
DATABASE_URL,
)
# Ensure table models are registered in SQLModel metadata before create_all.
from models import bot as _bot_models # noqa: F401
from models import platform as _platform_models # noqa: F401
from models import skill as _skill_models # noqa: F401
from models import topic as _topic_models # noqa: F401
# Options forwarded verbatim to create_engine(); every value except
# pool_pre_ping comes from core.settings.
_engine_kwargs = dict(
    echo=DATABASE_ECHO,
    pool_pre_ping=True,
    pool_size=DATABASE_POOL_SIZE,
    max_overflow=DATABASE_MAX_OVERFLOW,
    pool_timeout=DATABASE_POOL_TIMEOUT,
    pool_recycle=DATABASE_POOL_RECYCLE,
)

# Module-wide engine used by the schema helpers and by get_session().
engine = create_engine(DATABASE_URL, **_engine_kwargs)
# Database table names used when building raw SQL in this module.
# NOTE(review): BOT_IMAGE_TABLE is not referenced in the visible part of the
# file; presumably it is used elsewhere — confirm before removing.
BOT_INSTANCE_TABLE = "bot_instance"
BOT_MESSAGE_TABLE = "bot_message"
BOT_IMAGE_TABLE = "bot_image"
BOT_REQUEST_USAGE_TABLE = "bot_request_usage"
BOT_ACTIVITY_EVENT_TABLE = "bot_activity_event"
SYS_SETTING_TABLE = "sys_setting"
# Key passed to pg_advisory_lock()/pg_advisory_unlock() by the migration-lock
# helpers. NOTE(review): the value looks date-derived and is presumably
# arbitrary; it only has to be the same in every process — confirm.
POSTGRES_MIGRATION_LOCK_KEY = 2026031801
def _quote_ident(name: str) -> str:
return f'"{str(name).replace(chr(34), chr(34) * 2)}"'
def _acquire_migration_lock():
    """Take the migration advisory lock when running on PostgreSQL.

    Returns:
        The open connection on which ``pg_advisory_lock`` was executed when
        the engine dialect is ``postgresql``; ``None`` for any other dialect.
        The caller hands the returned value to ``_release_migration_lock``.

    Raises:
        Whatever ``engine.connect()`` or the lock statement raises. If the
        lock statement fails, the connection is closed before re-raising.
    """
    if engine.dialect.name != "postgresql":
        return None

    conn = engine.connect()
    try:
        conn.execute(
            text("SELECT pg_advisory_lock(:key)"),
            {"key": POSTGRES_MIGRATION_LOCK_KEY},
        )
    except BaseException:
        # Do not leak the pooled connection when the lock statement fails or
        # the wait is interrupted (e.g. KeyboardInterrupt); nobody else holds
        # a reference that could close it.
        conn.close()
        raise
    return conn
def _release_migration_lock(lock_conn) -> None:
if lock_conn is None:
return
try:
if engine.dialect.name == "postgresql":
lock_conn.execute(text("SELECT pg_advisory_unlock(:key)"), {"key": POSTGRES_MIGRATION_LOCK_KEY})
finally:
lock_conn.close()
def _ensure_botinstance_columns() -> None:
    """Add any missing columns to the bot instance table.

    Does nothing when the table does not exist. When the ``enabled`` column
    was already present before this call, rows where it is NULL are set to
    TRUE. All statements are committed together at the end.
    """
    if not inspect(engine).has_table(BOT_INSTANCE_TABLE):
        return

    # Column name -> DDL fragment placed after "ADD COLUMN <name>".
    column_ddl = {
        "current_state": "TEXT DEFAULT 'IDLE'",
        "last_action": "TEXT",
        "image_tag": "TEXT DEFAULT 'nanobot-base:v0.1.4'",
        "access_password": "TEXT DEFAULT ''",
        "enabled": "BOOLEAN NOT NULL DEFAULT TRUE",
    }

    with engine.connect() as conn:
        present = set()
        for column_info in inspect(conn).get_columns(BOT_INSTANCE_TABLE):
            if column_info.get("name"):
                present.add(str(column_info.get("name")))

        missing = [name for name in column_ddl if name not in present]
        for name in missing:
            conn.execute(
                text(f"ALTER TABLE {BOT_INSTANCE_TABLE} ADD COLUMN {name} {column_ddl[name]}")
            )

        # Only a pre-existing column can hold NULLs; a freshly added one is
        # created with NOT NULL DEFAULT TRUE.
        if "enabled" in present:
            conn.execute(
                text(f"UPDATE {BOT_INSTANCE_TABLE} SET enabled = TRUE WHERE enabled IS NULL")
            )
        conn.commit()
def _ensure_sys_setting_columns() -> None:
    """Add any missing columns to the system settings table.

    Does nothing when the table does not exist. Added columns are committed
    together at the end.
    """
    if not inspect(engine).has_table(SYS_SETTING_TABLE):
        return

    # (column name, DDL fragment) pairs, applied in this order.
    wanted = (
        ("name", "TEXT NOT NULL DEFAULT ''"),
        ("category", "TEXT NOT NULL DEFAULT 'general'"),
        ("description", "TEXT NOT NULL DEFAULT ''"),
        ("value_type", "TEXT NOT NULL DEFAULT 'json'"),
        ("is_public", "BOOLEAN NOT NULL DEFAULT FALSE"),
        ("sort_order", "INTEGER NOT NULL DEFAULT 100"),
    )

    with engine.connect() as conn:
        reflected = inspect(conn).get_columns(SYS_SETTING_TABLE)
        already_there = {
            str(entry.get("name")) for entry in reflected if entry.get("name")
        }
        for column, definition in wanted:
            if column not in already_there:
                conn.execute(
                    text(f"ALTER TABLE {SYS_SETTING_TABLE} ADD COLUMN {column} {definition}")
                )
        conn.commit()
def _ensure_bot_request_usage_columns() -> None:
    """Add any missing columns to the bot request usage table.

    Does nothing when the table does not exist. Added columns are committed
    together at the end.
    """
    if not inspect(engine).has_table(BOT_REQUEST_USAGE_TABLE):
        return

    # Column name -> SQL type used in the ADD COLUMN statement.
    column_types = {
        "message_id": "INTEGER",
        "provider": "TEXT",
        "model": "TEXT",
    }

    with engine.connect() as conn:
        known = set()
        for meta in inspect(conn).get_columns(BOT_REQUEST_USAGE_TABLE):
            if meta.get("name"):
                known.add(str(meta.get("name")))

        for column in column_types:
            if column in known:
                continue
            sql_type = column_types[column]
            conn.execute(
                text(f"ALTER TABLE {BOT_REQUEST_USAGE_TABLE} ADD COLUMN {column} {sql_type}")
            )
        conn.commit()
def _ensure_topic_columns() -> None:
    """Add any missing columns to the ``topic_topic`` and ``topic_item`` tables.

    A table that does not exist is skipped. All added columns are committed
    together once both tables have been processed.
    """
    # Table name -> {column name -> DDL fragment for ADD COLUMN}.
    schema_by_table = {
        "topic_topic": {
            "name": "TEXT NOT NULL DEFAULT ''",
            "description": "TEXT NOT NULL DEFAULT ''",
            "is_active": "BOOLEAN NOT NULL DEFAULT TRUE",
            "is_default_fallback": "BOOLEAN NOT NULL DEFAULT FALSE",
            "routing_json": "TEXT NOT NULL DEFAULT '{}'",
            "view_schema_json": "TEXT NOT NULL DEFAULT '{}'",
            "created_at": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
            "updated_at": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
        },
        "topic_item": {
            "title": "TEXT NOT NULL DEFAULT ''",
            "level": "TEXT NOT NULL DEFAULT 'info'",
            "tags_json": "TEXT",
            "view_json": "TEXT",
            "source": "TEXT NOT NULL DEFAULT 'mcp'",
            "dedupe_key": "TEXT",
            "is_read": "BOOLEAN NOT NULL DEFAULT FALSE",
            "created_at": "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP",
        },
    }

    # Reflection goes through the engine-level inspector; DDL runs on `conn`.
    inspector = inspect(engine)
    with engine.connect() as conn:
        for table_name in schema_by_table:
            if not inspector.has_table(table_name):
                continue

            present = set()
            for column_info in inspector.get_columns(table_name):
                if column_info.get("name"):
                    present.add(str(column_info.get("name")))

            for column, definition in schema_by_table[table_name].items():
                if column not in present:
                    conn.execute(
                        text(f"ALTER TABLE {table_name} ADD COLUMN {column} {definition}")
                    )
        conn.commit()
def _ensure_platform_indexes() -> None:
    """Create the supporting indexes on the activity-event and usage tables.

    For the activity-event table a partial index on ``bot_id`` (rows with a
    non-empty ``request_id``) is attempted first; if that statement fails, a
    plain index on ``bot_id`` is created instead. For the request-usage table
    an index on ``(started_at, bot_id)`` is created. A table that does not
    exist is skipped. All statements use ``IF NOT EXISTS`` and are committed
    together at the end.

    Raises:
        Whatever the fallback or the usage-table statement raises; only the
        failure of the partial-index statement is tolerated.
    """
    inspector = inspect(engine)
    with engine.connect() as conn:
        if inspector.has_table(BOT_ACTIVITY_EVENT_TABLE):
            try:
                conn.execute(
                    text(
                        f"""
                        CREATE INDEX IF NOT EXISTS idx_bot_activity_event_bot_id_request_present
                        ON {BOT_ACTIVITY_EVENT_TABLE} (bot_id)
                        WHERE request_id IS NOT NULL AND request_id <> ''
                        """
                    )
                )
            except Exception:
                # Fall back silently when the current database dialect does
                # not support partial indexes.
                # Some backends (e.g. PostgreSQL) abort the whole transaction
                # once a statement fails, so discard it first; otherwise the
                # fallback would fail too and hide the original error.
                # Nothing has been executed on this connection before the
                # failed statement, so the rollback loses no work.
                conn.rollback()
                conn.execute(
                    text(
                        f"""
                        CREATE INDEX IF NOT EXISTS idx_bot_activity_event_bot_id
                        ON {BOT_ACTIVITY_EVENT_TABLE} (bot_id)
                        """
                    )
                )
        if inspector.has_table(BOT_REQUEST_USAGE_TABLE):
            conn.execute(
                text(
                    f"""
                    CREATE INDEX IF NOT EXISTS idx_bot_request_usage_started_at_bot_id
                    ON {BOT_REQUEST_USAGE_TABLE} (started_at, bot_id)
                    """
                )
            )
        conn.commit()
def align_postgres_sequences() -> None:
    """Reset the serial sequences of selected tables to match their data.

    Only runs when the engine dialect is ``postgresql``. For each listed
    (table, column) pair the owning sequence is looked up with
    ``pg_get_serial_sequence``; pairs without a sequence are skipped. The
    sequence is then set to the current maximum of the column, or to 1 with
    ``is_called`` false when the column has no positive maximum. All changes
    are committed together at the end.
    """
    if engine.dialect.name != "postgresql":
        return

    serial_columns = (
        (BOT_MESSAGE_TABLE, "id"),
        (BOT_REQUEST_USAGE_TABLE, "id"),
        (BOT_ACTIVITY_EVENT_TABLE, "id"),
        ("skill_market_item", "id"),
        ("bot_skill_install", "id"),
    )
    lookup_stmt = text("SELECT pg_get_serial_sequence(:table_name, :column_name)")
    setval_stmt = text("SELECT setval(:seq_name, :next_value, :is_called)")

    with engine.connect() as conn:
        for table_name, column_name in serial_columns:
            sequence = conn.execute(
                lookup_stmt,
                {"table_name": table_name, "column_name": column_name},
            ).scalar()
            if not sequence:
                continue

            highest = conn.execute(
                text(f'SELECT COALESCE(MAX("{column_name}"), 0) FROM "{table_name}"')
            ).scalar()
            highest = int(highest or 0)
            has_rows = highest > 0

            # With is_called false, setval(seq, 1, false) makes the next
            # generated value 1 instead of 2.
            conn.execute(
                setval_stmt,
                {
                    "seq_name": sequence,
                    "next_value": highest if has_rows else 1,
                    "is_called": has_rows,
                },
            )
        conn.commit()
def init_database() -> None:
    """Create all tables and run the in-place schema fix-ups.

    The whole sequence runs between ``_acquire_migration_lock`` and
    ``_release_migration_lock``; the lock is released even when a step raises.
    """
    lock_conn = _acquire_migration_lock()
    try:
        SQLModel.metadata.create_all(engine)
        # Applied strictly in this order after the tables exist.
        for step in (
            _ensure_sys_setting_columns,
            _ensure_bot_request_usage_columns,
            _ensure_botinstance_columns,
            _ensure_topic_columns,
            _ensure_platform_indexes,
            align_postgres_sequences,
        ):
            step()
    finally:
        _release_migration_lock(lock_conn)
def get_session():
    """Yield a ``Session`` bound to the module engine and close it afterwards.

    Written as a generator so the session is closed once the consumer is done
    with it, including when an exception is thrown into the generator.
    """
    session = Session(engine)
    try:
        yield session
    finally:
        session.close()