# cosmo/backend/app/services/horizons.py

"""
NASA JPL Horizons data query service
"""
import json
import logging
import os
import re
from datetime import datetime, timedelta, timezone

import httpx
from astropy.time import Time
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.models.celestial import Position, CelestialBody
from app.services.redis_cache import redis_cache
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
class HorizonsService:
    """Service for querying NASA JPL Horizons system

    All requests go to a single HTTP endpoint (``API_URL``) through
    ``httpx.AsyncClient``. The request timeout and the optional proxy are
    read from ``settings`` on every call. Position queries are cached in
    Redis through ``redis_cache``.
    """

    # Endpoint shared by every request this service makes.
    API_URL = "https://ssd.jpl.nasa.gov/api/horizons.api"

    # Lifetime of cached position lists: 7 days, in seconds.
    POSITION_CACHE_TTL_SECONDS = 604800

    def __init__(self):
        """Initialize the service"""
        self.location = "@sun"  # Heliocentric coordinates
        # Proxy is handled via settings.proxy_dict in each request

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _quote_command(body_id: str) -> str:
        """Return *body_id* wrapped in single quotes unless it already starts with one."""
        return body_id if body_id.startswith("'") else f"'{body_id}'"

    @staticmethod
    def _client_kwargs() -> dict:
        """Build the keyword arguments passed to ``httpx.AsyncClient``.

        The ``proxies`` key is only present when ``settings.proxy_dict`` is
        truthy, so callers can test for it to know whether a proxy is in use.
        """
        kwargs = {"timeout": settings.nasa_api_timeout}
        if settings.proxy_dict:
            kwargs["proxies"] = settings.proxy_dict
        return kwargs

    @staticmethod
    def _optional_float(parts: list, index: int):
        """Return ``float(parts[index])``, or ``None`` if the field is missing or empty.

        A non-empty field that is not a number still raises ``ValueError``.
        """
        if index >= len(parts) or not parts[index]:
            return None
        return float(parts[index])

    # ------------------------------------------------------------------
    # Raw object data
    # ------------------------------------------------------------------

    async def get_object_data_raw(self, body_id: str) -> str:
        """
        Get raw object data (terminal style text) from Horizons

        Args:
            body_id: JPL Horizons ID

        Returns:
            Raw text response from NASA

        Raises:
            Exception: if the request fails or the API answers with a
                non-200 status. The error is logged and re-raised.
        """
        params = {
            "format": "text",
            # Ensure ID is quoted for COMMAND
            "COMMAND": self._quote_command(body_id),
            "OBJ_DATA": "YES",
            "MAKE_EPHEM": "NO",
            "EPHEM_TYPE": "VECTORS",
            "CENTER": "@sun"
        }

        try:
            # Configure proxy if available
            client_kwargs = self._client_kwargs()
            if "proxies" in client_kwargs:
                logger.info(f"Using proxy for NASA API: {settings.proxy_dict}")

            async with httpx.AsyncClient(**client_kwargs) as client:
                logger.info(f"Fetching raw data for body {body_id} with timeout {settings.nasa_api_timeout}s")
                response = await client.get(self.API_URL, params=params)

            if response.status_code != 200:
                raise Exception(f"NASA API returned status {response.status_code}")

            return response.text
        except Exception as e:
            logger.error(f"Error fetching raw data for {body_id}: {repr(e)}")
            raise

    # ------------------------------------------------------------------
    # Name search
    # ------------------------------------------------------------------

    async def search_body_by_name(self, name: str, db: AsyncSession = None) -> dict:
        """
        Search for a celestial body in Horizons by name.

        This method never raises: any failure is logged and reported in the
        returned dict.

        Args:
            name: Name to search (e.g. 'Ceres', 'Halley')
            db: Database session (optional, for future caching); not used
                by the current implementation

        Returns:
            Dict with success/error and data. On success the keys are
            ``success``, ``id``, ``name`` and ``full_name``; on failure
            ``success`` and ``error``.
        """
        params = {
            "format": "text",
            # Using a wildcard search command
            "COMMAND": f"'{name}*'",
            "OBJ_DATA": "YES",
            "MAKE_EPHEM": "NO"
        }

        try:
            async with httpx.AsyncClient(**self._client_kwargs()) as client:
                logger.info(f"Searching Horizons for: {name}")
                response = await client.get(self.API_URL, params=params)

            if response.status_code != 200:
                return {"success": False, "error": f"NASA API Error: {response.status_code}"}

            return self._parse_search_response(response.text, name)
        except Exception as e:
            logger.error(f"Search error for {name}: {repr(e)}")
            return {"success": False, "error": str(e)}

    @staticmethod
    def _parse_search_response(text: str, name: str) -> dict:
        """Turn the text of a Horizons search response into a result dict.

        Args:
            text: Response body returned by the API
            name: The name that was searched for; used as a fallback id/name
                and to pick a line out of a multiple-match listing

        Returns:
            The same dict shape as ``search_body_by_name``.
        """
        # Case 1: Direct match (Horizons returns data directly)
        if "Target body name:" in text or "Physical properties" in text:
            # Pattern: "Target body name: 1 Ceres (A801 AA)   {source: ...}"
            match = re.search(r"Target body name:\s*(.*?)\s*\{", text)
            if not match:
                match = re.search(r"Target body name:\s*(.*?)\n", text)
            full_name = match.group(1).strip() if match else name

            # The numeric ID is not listed explicitly, but the name often
            # carries it, e.g. "1 Ceres" -> id "1", name "Ceres". Otherwise
            # fall back to the searched name as the id.
            id_match = re.search(r"^(\d+)\s+([a-zA-Z]+)", full_name)
            if id_match:
                body_id, clean_name = id_match.group(1), id_match.group(2)
            else:
                body_id, clean_name = name, full_name

            return {
                "success": True,
                "id": body_id,
                "name": clean_name,
                "full_name": full_name
            }

        # Case 2: Multiple matches (Ambiguous) - Horizons returns a listing.
        # Simple heuristic: take the first line that contains the name and
        # whose first column looks like an (optionally negative) integer ID.
        if "Multiple major-bodies match" in text or "Matching small-bodies" in text:
            needle = name.lower()
            for line in text.split('\n'):
                if needle not in line.lower():
                    continue
                parts = line.split()
                if not parts:
                    # Blank line: nothing to inspect. Without this guard
                    # parts[0] would raise IndexError.
                    continue
                first = parts[0]
                if first.isdigit() or (first.startswith('-') and first[1:].isdigit()):
                    return {
                        "success": True,
                        "id": first,
                        "name": name,
                        "full_name": line.strip()
                    }
            return {"success": False, "error": "Multiple matches found, please be more specific"}

        # Case 3: No match
        if "No matches found" in text:
            return {"success": False, "error": "No celestial body found with that name"}

        # Fallback for unknown response format
        return {"success": False, "error": "Could not parse NASA response"}

    # ------------------------------------------------------------------
    # Positions
    # ------------------------------------------------------------------

    async def get_body_positions(
        self,
        body_id: str,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        step: str = "1d",
    ) -> list[Position]:
        """
        Get positions for a celestial body over a time range

        Only the date part of the times is sent to Horizons and used in the
        cache key. A non-empty result is cached in Redis for
        ``POSITION_CACHE_TTL_SECONDS``. A cache entry that cannot be decoded
        is ignored and the data is fetched again.

        Args:
            body_id: JPL Horizons ID (e.g., '-31' for Voyager 1)
            start_time: Start datetime (default: now, UTC)
            end_time: End datetime (default: same as start_time)
            step: Time step (e.g., '1d' for 1 day, '1h' for 1 hour)

        Returns:
            List of Position objects

        Raises:
            Exception: if the request fails or the API answers with a
                non-200 status. The error is logged and re-raised.
        """
        # Set default times and format for cache key
        if start_time is None:
            start_time = datetime.now(timezone.utc)
        if end_time is None:
            end_time = start_time

        start_str_cache = start_time.strftime('%Y-%m-%d')
        end_str_cache = end_time.strftime('%Y-%m-%d')

        # 1. Try to fetch from Redis cache
        cache_key = f"nasa:horizons:positions:{body_id}:{start_str_cache}:{end_str_cache}:{step}"
        cached_data = await redis_cache.get(cache_key)
        if cached_data:
            logger.info(f"Cache HIT for {body_id} positions ({start_str_cache}-{end_str_cache})")
            try:
                return self._positions_from_cache(cached_data)
            except (ValueError, TypeError, KeyError) as e:
                # A corrupt entry must not fail the request; refetch instead.
                logger.warning(f"Ignoring unreadable cache entry {cache_key}: {repr(e)}")
        else:
            logger.info(f"Cache MISS for {body_id} positions ({start_str_cache}-{end_str_cache}). Fetching from NASA.")

        try:
            # Format time for Horizons API
            start_str = start_str_cache
            if start_time.date() == end_time.date():
                # Same-day request: stop one day after the start
                # (presumably because STOP_TIME must be later than START_TIME).
                end_str = (start_time + timedelta(days=1)).strftime('%Y-%m-%d')
            else:
                end_str = end_str_cache

            logger.info(f"Querying Horizons (httpx) for body {body_id} from {start_str} to {end_str}")

            params = {
                "format": "text",
                "COMMAND": self._quote_command(body_id),
                "OBJ_DATA": "NO",
                "MAKE_EPHEM": "YES",
                "EPHEM_TYPE": "VECTORS",
                "CENTER": self.location,
                "START_TIME": start_str,
                "STOP_TIME": end_str,
                "STEP_SIZE": step,
                "CSV_FORMAT": "YES",
                "OUT_UNITS": "AU-D"
            }

            # Configure proxy if available
            client_kwargs = self._client_kwargs()
            if "proxies" in client_kwargs:
                logger.info(f"Using proxy for NASA API: {settings.proxy_dict}")

            async with httpx.AsyncClient(**client_kwargs) as client:
                response = await client.get(self.API_URL, params=params)

            if response.status_code != 200:
                raise Exception(f"NASA API returned status {response.status_code}")

            positions = self._parse_vectors(response.text)

            # 2. Cache the result before returning (empty results are not cached)
            if positions:
                await redis_cache.set(
                    cache_key,
                    self._positions_to_cache(positions),
                    ttl_seconds=self.POSITION_CACHE_TTL_SECONDS,
                )
                logger.info(f"Cache SET for {body_id} positions ({start_str_cache}-{end_str_cache}) with TTL 7 days.")

            return positions
        except Exception as e:
            logger.error(f"Error querying Horizons for body {body_id}: {repr(e)}")
            raise

    @staticmethod
    def _positions_from_cache(cached_data) -> list[Position]:
        """Rebuild Position objects from the JSON text stored in the cache."""
        positions = []
        for item in json.loads(cached_data):
            # 'time' was stored as an ISO string; convert it back to datetime
            item['time'] = datetime.fromisoformat(item['time'])
            positions.append(Position(**item))
        return positions

    @staticmethod
    def _positions_to_cache(positions: list) -> str:
        """Serialize Position objects to JSON text for the cache."""
        payload = []
        for position in positions:
            pos_dict = position.dict()
            # datetime is not JSON serializable; store it as an ISO string
            if isinstance(pos_dict.get('time'), datetime):
                pos_dict['time'] = pos_dict['time'].isoformat()
            payload.append(pos_dict)
        return json.dumps(payload)

    def _parse_vectors(self, text: str) -> list[Position]:
        """
        Parse Horizons CSV output for vector data

        Format looks like:
        $$SOE
        2460676.500000000, A.D. 2025-Jan-01 00:00:00.0000, 9.776737278236609E-01, -1.726677228793678E-01, -1.636678733289160E-05, ...
        $$EOE

        Lines with fewer than five comma-separated fields are skipped
        silently. Lines whose fields cannot be converted are skipped with a
        warning. Velocity fields that are missing or empty become ``None``.

        Args:
            text: Full text of the Horizons response

        Returns:
            List of Position objects, empty if no $$SOE...$$EOE block exists
        """
        # Extract data block between $$SOE and $$EOE
        match = re.search(r'\$\$SOE(.*?)\$\$EOE', text, re.DOTALL)
        if not match:
            logger.warning("No data block ($$SOE...$$EOE) found in Horizons response")
            logger.debug(f"Response snippet: {text[:500]}...")
            return []

        positions = []
        for line in match.group(1).strip().split('\n'):
            parts = [p.strip() for p in line.split(',')]
            if len(parts) < 5:
                continue

            try:
                # Index 0: JD, 1: Date, 2: X, 3: Y, 4: Z, 5: VX, 6: VY, 7: VZ
                time_obj = Time(float(parts[0]), format="jd").datetime
                pos = Position(
                    time=time_obj,
                    x=float(parts[2]),
                    y=float(parts[3]),
                    z=float(parts[4]),
                    # An empty trailing field is treated as "no value"
                    # instead of discarding the whole row.
                    vx=self._optional_float(parts, 5),
                    vy=self._optional_float(parts, 6),
                    vz=self._optional_float(parts, 7),
                )
            except (ValueError, IndexError) as e:
                logger.warning(f"Failed to parse line: {line}. Error: {e}")
                continue

            positions.append(pos)

        return positions
2025-12-26 01:21:15 +00:00
# Global singleton instance, created once when this module is imported
horizons_service = HorizonsService()