"""
Predefined Scheduled Tasks
All registered tasks for scheduled execution
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import insert

from app.jobs.registry import task_registry
from app.models.db.celestial_body import CelestialBody
from app.models.db.position import Position
from app.models.db.celestial_event import CelestialEvent
from app.services.horizons import HorizonsService
from app.services.nasa_sbdb_service import nasa_sbdb_service
from app.services.event_service import event_service
from app.services.planetary_events_service import planetary_events_service

logger = logging.getLogger(__name__)
@task_registry.register(
    name="sync_solar_system_positions",
    description="同步太阳系天体位置数据从NASA Horizons API获取指定天体的位置数据并保存到数据库",
    category="data_sync",
    parameters=[
        {
            "name": "body_ids",
            "type": "array",
            "description": "要同步的天体ID列表例如['10', '199', '299']。如果不指定,则同步所有活跃的太阳系天体",
            "required": False,
            "default": None
        },
        {
            "name": "days",
            "type": "integer",
            "description": "同步天数,从今天开始向未来延伸的天数",
            "required": False,
            "default": 7
        },
        {
            "name": "source",
            "type": "string",
            "description": "数据源标记,用于标识数据来源",
            "required": False,
            "default": "nasa_horizons_cron"
        }
    ]
)
async def sync_solar_system_positions(
    db: AsyncSession,
    logger: logging.Logger,
    params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Sync solar system body positions from NASA Horizons.

    Each body is processed under its own SAVEPOINT so that a failure for one
    body rolls back only that body's rows and the loop continues.

    Args:
        db: Database session
        logger: Logger instance
        params: Task parameters
            - body_ids: List of body IDs to sync (optional, defaults to all active)
            - days: Number of days to sync (default: 7)
            - source: Source tag for the data (default: "nasa_horizons_cron")

    Returns:
        Summary of sync operation
    """
    # Parse parameters with type conversion (params come from JSON, may be strings)
    body_ids = params.get("body_ids")
    days = int(params.get("days", 7))
    source = str(params.get("source", "nasa_horizons_cron"))

    logger.info(f"Starting solar system position sync: days={days}, source={source}")

    # Resolve the list of bodies to sync
    if body_ids:
        # Use the explicitly requested body IDs (still filtered to active ones)
        result = await db.execute(
            select(CelestialBody).where(
                CelestialBody.id.in_(body_ids),
                CelestialBody.is_active == True  # noqa: E712 — SQLAlchemy column comparison
            )
        )
        bodies = result.scalars().all()
        logger.info(f"Syncing {len(bodies)} specified bodies")
    else:
        # All active solar-system bodies: planets, dwarf planets, satellites,
        # comets, probes, asteroids and the Sun ('star')
        result = await db.execute(
            select(CelestialBody).where(
                CelestialBody.is_active == True,  # noqa: E712
                CelestialBody.system_id == 1,
                CelestialBody.type.in_([
                    'planet', 'dwarf_planet', 'satellite',
                    'comet', 'probe', 'asteroid', 'star'
                ])
            )
        )
        bodies = result.scalars().all()
        logger.info(f"Syncing all {len(bodies)} active solar system bodies")

    if not bodies:
        logger.warning("No bodies found to sync")
        return {
            "success": True,
            "bodies_synced": 0,
            "total_positions": 0,
            "message": "No bodies found"
        }

    # Initialize services
    horizons = HorizonsService()

    total_positions = 0
    synced_bodies = []
    failed_bodies = []
    start_time = datetime.utcnow()
    end_time = start_time + timedelta(days=days)

    for body in bodies:
        # BUGFIX: the try/except must wrap the savepoint context manager.
        # Previously the except was *inside* `async with db.begin_nested()`,
        # so a caught exception never reached the context manager: the
        # SAVEPOINT was not rolled back by it, and exiting the deactivated
        # nested transaction raised, aborting the whole loop instead of
        # continuing with the next body.
        try:
            async with db.begin_nested():  # SAVEPOINT scoped to this body
                logger.debug(f"Fetching positions for {body.name} ({body.id})")
                # Fetch positions from NASA Horizons
                positions = await horizons.get_body_positions(
                    body_id=body.id,
                    start_time=start_time,
                    end_time=end_time,
                    step="1d"  # Daily positions
                )

                # Upsert positions (PostgreSQL INSERT ... ON CONFLICT)
                count = 0
                for pos in positions:
                    stmt = insert(Position).values(
                        body_id=body.id,
                        time=pos.time,
                        x=pos.x,
                        y=pos.y,
                        z=pos.z,
                        vx=getattr(pos, 'vx', None),
                        vy=getattr(pos, 'vy', None),
                        vz=getattr(pos, 'vz', None),
                        source=source
                    )
                    # On conflict (body_id, time), overwrite the existing record
                    stmt = stmt.on_conflict_do_update(
                        index_elements=['body_id', 'time'],
                        set_={
                            'x': pos.x,
                            'y': pos.y,
                            'z': pos.z,
                            'vx': getattr(pos, 'vx', None),
                            'vy': getattr(pos, 'vy', None),
                            'vz': getattr(pos, 'vz', None),
                            'source': source
                        }
                    )
                    await db.execute(stmt)
                    count += 1

            # Savepoint released successfully — record this body's results
            total_positions += count
            synced_bodies.append(body.name)
            logger.debug(f"Saved {count} positions for {body.name}")
        except Exception as e:
            # Savepoint has been rolled back; log and continue to next body
            logger.error(f"Failed to sync {body.name}: {str(e)}")
            failed_bodies.append({"body": body.name, "error": str(e)})

    # Persist all released savepoints (the other tasks in this module commit
    # explicitly too; without this the outer transaction may never commit)
    await db.commit()

    # Summary
    result = {
        "success": len(failed_bodies) == 0,
        "bodies_synced": len(synced_bodies),
        "total_positions": total_positions,
        "synced_bodies": synced_bodies,
        "failed_bodies": failed_bodies,
        "time_range": f"{start_time.date()} to {end_time.date()}",
        "source": source
    }
    logger.info(f"Sync completed: {len(synced_bodies)} bodies, {total_positions} positions")
    return result
@task_registry.register(
    name="fetch_close_approach_events",
    description="从NASA SBDB获取小行星/彗星近距离飞掠事件,并保存到数据库",
    category="data_sync",
    parameters=[
        {
            "name": "body_ids",
            "type": "array",
            "description": "要查询的天体ID列表例如['399', '499']表示地球和火星。如果不指定,默认只查询地球(399)",
            "required": False,
            "default": None
        },
        {
            "name": "days_ahead",
            "type": "integer",
            "description": "向未来查询的天数例如30表示查询未来30天内的事件",
            "required": False,
            "default": 30
        },
        {
            "name": "dist_max",
            "type": "string",
            "description": "最大距离AU例如'30'表示30天文单位内的飞掠",
            "required": False,
            "default": "30"
        },
        {
            "name": "limit",
            "type": "integer",
            "description": "每个天体最大返回事件数量",
            "required": False,
            "default": 100
        },
        {
            "name": "clean_old_events",
            "type": "boolean",
            "description": "是否清理已过期的旧事件",
            "required": False,
            "default": True
        }
    ]
)
async def fetch_close_approach_events(
    db: AsyncSession,
    logger: logging.Logger,
    params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Fetch close approach events from NASA SBDB and save to database.

    This task queries the NASA Small-Body Database (SBDB) for upcoming
    close approach events (asteroid/comet flybys) and stores them in
    the celestial_events table.

    Note: Uses tomorrow's date as the query start date to avoid fetching
    events that have already occurred today.

    Args:
        db: Database session
        logger: Logger instance
        params: Task parameters
            - body_ids: List of body IDs to query (default: ['399'] for Earth)
            - days_ahead: Number of days to query ahead from tomorrow (default: 30)
            - dist_max: Maximum approach distance in AU (default: '30')
            - limit: Maximum number of events per body (default: 100)
            - clean_old_events: Clean old events before inserting (default: True)

    Returns:
        Summary of fetch operation
    """
    def _coerce_bool(value: Any) -> bool:
        # BUGFIX: bool("false") is True in Python. Params are transported as
        # JSON and may arrive as strings, so parse common spellings explicitly.
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() not in ("false", "0", "no", "off", "")

    # Parse parameters with type conversion (params come from JSON, may be strings)
    body_ids = params.get("body_ids") or ["399"]  # Default to Earth
    days_ahead = int(params.get("days_ahead", 30))
    dist_max = str(params.get("dist_max", "30"))  # Keep as string for API
    limit = int(params.get("limit", 100))
    clean_old_events = _coerce_bool(params.get("clean_old_events", True))

    logger.info(f"Fetching close approach events: body_ids={body_ids}, days={days_ahead}, dist_max={dist_max}AU")

    # Calculate date range - use tomorrow as start date to avoid past events
    tomorrow = datetime.utcnow() + timedelta(days=1)
    date_min = tomorrow.strftime("%Y-%m-%d")
    date_max = (tomorrow + timedelta(days=days_ahead)).strftime("%Y-%m-%d")

    # Statistics
    total_events_fetched = 0
    total_events_saved = 0
    total_events_failed = 0
    body_results = []

    # Process each body
    for body_id in body_ids:
        try:
            # Query celestial_bodies table to find the target body
            body_result = await db.execute(
                select(CelestialBody).where(CelestialBody.id == body_id)
            )
            target_body = body_result.scalar_one_or_none()
            if not target_body:
                logger.warning(f"Body '{body_id}' not found in celestial_bodies table, skipping")
                body_results.append({
                    "body_id": body_id,
                    "success": False,
                    "error": "Body not found in database"
                })
                continue

            target_body_id = target_body.id
            approach_body_name = target_body.name
            # Use short_name from database if available (for NASA SBDB API)
            # NASA SBDB API uses abbreviated names for planets (e.g., Juptr for Jupiter)
            api_body_name = target_body.short_name if target_body.short_name else approach_body_name
            logger.info(f"Processing events for: {target_body.name} (ID: {target_body_id}, API name: {api_body_name})")

            # Clean old events if requested (best-effort: failure only warns)
            if clean_old_events:
                try:
                    cutoff_date = datetime.utcnow()
                    deleted_count = await event_service.delete_events_for_body_before(
                        body_id=target_body_id,
                        before_time=cutoff_date,
                        db=db
                    )
                    logger.info(f"Cleaned {deleted_count} old events for {target_body.name}")
                except Exception as e:
                    logger.warning(f"Failed to clean old events for {target_body.name}: {e}")

            # Fetch events from NASA SBDB
            sbdb_events = await nasa_sbdb_service.get_close_approaches(
                date_min=date_min,
                date_max=date_max,
                dist_max=dist_max,
                body=api_body_name,  # Use mapped API name
                limit=limit,
                fullname=True
            )
            logger.info(f"Retrieved {len(sbdb_events)} events from NASA SBDB for {target_body.name}")
            total_events_fetched += len(sbdb_events)

            if not sbdb_events:
                body_results.append({
                    "body_id": target_body_id,
                    "body_name": target_body.name,
                    "events_saved": 0,
                    "message": "No events found"
                })
                continue

            # Parse and save events
            saved_count = 0
            failed_count = 0
            for sbdb_event in sbdb_events:
                try:
                    # Parse SBDB event to CelestialEvent format
                    parsed_event = nasa_sbdb_service.parse_event_to_celestial_event(
                        sbdb_event,
                        approach_body=approach_body_name
                    )
                    if not parsed_event:
                        logger.warning(f"Failed to parse SBDB event: {sbdb_event.get('des', 'Unknown')}")
                        failed_count += 1
                        continue

                    event = CelestialEvent(
                        body_id=target_body_id,
                        title=parsed_event["title"],
                        event_type=parsed_event["event_type"],
                        event_time=parsed_event["event_time"],
                        description=parsed_event["description"],
                        details=parsed_event["details"],
                        source=parsed_event["source"]
                    )
                    # BUGFIX: flush inside a SAVEPOINT so one bad row cannot
                    # leave the session in a pending-rollback state and fail
                    # every remaining insert in this batch.
                    async with db.begin_nested():
                        db.add(event)
                        await db.flush()
                    saved_count += 1
                    logger.debug(f"Saved event: {event.title}")
                except Exception as e:
                    logger.error(f"Failed to save event {sbdb_event.get('des', 'Unknown')}: {e}")
                    failed_count += 1

            # Commit events for this body
            await db.commit()
            total_events_saved += saved_count
            total_events_failed += failed_count
            body_results.append({
                "body_id": target_body_id,
                "body_name": target_body.name,
                "events_fetched": len(sbdb_events),
                "events_saved": saved_count,
                "events_failed": failed_count
            })
            logger.info(f"Saved {saved_count}/{len(sbdb_events)} events for {target_body.name}")
        except Exception as e:
            # BUGFIX: roll back so the session is usable for the next body;
            # without this a DB error here poisoned all subsequent iterations.
            await db.rollback()
            logger.error(f"Error processing body {body_id}: {e}")
            body_results.append({
                "body_id": body_id,
                "success": False,
                "error": str(e)
            })

    # Summary
    result = {
        "success": True,
        "total_bodies_processed": len(body_ids),
        "total_events_fetched": total_events_fetched,
        "total_events_saved": total_events_saved,
        "total_events_failed": total_events_failed,
        "date_range": f"{date_min} to {date_max}",
        "dist_max_au": dist_max,
        "body_results": body_results
    }
    logger.info(f"Task completed: {total_events_saved} events saved for {len(body_ids)} bodies")
    return result
@task_registry.register(
    name="calculate_planetary_events",
    description="计算太阳系主要天体的合、冲等事件使用Skyfield进行天文计算",
    category="data_sync",
    parameters=[
        {
            "name": "body_ids",
            "type": "array",
            "description": "要计算事件的天体ID列表例如['199', '299', '499']。如果不指定,则计算所有主要行星(水星到海王星)",
            "required": False,
            "default": None
        },
        {
            "name": "days_ahead",
            "type": "integer",
            "description": "向未来计算的天数",
            "required": False,
            "default": 365
        },
        {
            "name": "calculate_close_approaches",
            "type": "boolean",
            "description": "是否同时计算行星之间的近距离接近事件",
            "required": False,
            "default": False
        },
        {
            "name": "threshold_degrees",
            "type": "number",
            "description": "近距离接近的角度阈值仅当calculate_close_approaches为true时有效",
            "required": False,
            "default": 5.0
        },
        {
            "name": "clean_old_events",
            "type": "boolean",
            "description": "是否清理已过期的旧事件",
            "required": False,
            "default": True
        }
    ]
)
async def calculate_planetary_events(
    db: AsyncSession,
    logger: logging.Logger,
    params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Calculate planetary events (conjunctions, oppositions) using Skyfield.

    This task uses the Skyfield library to calculate astronomical events
    for major solar system bodies, including conjunctions and oppositions.

    Args:
        db: Database session
        logger: Logger instance
        params: Task parameters
            - body_ids: List of body IDs to calculate (default: all major planets)
            - days_ahead: Number of days to calculate ahead (default: 365)
            - calculate_close_approaches: Also calculate planet-planet close approaches (default: False)
            - threshold_degrees: Angle threshold for close approaches (default: 5.0)
            - clean_old_events: Clean old events before calculating (default: True)

    Returns:
        Summary of calculation operation
    """
    def _coerce_bool(value: Any) -> bool:
        # BUGFIX: bool("false") is True in Python. Params are transported as
        # JSON and may arrive as strings, so parse common spellings explicitly.
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() not in ("false", "0", "no", "off", "")

    # Parse parameters with type conversion (params come from JSON, may be strings)
    body_ids = params.get("body_ids")
    days_ahead = int(params.get("days_ahead", 365))
    calculate_close_approaches = _coerce_bool(params.get("calculate_close_approaches", False))
    threshold_degrees = float(params.get("threshold_degrees", 5.0))
    clean_old_events = _coerce_bool(params.get("clean_old_events", True))

    logger.info(f"Starting planetary event calculation: days_ahead={days_ahead}, close_approaches={calculate_close_approaches}")

    # Statistics
    total_events_calculated = 0
    total_events_saved = 0
    total_events_failed = 0
    # BUGFIX: track which bodies have already been cleaned instead of
    # flipping clean_old_events to False after the first body — the old code
    # only ever cleaned the first body it encountered, despite the
    # "only once per body" intent.
    cleaned_body_ids: set = set()

    try:
        # Calculate oppositions and conjunctions
        logger.info("Calculating oppositions and conjunctions...")
        events = planetary_events_service.calculate_oppositions_conjunctions(
            body_ids=body_ids,
            days_ahead=days_ahead
        )
        logger.info(f"Calculated {len(events)} opposition/conjunction events")
        total_events_calculated += len(events)

        # Optionally calculate close approaches between planet pairs
        if calculate_close_approaches:
            logger.info("Calculating planetary close approaches...")
            # Define interesting planet pairs
            planet_pairs = [
                ('199', '299'),  # Mercury - Venus
                ('299', '499'),  # Venus - Mars
                ('499', '599'),  # Mars - Jupiter
                ('599', '699'),  # Jupiter - Saturn
            ]
            close_approach_events = planetary_events_service.calculate_planetary_distances(
                body_pairs=planet_pairs,
                days_ahead=days_ahead,
                threshold_degrees=threshold_degrees
            )
            logger.info(f"Calculated {len(close_approach_events)} close approach events")
            events.extend(close_approach_events)
            total_events_calculated += len(close_approach_events)

        # Save events to database
        logger.info(f"Saving {len(events)} events to database...")
        for event_data in events:
            try:
                # Check if body exists in database
                body_result = await db.execute(
                    select(CelestialBody).where(CelestialBody.id == event_data['body_id'])
                )
                body = body_result.scalar_one_or_none()
                if not body:
                    logger.warning(f"Body {event_data['body_id']} not found in database, skipping event")
                    total_events_failed += 1
                    continue

                # Clean old events for this body if requested (once per body)
                if clean_old_events and event_data['body_id'] not in cleaned_body_ids:
                    cutoff_date = datetime.utcnow()
                    deleted_count = await event_service.delete_events_for_body_before(
                        body_id=event_data['body_id'],
                        before_time=cutoff_date,
                        db=db
                    )
                    if deleted_count > 0:
                        logger.debug(f"Cleaned {deleted_count} old events for {body.name}")
                    cleaned_body_ids.add(event_data['body_id'])

                # Check if event already exists (to avoid duplicates)
                # Truncate event_time to minute precision for comparison
                event_time_minute = event_data['event_time'].replace(second=0, microsecond=0)
                existing_event = await db.execute(
                    select(CelestialEvent).where(
                        CelestialEvent.body_id == event_data['body_id'],
                        CelestialEvent.event_type == event_data['event_type'],
                        func.date_trunc('minute', CelestialEvent.event_time) == event_time_minute
                    )
                )
                existing = existing_event.scalar_one_or_none()
                if existing:
                    logger.debug(f"Event already exists, skipping: {event_data['title']}")
                    continue

                # Create and save event
                event = CelestialEvent(
                    body_id=event_data['body_id'],
                    title=event_data['title'],
                    event_type=event_data['event_type'],
                    event_time=event_data['event_time'],
                    description=event_data['description'],
                    details=event_data['details'],
                    source=event_data['source']
                )
                db.add(event)
                await db.flush()
                total_events_saved += 1
                logger.debug(f"Saved event: {event.title}")
            except Exception as e:
                logger.error(f"Failed to save event {event_data.get('title', 'Unknown')}: {e}")
                total_events_failed += 1

        # Commit all events
        await db.commit()

        result = {
            "success": True,
            "total_events_calculated": total_events_calculated,
            "total_events_saved": total_events_saved,
            "total_events_failed": total_events_failed,
            "calculation_period_days": days_ahead,
            "close_approaches_enabled": calculate_close_approaches,
        }
        logger.info(f"Task completed: {total_events_saved} events saved, {total_events_failed} failed")
        return result
    except Exception as e:
        logger.error(f"Error in planetary event calculation: {e}")
        await db.rollback()
        return {
            "success": False,
            "error": str(e),
            "total_events_calculated": total_events_calculated,
            "total_events_saved": total_events_saved,
            "total_events_failed": total_events_failed
        }