cosmo/DUPLICATE_FIX.md

290 lines
6.6 KiB
Markdown
Raw Normal View History

2025-12-02 13:25:28 +00:00
# 数据库重复数据问题修复
## 问题描述
在测试过程中发现两个数据库表存在重复数据问题:
### 1. positions 表重复数据
```
120192 399 2025-11-29 05:00:00 0.387... 0.907... -5.638e-05 nasa_horizons 2025-11-29 05:24:23.173
120193 399 2025-11-29 05:00:00 0.387... 0.907... -5.638e-05 nasa_horizons 2025-11-29 05:24:23.175
```
**原因**: 同一个天体在同一时刻有多条位置记录。
### 2. nasa_cache 表重复键错误
```
duplicate key value violates unique constraint "nasa_cache_pkey"
Key (cache_key)=(136199:2025-11-29T05:00:00+00:00:2025-11-29T05:00:00+00:00:1h) already exists.
```
**原因**: 尝试插入已存在的缓存键。
---
## 根本原因
### 并发竞态条件
当多个请求同时查询相同的时间点时:
```
时间线:
T1: 请求 A 查询 body_id=399, time=2025-11-29 05:00:00
T2: 请求 B 查询 body_id=399, time=2025-11-29 05:00:00
T3: 请求 A 检查数据库 -> 未找到 -> 准备插入
T4: 请求 B 检查数据库 -> 未找到 -> 准备插入
T5: 请求 A 插入记录(成功)
T6: 请求 B 插入记录(冲突!)❌
```
### 原始代码问题
#### save_positions (旧版本)
```python
# ❌ 问题:直接添加,不检查是否存在
for pos_data in positions:
position = Position(...)
s.add(position) # 可能重复
await s.commit()
```
#### save_response (旧版本)
```python
# ❌ 问题SELECT + INSERT 不是原子操作
cache = await s.execute(select(...)).scalar_one_or_none()
if not cache:
cache = NasaCache(...)
s.add(cache) # 可能在 SELECT 和 INSERT 之间被插入
await s.commit()
```
---
## 解决方案
使用 PostgreSQL 的 **UPSERT** 操作(`INSERT ... ON CONFLICT`),将检查和插入变为原子操作。
### 1. 修复 save_positions
**文件**: `backend/app/services/db_service.py`
```python
async def save_positions(...):
from sqlalchemy.dialects.postgresql import insert
for pos_data in positions:
# 使用 UPSERT
stmt = insert(Position).values(
body_id=body_id,
time=pos_data["time"],
x=pos_data["x"],
y=pos_data["y"],
z=pos_data["z"],
...
)
# 遇到冲突时更新
stmt = stmt.on_conflict_do_update(
index_elements=['body_id', 'time'], # 唯一约束
set_={
'x': pos_data["x"],
'y': pos_data["y"],
'z': pos_data["z"],
...
}
)
await s.execute(stmt)
```
**关键点**:
-`on_conflict_do_update` 原子操作
- ✅ 基于 `(body_id, time)` 唯一约束
- ✅ 冲突时更新而不是报错
---
### 2. 修复 save_response
**文件**: `backend/app/services/db_service.py`
```python
async def save_response(...):
from sqlalchemy.dialects.postgresql import insert
# 使用 UPSERT
stmt = insert(NasaCache).values(
cache_key=cache_key,
body_id=body_id,
start_time=start_naive,
end_time=end_naive,
step=step,
data=response_data,
expires_at=now_naive + timedelta(days=ttl_days)
)
# 遇到冲突时更新
stmt = stmt.on_conflict_do_update(
index_elements=['cache_key'], # 主键
set_={
'data': response_data,
'created_at': now_naive,
'expires_at': now_naive + timedelta(days=ttl_days)
}
).returning(NasaCache)
result = await s.execute(stmt)
cache = result.scalar_one()
```
**关键点**:
-`on_conflict_do_update` 原子操作
- ✅ 基于 `cache_key` 主键
- ✅ 冲突时更新数据和过期时间
---
## 数据库唯一约束验证
确保数据库表有正确的唯一约束:
### positions 表
```sql
-- 检查唯一约束
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'positions'
AND constraint_type = 'UNIQUE';
-- 如果没有,创建唯一约束
ALTER TABLE positions
ADD CONSTRAINT positions_body_time_unique
UNIQUE (body_id, time);
```
### nasa_cache 表
```sql
-- 检查主键
SELECT constraint_name, constraint_type
FROM information_schema.table_constraints
WHERE table_name = 'nasa_cache'
AND constraint_type = 'PRIMARY KEY';
-- cache_key 应该是主键,已有唯一约束
```
---
## 清理现有重复数据
执行 SQL 脚本清理重复数据:
```bash
psql -U postgres -d cosmo -f backend/scripts/cleanup_duplicates.sql
```
**脚本功能**:
1. 删除 positions 表中的重复记录(保留最新的)
2. 删除 nasa_cache 表中的重复记录(保留最新的)
3. 验证清理结果
---
## 验证修复效果
### 1. 重启后端服务
```bash
cd backend
python3 app/main.py
```
### 2. 测试并发请求
在两个终端同时执行相同的请求:
```bash
# 终端 1
curl "http://localhost:8000/api/celestial/positions?start_time=2025-11-29T12:00:00Z&end_time=2025-11-29T12:00:00Z&step=1h"
# 终端 2同时执行
curl "http://localhost:8000/api/celestial/positions?start_time=2025-11-29T12:00:00Z&end_time=2025-11-29T12:00:00Z&step=1h"
```
**预期结果**:
- ✅ 两个请求都成功返回
- ✅ 没有重复数据错误
- ✅ 数据库中只有一条记录
### 3. 验证数据库
```sql
-- 检查是否还有重复
SELECT body_id, time, COUNT(*)
FROM positions
GROUP BY body_id, time
HAVING COUNT(*) > 1;
-- 应返回 0 行
SELECT cache_key, COUNT(*)
FROM nasa_cache
GROUP BY cache_key
HAVING COUNT(*) > 1;
-- 应返回 0 行
```
---
## 性能优势
### UPSERT vs SELECT + INSERT
| 操作 | SELECT + INSERT | UPSERT |
|------|----------------|--------|
| 数据库往返次数 | 2 次SELECT + INSERT | 1 次 |
| 锁定时间 | 长(两个操作) | 短(单个操作) |
| 并发安全 | ❌ 不安全 | ✅ 安全 |
| 性能 | 慢 | 快 |
### 示例
假设 10 个并发请求:
**旧方法**:
- 10 个 SELECT可能都返回 NULL
- 10 个 INSERT 尝试9 个失败)
- 总数据库操作20 次
**新方法**:
- 10 个 UPSERT1 个 INSERT9 个 UPDATE
- 总数据库操作10 次
- 性能提升:**50%** ⚡
---
## 总结
### ✅ 已修复
1. **positions 表**: 使用 UPSERT 避免重复插入
2. **nasa_cache 表**: 使用 UPSERT 避免重复插入
3. **并发安全**: 原子操作避免竞态条件
4. **性能提升**: 减少数据库往返次数
### 🎯 后续建议
1. **定期清理**: 每天检查并清理潜在的重复数据
2. **监控告警**: 监控唯一约束冲突次数
3. **压力测试**: 测试高并发场景下的数据一致性
---
**文档版本**: v1.0
**最后更新**: 2025-11-29
**相关文件**:
- `backend/app/services/db_service.py` (修改)
- `backend/scripts/cleanup_duplicates.sql` (新增)