cosmo/backend/scripts/migrate_interstellar_data.py

343 lines
12 KiB
Python
Raw Normal View History

2025-12-08 10:55:38 +00:00
#!/usr/bin/env python3
"""
Migrate the 'interstellar' records stored in static_data into the
star_systems and celestial_bodies tables.
Includes automatic translation of star, system and planet names into Chinese.
"""
import asyncio
import sys
from pathlib import Path
# 添加项目根目录到 Python 路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import select, func, update
from sqlalchemy.dialects.postgresql import insert
from app.database import AsyncSessionLocal
from app.models.db.static_data import StaticData
from app.models.db.star_system import StarSystem
from app.models.db.celestial_body import CelestialBody
# Chinese names for well-known stars, keyed by the English designation
# exactly as it is looked up by translate_star_name().
STAR_NAME_ZH = {
    'Proxima Cen': '比邻星',
    "Barnard's star": '巴纳德星',
    'eps Eri': '天苑四',
    'Lalande 21185': '莱兰21185',
    '61 Cyg A': '天鹅座61 A',
    '61 Cyg B': '天鹅座61 B',
    'tau Cet': '天仓五',
    # Named after the astronomer J. C. Kapteyn; the previous value
    # '开普敦星' reads as "Cape Town star".
    'Kapteyn': '卡普坦星',
    'Lacaille 9352': '拉卡伊9352',
    'Ross 128': '罗斯128',
    # NOTE(review): "Wolf" is a surname here; '沃尔夫359' may be the more
    # usual rendering -- confirm before changing.
    'Wolf 359': '狼359',
    'Sirius': '天狼星',
    'Alpha Centauri': '南门二',
    'TRAPPIST-1': 'TRAPPIST-1',
    'Kepler-442': '开普勒-442',
    'Kepler-452': '开普勒-452',
    'Gliese 581': '格利泽581',
    'Gliese 667C': '格利泽667C',
    'HD 40307': 'HD 40307',
}

# Chinese translations of common star-system suffixes.
# NOTE(review): no function in the visible part of this file reads this
# table; translate_system_name() hard-codes ' System' instead.
SYSTEM_SUFFIX_ZH = {
    'System': '系统',
    'system': '系统',
}
def translate_star_name(english_name: str) -> str:
    """Translate a star designation into Chinese.

    Lookup order:
      1. exact match in ``STAR_NAME_ZH``;
      2. the name minus a trailing component letter (" A", " B", " C"),
         with the letter appended to the translated base name;
      3. catalogue prefixes (Kepler-, TRAPPIST-, Gliese, GJ, HD, HIP),
         where only the prefix is translated and the rest of the
         designation is kept in full;
      4. otherwise the English name is returned unchanged.

    Args:
        english_name: Star designation in English.

    Returns:
        The Chinese name, or ``english_name`` itself when no rule applies.
    """
    # 1. Exact dictionary hit.
    if english_name in STAR_NAME_ZH:
        return STAR_NAME_ZH[english_name]

    # 2. Component of a multiple-star system. Only a *trailing* marker is
    #    removed: replacing " A"/" B"/" C" anywhere in the string would turn
    #    "Alpha Centauri A" into "Alphaentauri" and miss the dictionary.
    for component in (' A', ' B', ' C'):
        if english_name.endswith(component):
            base_name = english_name[:-len(component)].strip()
            if base_name in STAR_NAME_ZH:
                return STAR_NAME_ZH[base_name] + component.strip()
            break

    # 3. Catalogue designations. Everything after the prefix is preserved so
    #    that e.g. "Gliese 667 C" keeps its component letter instead of being
    #    cut at the second separator.
    catalog_prefixes = (
        ('Kepler-', '开普勒-'),
        ('TRAPPIST-', 'TRAPPIST-'),
        ('Gliese ', '格利泽'),
        ('GJ ', 'GJ '),
        ('HD ', 'HD '),
        ('HIP ', 'HIP '),
    )
    for prefix, prefix_zh in catalog_prefixes:
        if english_name.startswith(prefix):
            return prefix_zh + english_name[len(prefix):]

    # 4. No rule applies: keep the English name.
    return english_name
def translate_system_name(english_name: str) -> str:
    """Translate a star-system name into Chinese.

    When the name contains ' System', that marker is removed, the remainder
    is translated as a star name and '系统' is appended. Any other name is
    translated as a plain star name.
    """
    marker = ' System'
    if marker not in english_name:
        return translate_star_name(english_name)

    host_name = english_name.replace(marker, '').strip()
    host_name_zh = translate_star_name(host_name)
    return f'{host_name_zh}系统'
def translate_planet_name(english_name: str) -> str:
    """Translate an exoplanet name into Chinese.

    The name is split at its last space into host-star name and planet
    letter; the star part is translated and the letter is re-attached after
    a single space. A name without any space is returned unchanged.
    """
    host_name, separator, planet_letter = english_name.rpartition(' ')
    if not separator:
        # No space at all: nothing to separate, keep the name as is.
        return english_name

    host_name_zh = translate_star_name(host_name)
    return f'{host_name_zh} {planet_letter}'
async def deduplicate_planets(planets: list) -> list:
    """Collapse planet records that share a name.

    For each name the record with the most populated fields (values that are
    neither None nor '') is kept; on a tie the earlier record wins. Records
    without a name are dropped. The result keeps the order in which each
    name first appeared.
    """
    if not planets:
        return []

    def _populated(record) -> int:
        # Number of fields carrying an actual value.
        return sum(1 for v in record.values() if v is not None and v != '')

    best_by_name = {}
    for candidate in planets:
        key = candidate.get('name', '')
        if not key:
            continue
        if key in best_by_name:
            kept_score = _populated(best_by_name[key])
            if _populated(candidate) <= kept_score:
                # Not strictly more complete: keep the record we have.
                continue
        best_by_name[key] = candidate

    return list(best_by_name.values())
def _zero_if_none(value):
    """Return ``value``, or 0 when it is None, so ``:.2f`` formatting cannot raise."""
    return 0 if value is None else value


async def _stage_star_system(session, star_name: str, data: dict) -> int:
    """Add one star system and its de-duplicated planets to ``session``.

    Nothing is committed here; the caller owns the transaction.

    Args:
        session: The async session the new rows are added to.
        star_name: Host-star name taken from the static_data record.
        data: The record's payload dict (distances, position, planets, ...).

    Returns:
        The number of planets staged for this system.
    """
    system_name = f"{star_name} System"
    system_name_zh = translate_system_name(system_name)

    # Raw values go to the database unchanged; only the formatted output
    # below substitutes 0 for a missing or null distance.
    distance_pc = _zero_if_none(data.get('distance_pc'))
    distance_ly = _zero_if_none(data.get('distance_ly'))
    # A missing or null 'position' yields None for all three coordinates.
    position = data.get('position') or {}

    planets = data.get('planets') or []
    # deduplicate_planets() also drops records without a name, so every
    # entry of unique_planets becomes exactly one CelestialBody row.
    unique_planets = await deduplicate_planets(planets)

    system = StarSystem(
        name=system_name,
        name_zh=system_name_zh,
        host_star_name=star_name,
        distance_pc=data.get('distance_pc'),
        distance_ly=data.get('distance_ly'),
        ra=data.get('ra'),
        dec=data.get('dec'),
        position_x=position.get('x'),
        position_y=position.get('y'),
        position_z=position.get('z'),
        spectral_type=data.get('spectral_type'),
        radius_solar=data.get('radius_solar'),
        mass_solar=data.get('mass_solar'),
        temperature_k=data.get('temperature_k'),
        magnitude=data.get('magnitude'),
        color=data.get('color', '#FFFFFF'),
        planet_count=len(unique_planets),
        description=f"距离地球 {distance_ly:.2f} 光年的恒星系统。",
    )
    session.add(system)
    await session.flush()  # populates system.id, which the planet ids embed

    print(f"\n✅ 恒星系: {system_name} ({system_name_zh})")
    print(f" 距离: {distance_pc:.2f} pc (~{distance_ly:.2f} ly)")
    if planets:
        print(f" 行星: {len(planets)} 条记录 → {len(unique_planets)} 颗独立行星(去重 {len(planets) - len(unique_planets)} 条)")

    extra_keys = (
        'semi_major_axis_au',
        'period_days',
        'eccentricity',
        'radius_earth',
        'mass_earth',
        'temperature_k',
    )
    for planet_data in unique_planets:
        planet_name = planet_data.get('name', '')
        planet_name_zh = translate_planet_name(planet_name)
        session.add(CelestialBody(
            # Unique id built from the system id and the planet name.
            id=f"exo-{system.id}-{planet_name.replace(' ', '-')}",
            name=planet_name,
            name_zh=planet_name_zh,
            type='planet',
            system_id=system.id,
            description=f"{system_name_zh}的系外行星。",
            extra_data={key: planet_data.get(key) for key in extra_keys},
        ))
        print(f"{planet_name} ({planet_name_zh})")

    return len(unique_planets)


async def migrate_star_systems():
    """Migrate every 'interstellar' static_data record into star systems and planets.

    Each system is staged inside its own SAVEPOINT, so a failing system is
    rolled back and skipped without discarding the systems staged before it.
    The outer transaction is committed after every 100 migrated systems and
    once more at the end. A summary is printed when done.

    Raises:
        Exception: Errors from reading the source data or from a commit are
            not swallowed; they propagate to the caller.
    """
    async with AsyncSessionLocal() as session:
        print("=" * 60)
        print("开始迁移系外恒星系数据...")
        print("=" * 60)

        # Load all interstellar records.
        result = await session.execute(
            select(StaticData)
            .where(StaticData.category == 'interstellar')
            .order_by(StaticData.name)
        )
        # Copy plain values out of the ORM rows up front. A commit or
        # rollback may expire loaded instances, and reading an expired
        # attribute afterwards would need implicit IO -- whether that applies
        # depends on the session factory's expire_on_commit setting, which
        # is not visible here.
        records = [(row.name, row.data) for row in result.scalars().all()]
        print(f"\n📊 共找到 {len(records)} 个恒星系统")

        migrated_systems = 0
        migrated_planets = 0
        skipped_systems = 0

        for star_name, data in records:
            try:
                # SAVEPOINT: an error undoes only this system's rows.
                async with session.begin_nested():
                    staged_planets = await _stage_star_system(session, star_name, data)
            except Exception as e:
                print(f"\n❌ 错误:迁移 {star_name} 失败 - {str(e)[:200]}")
                skipped_systems += 1
                continue

            # Counters advance only once the savepoint has been released, so
            # they never include rolled-back rows.
            migrated_systems += 1
            migrated_planets += staged_planets

            # Commit once per 100 systems.
            if migrated_systems % 100 == 0:
                await session.commit()
                print(f"\n💾 已提交 {migrated_systems} 个恒星系统...")

        # Final commit for the last partial batch.
        await session.commit()

        # Avoid dividing by zero when nothing was migrated.
        average = migrated_planets / migrated_systems if migrated_systems else 0.0

        print("\n" + "=" * 60)
        print("迁移完成!")
        print("=" * 60)
        print(f"✅ 成功迁移恒星系: {migrated_systems}")
        print(f"✅ 成功迁移行星: {migrated_planets}")
        print(f"⚠️ 跳过的恒星系: {skipped_systems}")
        print(f"📊 平均每个恒星系: {average:.1f} 颗行星")
async def update_solar_system_count():
    """Refresh ``planet_count`` of the solar system (star system id 1).

    Counts the celestial bodies whose ``system_id`` is 1 and stores that
    count minus one (the Sun itself) on the star system row. The stored
    value is clamped at zero so an empty system never gets a negative count.
    """
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(func.count(CelestialBody.id))
            .where(CelestialBody.system_id == 1)
        )
        count = result.scalar() or 0
        # Subtract the Sun, but never go below zero when no bodies exist.
        count_without_sun = max(count - 1, 0)

        await session.execute(
            update(StarSystem)
            .where(StarSystem.id == 1)
            .values(planet_count=count_without_sun)
        )
        await session.commit()
        print(f"\n✅ 更新太阳系天体数量: {count} (不含太阳: {count_without_sun})")
async def verify_migration():
    """Print a summary of the migrated data for manual verification.

    Reports the total number of star systems, the ten systems with the
    highest ``planet_count``, and the number of celestial bodies per type,
    split into the solar system (system id 1) and all other systems combined.
    """
    async with AsyncSessionLocal() as session:
        print("\n" + "=" * 60)
        print("验证迁移结果...")
        print("=" * 60)

        # Total number of star systems.
        result = await session.execute(select(func.count(StarSystem.id)))
        system_count = result.scalar()
        print(f"\n📊 恒星系统总数: {system_count}")

        # Systems with the most planets.
        result = await session.execute(
            select(StarSystem.name, StarSystem.name_zh, StarSystem.planet_count)
            .order_by(StarSystem.planet_count.desc())
            .limit(10)
        )
        print("\n🏆 行星最多的恒星系前10:")
        for name, name_zh, count in result:
            print(f" {name} ({name_zh}): {count} 颗行星")

        # Body counts per (type, system).
        result = await session.execute(
            select(CelestialBody.type, CelestialBody.system_id, func.count(CelestialBody.id))
            .group_by(CelestialBody.type, CelestialBody.system_id)
            .order_by(CelestialBody.system_id, CelestialBody.type)
        )
        # Every system other than id 1 shares one label, so fold the rows
        # into one total per (label, type) instead of printing an identical
        # looking line for each individual system.
        totals = {}
        for type_, system_id, count in result:
            label = "太阳系" if system_id == 1 else "系外恒星系"
            totals[(label, type_)] = totals.get((label, type_), 0) + count

        print("\n📈 天体类型分布:")
        for (label, type_), total in totals.items():
            print(f" {label} - {type_}: {total}")
async def main():
    """Entry point: run the migration, refresh the solar-system count, verify.

    Any exception is reported with its traceback and the process exits with
    status 1.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Cosmo 系外恒星系数据迁移工具")
    print(rule)

    try:
        # Migrate, then update the solar-system statistics, then verify.
        await migrate_star_systems()
        await update_solar_system_count()
        await verify_migration()
    except Exception as e:
        print(f"\n❌ 迁移失败: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    else:
        print("\n✅ 所有操作完成!")


if __name__ == "__main__":
    asyncio.run(main())