imetting/backend/app/services/admin_dashboard_service.py

594 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from app.core.response import create_api_response
from app.core.database import get_db_connection
from app.services.jwt_service import jwt_service
from app.core.config import AUDIO_DIR, REDIS_CONFIG
from app.services.async_transcription_service import AsyncTranscriptionService
from app.services.async_meeting_service import async_meeting_service
from datetime import datetime
from typing import Dict, List
import os
import redis
# Module-level Redis client, built from the REDIS_CONFIG keyword arguments.
redis_client = redis.Redis(**REDIS_CONFIG)
# Constant definitions
# Lower-case file extensions (with leading dot) that _calculate_audio_storage
# counts as audio files.
AUDIO_FILE_EXTENSIONS = ('.wav', '.mp3', '.m4a', '.aac', '.flac', '.ogg', '.mpeg', '.mp4', '.webm')
# Divisor used to convert a byte count into the "GB" figures reported by this
# module (1024 ** 3 bytes).
BYTES_TO_GB = 1024 ** 3
# Shared transcription service instance used when retrying transcription tasks.
transcription_service = AsyncTranscriptionService()
def _build_status_condition(status: str) -> str:
"""构建任务状态查询条件"""
if status == 'running':
return "AND (t.status = 'pending' OR t.status = 'processing')"
elif status == 'completed':
return "AND t.status = 'completed'"
elif status == 'failed':
return "AND t.status = 'failed'"
return ""
def _get_task_stats_query() -> str:
"""获取任务统计的 SQL 查询"""
return """
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'pending' OR status = 'processing' THEN 1 ELSE 0 END) as running,
SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
"""
def _get_online_user_count(redis_client) -> int:
"""从 Redis 获取在线用户数"""
try:
token_keys = redis_client.keys("token:*")
user_ids = set()
for key in token_keys:
if isinstance(key, bytes):
key = key.decode("utf-8", errors="ignore")
parts = key.split(':')
if len(parts) >= 2:
user_ids.add(parts[1])
return len(user_ids)
except Exception as e:
print(f"获取在线用户数失败: {e}")
return 0
def _table_exists(cursor, table_name: str) -> bool:
cursor.execute(
"""
SELECT COUNT(*) AS cnt
FROM information_schema.tables
WHERE table_schema = DATABASE() AND table_name = %s
""",
(table_name,),
)
return (cursor.fetchone() or {}).get("cnt", 0) > 0
def _calculate_audio_storage() -> Dict[str, float]:
    """Walk ``AUDIO_DIR`` and summarise the audio files found there.

    A file counts as audio when its lower-cased extension is listed in
    ``AUDIO_FILE_EXTENSIONS``. A file whose size cannot be read is still
    counted but contributes nothing to the total size.

    Returns:
        A dict with ``audio_file_count`` and ``audio_files_count`` (the same
        value under both keys) and ``audio_total_size_gb`` rounded to two
        decimals. Errors while walking are printed and the figures gathered
        so far are returned.
    """
    matched_files = 0
    total_bytes = 0
    try:
        if os.path.exists(AUDIO_DIR):
            for dirpath, _subdirs, filenames in os.walk(AUDIO_DIR):
                for filename in filenames:
                    _, suffix = os.path.splitext(filename)
                    if suffix.lower() not in AUDIO_FILE_EXTENSIONS:
                        continue
                    matched_files += 1
                    try:
                        total_bytes += os.path.getsize(os.path.join(dirpath, filename))
                    except OSError:
                        # Unreadable file: keep it in the count, skip its size.
                        continue
    except Exception as e:
        print(f"统计音频文件失败: {e}")
    size_in_gb = round(total_bytes / BYTES_TO_GB, 2)
    return {
        "audio_file_count": matched_files,
        "audio_files_count": matched_files,
        "audio_total_size_gb": size_in_gb
    }
def _dashboard_count_rows(cursor, table_name: str, since: datetime):
    """Return ``(total, recent)`` row counts for ``table_name``.

    ``recent`` counts rows whose ``created_at`` is at or after ``since``.
    Both values are ``0`` when the table does not exist.
    """
    if not _table_exists(cursor, table_name):
        return 0, 0
    # table_name is only ever a hard-coded literal passed by
    # get_dashboard_stats, so interpolating it into the SQL is safe.
    cursor.execute(f"SELECT COUNT(*) as total FROM {table_name}")
    total = (cursor.fetchone() or {}).get("total", 0)
    cursor.execute(
        f"SELECT COUNT(*) as count FROM {table_name} WHERE created_at >= %s",
        (since,),
    )
    recent = (cursor.fetchone() or {}).get("count", 0)
    return total, recent


def _dashboard_task_stats(cursor, table_name: str) -> dict:
    """Return total/running/completed/failed task counts for ``table_name``.

    Missing tables, missing rows and NULL aggregates (``SUM`` over an empty
    table) are all reported as ``0``.
    """
    zeros = {'total': 0, 'running': 0, 'completed': 0, 'failed': 0}
    if not _table_exists(cursor, table_name):
        return zeros
    # table_name is a hard-coded literal supplied by get_dashboard_stats.
    cursor.execute(f"{_get_task_stats_query()} FROM {table_name}")
    row = cursor.fetchone() or zeros
    return {field: row[field] or 0 for field in zeros}


async def get_dashboard_stats(current_user=None):
    """Collect the statistics shown on the admin dashboard.

    Gathers user counts (total, created today, online according to Redis),
    meeting counts (total, created today), task statistics for the
    transcription, summary and knowledge-base task tables, and audio
    storage usage.

    Args:
        current_user: Accepted for interface compatibility; not used here.

    Returns:
        The value of ``create_api_response``: code ``"200"`` with the
        statistics under ``data``, or code ``"500"`` with the error text if
        anything raises.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
                # 1. Users
                total_users, today_new_users = _dashboard_count_rows(
                    cursor, "sys_users", today_start
                )
                online_users = _get_online_user_count(redis_client)
                # 2. Meetings
                total_meetings, today_new_meetings = _dashboard_count_rows(
                    cursor, "meetings", today_start
                )
                # 3. Tasks
                transcription_stats = _dashboard_task_stats(cursor, "transcript_tasks")
                summary_stats = _dashboard_task_stats(cursor, "llm_tasks")
                kb_stats = _dashboard_task_stats(cursor, "knowledge_base_tasks")
            finally:
                # Release the cursor even when a query fails.
                cursor.close()
            # 4. Audio storage
            storage_stats = _calculate_audio_storage()
            stats = {
                "users": {
                    "total": total_users,
                    "today_new": today_new_users,
                    "online": online_users
                },
                "meetings": {
                    "total": total_meetings,
                    "today_new": today_new_meetings
                },
                "tasks": {
                    "transcription": transcription_stats,
                    "summary": summary_stats,
                    "knowledge_base": kb_stats
                },
                "storage": storage_stats
            }
            return create_api_response(code="200", message="获取统计数据成功", data=stats)
    except Exception as e:
        print(f"获取Dashboard统计数据失败: {e}")
        return create_api_response(code="500", message=f"获取统计数据失败: {str(e)}")
async def get_online_users(current_user=None):
    """List the users that currently hold at least one token in Redis.

    Keys matching ``token:*`` are expected to look like
    ``token:<user_id>:<token>``. Keys with fewer than three segments or a
    non-numeric user id are skipped, so one stray key cannot fail the whole
    listing. Each user found in ``sys_users`` is returned with the number
    of tokens and the TTL of the first token seen for that user.

    Args:
        current_user: Accepted for interface compatibility; not used here.

    Returns:
        The value of ``create_api_response``: code ``"200"`` with
        ``{"users": [...], "total": n}`` sorted by ``user_id``, or code
        ``"500"`` with the error text if anything raises.
    """
    try:
        # Group tokens by user id.
        user_tokens = {}
        seen_keys = set()
        # SCAN avoids blocking the Redis server the way KEYS does, but it may
        # report a key more than once, hence the seen_keys guard.
        for key in redis_client.scan_iter(match="token:*"):
            if isinstance(key, bytes):
                key = key.decode("utf-8", errors="ignore")
            if key in seen_keys:
                continue
            seen_keys.add(key)
            parts = key.split(':')
            if len(parts) < 3:
                continue
            try:
                user_id = int(parts[1])
            except ValueError:
                # Not a "token:<numeric id>:<token>" key; ignore it.
                continue
            user_tokens.setdefault(user_id, []).append({'token': parts[2], 'key': key})
        # Look up the user record for every id found.
        online_users_list = []
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                for user_id, tokens in user_tokens.items():
                    cursor.execute(
                        "SELECT user_id, username, caption, email, role_id FROM sys_users WHERE user_id = %s",
                        (user_id,)
                    )
                    user = cursor.fetchone()
                    if not user:
                        continue
                    # Only the first token's TTL is reported for the user.
                    ttl_seconds = redis_client.ttl(tokens[0]['key'])
                    online_users_list.append({
                        **user,
                        'token_count': len(tokens),
                        'ttl_seconds': ttl_seconds,
                        'ttl_hours': round(ttl_seconds / 3600, 1) if ttl_seconds > 0 else 0
                    })
            finally:
                # Release the cursor even when a query fails.
                cursor.close()
        # Sort by user id.
        online_users_list.sort(key=lambda x: x['user_id'])
        return create_api_response(
            code="200",
            message="获取在线用户列表成功",
            data={"users": online_users_list, "total": len(online_users_list)}
        )
    except Exception as e:
        print(f"获取在线用户列表失败: {e}")
        return create_api_response(code="500", message=f"获取在线用户列表失败: {str(e)}")
async def kick_user(user_id: int, current_user=None):
    """Kick a user out by revoking all of that user's tokens.

    Args:
        user_id: Id of the user whose tokens are revoked.
        current_user: Accepted for interface compatibility; not used here.

    Returns:
        The value of ``create_api_response``: code ``"200"`` with the
        revoked count when at least one token was revoked, code ``"404"``
        when none was, or code ``"500"`` with the error text on failure.
    """
    try:
        revoked_count = jwt_service.revoke_all_user_tokens(user_id)
        if not revoked_count > 0:
            return create_api_response(
                code="404",
                message="该用户当前不在线或未找到 token"
            )
        payload = {"user_id": user_id, "revoked_count": revoked_count}
        return create_api_response(
            code="200",
            message=f"已踢出用户,撤销了 {revoked_count} 个 token",
            data=payload
        )
    except Exception as e:
        print(f"踢出用户失败: {e}")
        return create_api_response(code="500", message=f"踢出用户失败: {str(e)}")
async def monitor_tasks(
    task_type: str = 'all',
    status: str = 'all',
    limit: int = 20,
    current_user=None
):
    """Return the most recent background tasks for the admin monitor.

    Up to ``limit`` rows are read from each selected task table, merged,
    sorted by ``created_at`` (newest first) and cut to ``limit`` again.
    Rows without a ``created_at`` sort last instead of breaking the sort.

    Row shapes differ slightly per task type: summary rows also carry
    ``result``; knowledge-base rows expose ``kb_id`` as ``meeting_id``, the
    knowledge-base title as ``meeting_title`` and ``updated_at`` instead of
    ``completed_at``.

    Args:
        task_type: ``'all'``, ``'transcription'``, ``'summary'`` or
            ``'knowledge_base'``. Other values select no table.
        status: Status filter understood by ``_build_status_condition``.
        limit: Maximum number of rows per table and in the final result.
        current_user: Accepted for interface compatibility; not used here.

    Returns:
        The value of ``create_api_response``: code ``"200"`` with
        ``{"tasks": [...], "total": n}``, or code ``"500"`` with the error
        text if anything raises.
    """
    try:
        tasks = []
        # The fragment comes from a fixed set of literals, so it is safe to
        # interpolate into the queries below.
        status_condition = _build_status_condition(status)
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                # Transcription tasks
                if task_type in ['all', 'transcription']:
                    query = f"""
                        SELECT
                            t.task_id,
                            'transcription' as task_type,
                            t.meeting_id,
                            m.title as meeting_title,
                            t.status,
                            t.progress,
                            t.error_message,
                            t.created_at,
                            t.completed_at,
                            u.username as creator_name
                        FROM transcript_tasks t
                        LEFT JOIN meetings m ON t.meeting_id = m.meeting_id
                        LEFT JOIN sys_users u ON m.user_id = u.user_id
                        WHERE 1=1 {status_condition}
                        ORDER BY t.created_at DESC
                        LIMIT %s
                    """
                    cursor.execute(query, (limit,))
                    tasks.extend(cursor.fetchall())
                # Summary tasks
                if task_type in ['all', 'summary']:
                    query = f"""
                        SELECT
                            t.task_id,
                            'summary' as task_type,
                            t.meeting_id,
                            m.title as meeting_title,
                            t.status,
                            t.progress,
                            t.result,
                            t.error_message,
                            t.created_at,
                            t.completed_at,
                            u.username as creator_name
                        FROM llm_tasks t
                        LEFT JOIN meetings m ON t.meeting_id = m.meeting_id
                        LEFT JOIN sys_users u ON m.user_id = u.user_id
                        WHERE 1=1 {status_condition}
                        ORDER BY t.created_at DESC
                        LIMIT %s
                    """
                    cursor.execute(query, (limit,))
                    tasks.extend(cursor.fetchall())
                # Knowledge-base tasks
                if task_type in ['all', 'knowledge_base']:
                    query = f"""
                        SELECT
                            t.task_id,
                            'knowledge_base' as task_type,
                            t.kb_id as meeting_id,
                            k.title as meeting_title,
                            t.status,
                            t.progress,
                            t.error_message,
                            t.created_at,
                            t.updated_at,
                            u.username as creator_name
                        FROM knowledge_base_tasks t
                        LEFT JOIN knowledge_bases k ON t.kb_id = k.kb_id
                        LEFT JOIN sys_users u ON k.creator_id = u.user_id
                        WHERE 1=1 {status_condition}
                        ORDER BY t.created_at DESC
                        LIMIT %s
                    """
                    cursor.execute(query, (limit,))
                    tasks.extend(cursor.fetchall())
            finally:
                # Release the cursor even when a query fails.
                cursor.close()
        # Newest first. The leading boolean keeps rows with a NULL created_at
        # comparable (they sort last) instead of raising TypeError.
        tasks.sort(
            key=lambda task: (task['created_at'] is not None, task['created_at']),
            reverse=True,
        )
        tasks = tasks[:limit]
        return create_api_response(
            code="200",
            message="获取任务监控数据成功",
            data={"tasks": tasks, "total": len(tasks)}
        )
    except Exception as e:
        print(f"获取任务监控数据失败: {e}")
        import traceback
        traceback.print_exc()
        return create_api_response(code="500", message=f"获取任务监控数据失败: {str(e)}")
def _parse_optional_int(value):
if value in (None, "", "None"):
return None
try:
return int(value)
except (TypeError, ValueError):
return None
async def retry_task(task_type: str, task_id: str, current_user=None):
    """Retry or resume a background task.

    Args:
        task_type: ``"summary"`` or ``"transcription"``; surrounding
            whitespace and letter case are ignored.
        task_id: Id of the task to retry.
        current_user: Accepted for interface compatibility; not used here.

    Returns:
        The response produced by the type-specific retry helper, a code
        ``"400"`` response for an unsupported type, or a code ``"500"``
        response with the error text if anything raises.
    """
    try:
        retry_handlers = {
            "summary": _retry_summary_task,
            "transcription": _retry_transcription_task,
        }
        normalized_type = (task_type or "").strip().lower()
        handler = retry_handlers.get(normalized_type)
        if handler is None:
            return create_api_response(code="400", message="不支持的任务类型")
        return handler(task_id)
    except Exception as e:
        print(f"重试任务失败: {e}")
        return create_api_response(code="500", message=f"重试任务失败: {str(e)}")
def _retry_summary_task(task_id: str):
    """Resume or resubmit a summary task.

    Pending or processing tasks are handed to the meeting service to resume
    and their current status is reported. Completed tasks are rejected. Any
    other status leads to a new summary task for the same meeting, reusing
    the stored user prompt and prompt id.

    Args:
        task_id: Id of the summary task.

    Returns:
        The value of ``create_api_response``: ``"404"`` when the task is
        unknown, ``"400"`` when it has no meeting or is already completed,
        otherwise ``"200"`` with the task details.
    """
    task_data = async_meeting_service._get_task_from_db(task_id)
    if not task_data:
        return create_api_response(code="404", message="总结任务不存在")

    current_status = str(task_data.get("status") or "").lower()
    meeting_id = _parse_optional_int(task_data.get("meeting_id"))
    if not meeting_id:
        return create_api_response(code="400", message="总结任务缺少关联会议")

    if current_status in ("pending", "processing"):
        async_meeting_service._resume_task_if_needed(task_id, task_data)
        status_info = async_meeting_service.get_task_status(task_id)
        resume_payload = {
            "task_id": task_id,
            "status": status_info.get("status"),
            "progress": status_info.get("progress", 0),
        }
        return create_api_response(
            code="200",
            message="总结任务已尝试恢复",
            data=resume_payload,
        )

    if current_status == "completed":
        return create_api_response(code="400", message="总结任务已完成,无需重试")

    prompt_id = _parse_optional_int(task_data.get("prompt_id"))
    stored_prompt = task_data.get("user_prompt")
    # A missing prompt may be stored as None or as the string "None".
    user_prompt = "" if stored_prompt in (None, "None") else str(stored_prompt)
    new_task_id, _ = async_meeting_service.enqueue_summary_generation(
        meeting_id,
        user_prompt=user_prompt,
        prompt_id=prompt_id,
        model_code=None,
    )
    resubmit_payload = {
        "task_id": new_task_id,
        "previous_task_id": task_id,
        "status": "pending",
        "meeting_id": meeting_id,
    }
    return create_api_response(
        code="200",
        message="总结任务已重新提交",
        data=resubmit_payload,
    )
def _retry_transcription_task(task_id: str):
    """Refresh or resubmit a transcription task.

    Pending or processing tasks only have their status refreshed. Completed
    tasks are rejected. Any other status starts a new transcription for the
    meeting's audio file and enqueues a monitor for it.

    Args:
        task_id: Id of the transcription task.

    Returns:
        The value of ``create_api_response``: ``"404"`` when the task is
        unknown, ``"400"`` when it has no meeting, is already completed or
        the meeting has no audio file path, otherwise ``"200"`` with the
        task details.
    """
    task_data = transcription_service._get_task_from_db(task_id)
    if not task_data:
        return create_api_response(code="404", message="转录任务不存在")

    current_status = str(task_data.get("status") or "").lower()
    meeting_id = _parse_optional_int(task_data.get("meeting_id"))
    if not meeting_id:
        return create_api_response(code="400", message="转录任务缺少关联会议")

    if current_status in ("pending", "processing"):
        status_info = transcription_service.get_task_status(task_id)
        refresh_payload = {
            "task_id": task_id,
            "status": status_info.get("status"),
            "progress": status_info.get("progress", 0),
        }
        return create_api_response(
            code="200",
            message="转录任务状态已刷新",
            data=refresh_payload,
        )

    if current_status == "completed":
        return create_api_response(code="400", message="转录任务已完成,无需重试")

    # Fetch the audio path and the meeting's prompt id for the resubmission.
    with get_db_connection() as connection:
        cursor = connection.cursor(dictionary=True)
        cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,))
        audio_file = cursor.fetchone()
        cursor.execute("SELECT prompt_id FROM meetings WHERE meeting_id = %s LIMIT 1", (meeting_id,))
        meeting = cursor.fetchone()
        cursor.close()

    has_audio_path = bool(audio_file) and bool(audio_file.get("file_path"))
    if not has_audio_path:
        return create_api_response(code="400", message="会议缺少可用音频,无法重试转录")

    new_task_id = transcription_service.start_transcription(meeting_id, audio_file["file_path"])
    meeting_prompt_id = _parse_optional_int((meeting or {}).get("prompt_id"))
    async_meeting_service.enqueue_transcription_monitor(
        meeting_id,
        new_task_id,
        meeting_prompt_id,
        None,
    )
    resubmit_payload = {
        "task_id": new_task_id,
        "previous_task_id": task_id,
        "status": "pending",
        "meeting_id": meeting_id,
    }
    return create_api_response(
        code="200",
        message="转录任务已重新提交",
        data=resubmit_payload,
    )
async def get_system_resources(current_user=None):
    """Report CPU, memory and root-disk usage of the server.

    ``psutil`` is imported lazily so that a missing installation produces a
    descriptive response instead of an import failure at module load.

    Args:
        current_user: Accepted for interface compatibility; not used here.

    Returns:
        The value of ``create_api_response``: code ``"200"`` with ``cpu``,
        ``memory``, ``disk`` and an ISO ``timestamp``, or code ``"500"``
        when ``psutil`` is missing or any call raises.
    """
    try:
        import psutil

        # CPU: cpu_percent(interval=1) samples over a one-second window.
        cpu_info = {
            "percent": psutil.cpu_percent(interval=1),
            "count": psutil.cpu_count()
        }
        # Memory
        memory = psutil.virtual_memory()
        memory_info = {
            "total_gb": round(memory.total / BYTES_TO_GB, 2),
            "used_gb": round(memory.used / BYTES_TO_GB, 2),
            "percent": memory.percent
        }
        # Disk (root filesystem)
        disk = psutil.disk_usage('/')
        disk_info = {
            "total_gb": round(disk.total / BYTES_TO_GB, 2),
            "used_gb": round(disk.used / BYTES_TO_GB, 2),
            "percent": disk.percent
        }
        resources = {
            "cpu": cpu_info,
            "memory": memory_info,
            "disk": disk_info,
            "timestamp": datetime.now().isoformat()
        }
        return create_api_response(code="200", message="获取系统资源成功", data=resources)
    except ImportError:
        return create_api_response(
            code="500",
            message="psutil 库未安装,请运行: pip install psutil"
        )
    except Exception as e:
        print(f"获取系统资源失败: {e}")
        return create_api_response(code="500", message=f"获取系统资源失败: {str(e)}")
async def get_user_stats(current_user=None):
    """Return per-user meeting statistics for users with at least one meeting.

    For every such user the result holds the account creation time, the
    latest ``login`` entry from ``user_logs``, the number of distinct
    meetings and the summed audio duration (in seconds and as
    ``"<h>h <m>m"``, or ``'-'`` when the duration is zero).

    Args:
        current_user: Accepted for interface compatibility; not used here.

    Returns:
        The value of ``create_api_response``: code ``"200"`` with
        ``{"users": [...], "total": n}`` ordered by ``user_id``, or code
        ``"500"`` with the error text if anything raises.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            # The INNER JOIN on meetings excludes users without meetings.
            query = """
                SELECT
                    u.user_id,
                    u.username,
                    u.caption,
                    u.created_at,
                    (SELECT MAX(created_at) FROM user_logs
                     WHERE user_id = u.user_id AND action_type = 'login') as last_login_time,
                    COUNT(DISTINCT m.meeting_id) as meeting_count,
                    COALESCE(SUM(af.duration), 0) as total_duration_seconds
                FROM sys_users u
                INNER JOIN meetings m ON u.user_id = m.user_id
                LEFT JOIN audio_files af ON m.meeting_id = af.meeting_id
                GROUP BY u.user_id, u.username, u.caption, u.created_at
                HAVING meeting_count > 0
                ORDER BY u.user_id ASC
            """
            cursor.execute(query)
            rows = cursor.fetchall()

            # Shape the rows for the response.
            users_list = []
            for row in rows:
                raw_duration = row['total_duration_seconds']
                total_seconds = int(raw_duration) if raw_duration else 0
                hours, remainder = divmod(total_seconds, 3600)
                minutes = remainder // 60
                created_at = row['created_at']
                last_login = row['last_login_time']
                if total_seconds > 0:
                    duration_label = f"{hours}h {minutes}m"
                else:
                    duration_label = '-'
                users_list.append({
                    'user_id': row['user_id'],
                    'username': row['username'],
                    'caption': row['caption'],
                    'created_at': created_at.isoformat() if created_at else None,
                    'last_login_time': last_login.isoformat() if last_login else None,
                    'meeting_count': row['meeting_count'],
                    'total_duration_seconds': total_seconds,
                    'total_duration_formatted': duration_label
                })
            return create_api_response(
                code="200",
                message="获取用户统计成功",
                data={"users": users_list, "total": len(users_list)}
            )
    except Exception as e:
        print(f"获取用户统计失败: {e}")
        import traceback
        traceback.print_exc()
        return create_api_response(code="500", message=f"获取用户统计失败: {str(e)}")