"""
Worker functions for background tasks
"""
import logging
import asyncio
import httpx
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from app.database import AsyncSessionLocal
from app.services.task_service import task_service
from app.services.db_service import celestial_body_service, position_service
from app.services.horizons import horizons_service
from app.services.orbit_service import orbit_service
from app.services.event_service import event_service
from app.models.schemas.social import CelestialEventCreate
logger = logging.getLogger(__name__)
async def download_positions_task(task_id: int, body_ids: List[str], dates: List[str]):
    """
    Background task worker that downloads NASA Horizons positions for each
    (body, date) pair and persists them via position_service.

    Args:
        task_id: ID of the task record to update with progress/results.
        body_ids: Celestial body IDs to fetch positions for.
        dates: Target dates in "YYYY-MM-DD" format.
    """
    logger.info(f"Task {task_id}: Starting download for {len(body_ids)} bodies and {len(dates)} dates")
    async with AsyncSessionLocal() as db:
        try:
            # Mark as running
            await task_service.update_task(db, task_id, progress=0, status="running")

            total_operations = len(body_ids) * len(dates)
            # Guard against empty input: complete immediately instead of
            # leaving the task stuck at "running" (and avoid any chance of a
            # division by zero in the progress calculation below).
            if total_operations == 0:
                await task_service.update_task(
                    db, task_id, status="completed", progress=100,
                    result={"total_success": 0, "total_failed": 0, "details": []}
                )
                logger.info(f"Task {task_id} completed successfully")
                return

            current_op = 0
            success_count = 0
            failed_count = 0
            results = []

            for body_id in body_ids:
                # Check body exists in our database before hitting NASA.
                body = await celestial_body_service.get_body_by_id(body_id, db)
                if not body:
                    results.append({"body_id": body_id, "error": "Body not found"})
                    failed_count += len(dates)
                    current_op += len(dates)
                    # Bug fix: keep the reported progress in sync even when a
                    # whole body is skipped (previously the progress bar froze).
                    await task_service.update_task(
                        db, task_id, progress=int((current_op / total_operations) * 100)
                    )
                    continue

                body_result = {
                    "body_id": body_id,
                    "body_name": body.name,
                    "dates": []
                }
                for date_str in dates:
                    try:
                        target_date = datetime.strptime(date_str, "%Y-%m-%d")
                        # Skip dates that already have stored positions.
                        existing = await position_service.get_positions(
                            body_id=body_id,
                            start_time=target_date,
                            end_time=target_date.replace(hour=23, minute=59, second=59),
                            session=db
                        )
                        if existing and len(existing) > 0:
                            body_result["dates"].append({"date": date_str, "status": "skipped"})
                            success_count += 1
                        else:
                            # Download from NASA Horizons.
                            positions = await horizons_service.get_body_positions(
                                body_id=body_id,
                                start_time=target_date,
                                end_time=target_date,
                                step="1d"
                            )
                            if positions and len(positions) > 0:
                                # Velocity components may be absent on some
                                # Horizons responses; default them to None.
                                pos_data = [{
                                    "time": target_date,
                                    "x": positions[0].x,
                                    "y": positions[0].y,
                                    "z": positions[0].z,
                                    "vx": getattr(positions[0], 'vx', None),
                                    "vy": getattr(positions[0], 'vy', None),
                                    "vz": getattr(positions[0], 'vz', None),
                                }]
                                await position_service.save_positions(
                                    body_id=body_id,
                                    positions=pos_data,
                                    source="nasa_horizons",
                                    session=db
                                )
                                body_result["dates"].append({"date": date_str, "status": "success"})
                                success_count += 1
                            else:
                                body_result["dates"].append({"date": date_str, "status": "failed", "error": "No data"})
                                failed_count += 1
                    except Exception as e:
                        # Per-item failures are recorded, not fatal to the task.
                        logger.error(f"Error processing {body_id} on {date_str}: {e}")
                        body_result["dates"].append({"date": date_str, "status": "error", "error": str(e)})
                        failed_count += 1
                    # Update progress after every item; cheap at this scale.
                    current_op += 1
                    progress = int((current_op / total_operations) * 100)
                    await task_service.update_task(db, task_id, progress=progress)
                results.append(body_result)

            # Complete with an aggregate summary.
            final_result = {
                "total_success": success_count,
                "total_failed": failed_count,
                "details": results
            }
            await task_service.update_task(db, task_id, status="completed", progress=100, result=final_result)
            logger.info(f"Task {task_id} completed successfully")
        except Exception as e:
            logger.error(f"Task {task_id} failed critically: {e}")
            await task_service.update_task(db, task_id, status="failed", error_message=str(e))
2025-12-11 08:31:26 +00:00
async def generate_orbits_task(task_id: int, body_ids: Optional[List[str]] = None):
    """
    Background task that generates orbit geometry for celestial bodies.

    Args:
        task_id: ID of the task record to update with progress/results.
        body_ids: Specific body IDs to process. When None, every body whose
            extra_data carries 'orbit_period_days' is processed.
    """
    logger.info(f"🚀 Starting background orbit generation task {task_id}")
    async with AsyncSessionLocal() as db:
        try:
            await task_service.update_task(
                db, task_id, status="running", started_at=datetime.utcnow(), progress=0
            )

            # Resolve candidates: either the explicit list or every known body.
            if body_ids:
                candidates = []
                for bid in body_ids:
                    found = await celestial_body_service.get_body_by_id(bid, db)
                    if found:
                        candidates.append(found)
            else:
                candidates = await celestial_body_service.get_all_bodies(db)

            # Keep only bodies with an orbital period; warn about explicitly
            # requested bodies that lack one.
            valid_bodies = []
            for body in candidates:
                if (body.extra_data or {}).get("orbit_period_days"):
                    valid_bodies.append(body)
                elif body_ids and body.id in body_ids:
                    logger.warning(f"Body {body.name} ({body.id}) missing 'orbit_period_days', skipping.")

            total_bodies = len(valid_bodies)
            if total_bodies == 0:
                await task_service.update_task(
                    db, task_id, status="completed", progress=100,
                    result={"message": "No bodies with 'orbit_period_days' found to process"}
                )
                return

            succeeded = 0
            failed = 0
            details = []
            for index, body in enumerate(valid_bodies):
                try:
                    # Report progress before starting each body.
                    await task_service.update_task(
                        db, task_id, progress=int((index / total_bodies) * 100)
                    )
                    params = body.extra_data or {}
                    display_name = body.name_zh or body.name
                    orbit = await orbit_service.generate_orbit(
                        body_id=body.id,
                        body_name=display_name,
                        period_days=float(params.get("orbit_period_days")),
                        color=params.get("orbit_color", "#CCCCCC"),
                        session=db,
                        horizons_service=horizons_service
                    )
                    details.append({
                        "body_id": body.id,
                        "body_name": display_name,
                        "status": "success",
                        "num_points": orbit.num_points
                    })
                    succeeded += 1
                except Exception as e:
                    # One body failing does not abort the whole task.
                    logger.error(f"Failed to generate orbit for {body.name}: {e}")
                    details.append({
                        "body_id": body.id,
                        "body_name": body.name_zh or body.name,
                        "status": "failed",
                        "error": str(e)
                    })
                    failed += 1

            await task_service.update_task(
                db,
                task_id,
                status="completed",
                progress=100,
                completed_at=datetime.utcnow(),
                result={
                    "total": total_bodies,
                    "success": succeeded,
                    "failed": failed,
                    "details": details
                }
            )
            logger.info(f"🏁 Orbit generation task {task_id} completed")
        except Exception as e:
            logger.error(f"Task {task_id} failed: {e}")
            await task_service.update_task(
                db, task_id, status="failed", error_message=str(e), completed_at=datetime.utcnow()
            )
async def fetch_celestial_events_task(task_id: int):
    """
    Background task that fetches upcoming close-approach events from the
    NASA/JPL SBDB Close-Approach Data (CAD) API and stores events for bodies
    that exist in our database.

    Args:
        task_id: ID of the task record to update with progress/results.
    """
    logger.info(f"🚀 Starting celestial event fetch task {task_id}")
    url = "https://ssd-api.jpl.nasa.gov/cad.api"
    # Fetch data for next 60 days, close approach < 0.05 AU (approx 7.5M km)
    params = {
        "dist-max": "0.05",
        "date-min": datetime.utcnow().strftime("%Y-%m-%d"),
        "date-max": (datetime.utcnow() + timedelta(days=60)).strftime("%Y-%m-%d"),
        "body": "ALL"
    }
    async with AsyncSessionLocal() as db:
        try:
            await task_service.update_task(db, task_id, status="running", progress=10)

            async with httpx.AsyncClient(timeout=30) as client:
                logger.info(f"Querying NASA SBDB CAD API: {url}")
                response = await client.get(url, params=params)
                if response.status_code != 200:
                    raise Exception(f"NASA API returned {response.status_code}: {response.text}")
                data = response.json()

            count = int(data.get("count", 0))
            fields = data.get("fields", [])
            data_rows = data.get("data", [])
            logger.info(f"Fetched {count} close approach events")

            # Resolve column indices from the response's field list (the CAD
            # API returns rows as positional arrays).
            try:
                idx_des = fields.index("des")
                idx_cd = fields.index("cd")
                idx_dist = fields.index("dist")
                idx_v_rel = fields.index("v_rel")
            except ValueError as e:
                raise Exception(f"Missing expected field in NASA response: {e}")

            processed_count = 0
            saved_count = 0

            # Build a case-insensitive lookup of name -> body_id (and
            # id -> body_id) so NASA designations can be matched to our records.
            all_bodies = await celestial_body_service.get_all_bodies(db)
            body_map = {b.name.lower(): b.id for b in all_bodies}
            for b in all_bodies:
                body_map[b.id.lower()] = b.id

            for row in data_rows:
                # Robustness fix: previously a single malformed row (short row,
                # non-numeric dist/v_rel) raised out of the loop and failed the
                # entire task. Now bad rows are logged and skipped.
                try:
                    des = row[idx_des].strip()
                    date_str = row[idx_cd]  # e.g. "2025-Dec-18 12:00"
                    dist = row[idx_dist]
                    v_rel = row[idx_v_rel]

                    # Exact (case-insensitive) designation match only.
                    target_id = body_map.get(des.lower())
                    if target_id:
                        # NASA date format: 2025-Dec-18 12:00
                        try:
                            event_time = datetime.strptime(date_str, "%Y-%b-%d %H:%M")
                        except ValueError:
                            # Fallback if format differs slightly
                            event_time = datetime.utcnow()
                        event_data = CelestialEventCreate(
                            body_id=target_id,
                            title=f"Close Approach: {des}",
                            event_type="approach",
                            event_time=event_time,
                            description=f"Close approach to Earth at distance {dist} AU with relative velocity {v_rel} km/s",
                            details={
                                "nominal_dist_au": float(dist),
                                "v_rel_kms": float(v_rel),
                                "designation": des
                            },
                            source="nasa_sbdb"
                        )
                        # TODO: de-duplicate by (body_id, event_time) before insert.
                        await event_service.create_event(event_data, db)
                        saved_count += 1
                except Exception as row_err:
                    logger.error(f"Skipping malformed CAD row {row!r}: {row_err}")
                processed_count += 1

            await task_service.update_task(
                db, task_id, status="completed", progress=100,
                result={
                    "fetched": count,
                    "processed": processed_count,
                    "saved": saved_count,
                    "message": f"Successfully fetched {count} events, saved {saved_count} matched events."
                }
            )
        except Exception as e:
            logger.error(f"Task {task_id} failed: {e}")
            await task_service.update_task(db, task_id, status="failed", error_message=str(e))