from app.core.response import create_api_response from app.core.database import get_db_connection from app.services.jwt_service import jwt_service from app.core.config import AUDIO_DIR, REDIS_CONFIG from app.services.async_transcription_service import AsyncTranscriptionService from app.services.async_meeting_service import async_meeting_service from datetime import datetime from typing import Dict, List import os import redis # Redis 客户端 redis_client = redis.Redis(**REDIS_CONFIG) # 常量定义 AUDIO_FILE_EXTENSIONS = ('.wav', '.mp3', '.m4a', '.aac', '.flac', '.ogg', '.mpeg', '.mp4', '.webm') BYTES_TO_GB = 1024 ** 3 transcription_service = AsyncTranscriptionService() def _build_status_condition(status: str) -> str: """构建任务状态查询条件""" if status == 'running': return "AND (t.status = 'pending' OR t.status = 'processing')" elif status == 'completed': return "AND t.status = 'completed'" elif status == 'failed': return "AND t.status = 'failed'" return "" def _get_task_stats_query() -> str: """获取任务统计的 SQL 查询""" return """ SELECT COUNT(*) as total, SUM(CASE WHEN status = 'pending' OR status = 'processing' THEN 1 ELSE 0 END) as running, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed """ def _get_online_user_count(redis_client) -> int: """从 Redis 获取在线用户数""" try: token_keys = redis_client.keys("token:*") user_ids = set() for key in token_keys: if isinstance(key, bytes): key = key.decode("utf-8", errors="ignore") parts = key.split(':') if len(parts) >= 2: user_ids.add(parts[1]) return len(user_ids) except Exception as e: print(f"获取在线用户数失败: {e}") return 0 def _table_exists(cursor, table_name: str) -> bool: cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = %s """, (table_name,), ) return (cursor.fetchone() or {}).get("cnt", 0) > 0 def _calculate_audio_storage() -> Dict[str, float]: """计算音频文件存储统计""" audio_files_count = 0 audio_total_size = 0 try: if os.path.exists(AUDIO_DIR): for root, _, files in os.walk(AUDIO_DIR): for file in files: file_extension = os.path.splitext(file)[1].lower() if file_extension in AUDIO_FILE_EXTENSIONS: audio_files_count += 1 file_path = os.path.join(root, file) try: audio_total_size += os.path.getsize(file_path) except OSError: continue except Exception as e: print(f"统计音频文件失败: {e}") return { "audio_file_count": audio_files_count, "audio_files_count": audio_files_count, "audio_total_size_gb": round(audio_total_size / BYTES_TO_GB, 2) } async def get_dashboard_stats(current_user=None): """获取管理员 Dashboard 统计数据""" try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) # 1. 用户统计 today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) total_users = 0 today_new_users = 0 if _table_exists(cursor, "sys_users"): cursor.execute("SELECT COUNT(*) as total FROM sys_users") total_users = (cursor.fetchone() or {}).get("total", 0) cursor.execute( "SELECT COUNT(*) as count FROM sys_users WHERE created_at >= %s", (today_start,), ) today_new_users = (cursor.fetchone() or {}).get("count", 0) online_users = _get_online_user_count(redis_client) # 2. 会议统计 total_meetings = 0 today_new_meetings = 0 if _table_exists(cursor, "meetings"): cursor.execute("SELECT COUNT(*) as total FROM meetings") total_meetings = (cursor.fetchone() or {}).get("total", 0) cursor.execute( "SELECT COUNT(*) as count FROM meetings WHERE created_at >= %s", (today_start,), ) today_new_meetings = (cursor.fetchone() or {}).get("count", 0) # 3. 任务统计 task_stats_query = _get_task_stats_query() # 转录任务 if _table_exists(cursor, "transcript_tasks"): cursor.execute(f"{task_stats_query} FROM transcript_tasks") transcription_stats = cursor.fetchone() or {'total': 0, 'running': 0, 'completed': 0, 'failed': 0} else: transcription_stats = {'total': 0, 'running': 0, 'completed': 0, 'failed': 0} # 总结任务 if _table_exists(cursor, "llm_tasks"): cursor.execute(f"{task_stats_query} FROM llm_tasks") summary_stats = cursor.fetchone() or {'total': 0, 'running': 0, 'completed': 0, 'failed': 0} else: summary_stats = {'total': 0, 'running': 0, 'completed': 0, 'failed': 0} # 知识库任务 if _table_exists(cursor, "knowledge_base_tasks"): cursor.execute(f"{task_stats_query} FROM knowledge_base_tasks") kb_stats = cursor.fetchone() or {'total': 0, 'running': 0, 'completed': 0, 'failed': 0} else: kb_stats = {'total': 0, 'running': 0, 'completed': 0, 'failed': 0} # 4. 音频存储统计 storage_stats = _calculate_audio_storage() # 组装返回数据 stats = { "users": { "total": total_users, "today_new": today_new_users, "online": online_users }, "meetings": { "total": total_meetings, "today_new": today_new_meetings }, "tasks": { "transcription": { "total": transcription_stats['total'] or 0, "running": transcription_stats['running'] or 0, "completed": transcription_stats['completed'] or 0, "failed": transcription_stats['failed'] or 0 }, "summary": { "total": summary_stats['total'] or 0, "running": summary_stats['running'] or 0, "completed": summary_stats['completed'] or 0, "failed": summary_stats['failed'] or 0 }, "knowledge_base": { "total": kb_stats['total'] or 0, "running": kb_stats['running'] or 0, "completed": kb_stats['completed'] or 0, "failed": kb_stats['failed'] or 0 } }, "storage": storage_stats } return create_api_response(code="200", message="获取统计数据成功", data=stats) except Exception as e: print(f"获取Dashboard统计数据失败: {e}") return create_api_response(code="500", message=f"获取统计数据失败: {str(e)}") async def get_online_users(current_user=None): """获取在线用户列表""" try: token_keys = redis_client.keys("token:*") # 提取用户ID并去重 user_tokens = {} for key in token_keys: if isinstance(key, bytes): key = key.decode("utf-8", errors="ignore") parts = key.split(':') if len(parts) >= 3: user_id = int(parts[1]) token = parts[2] if user_id not in user_tokens: user_tokens[user_id] = [] user_tokens[user_id].append({'token': token, 'key': key}) # 查询用户信息 with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) online_users_list = [] for user_id, tokens in user_tokens.items(): cursor.execute( "SELECT user_id, username, caption, email, role_id FROM sys_users WHERE user_id = %s", (user_id,) ) user = cursor.fetchone() if user: ttl_seconds = redis_client.ttl(tokens[0]['key']) online_users_list.append({ **user, 'token_count': len(tokens), 'ttl_seconds': ttl_seconds, 'ttl_hours': round(ttl_seconds / 3600, 1) if ttl_seconds > 0 else 0 }) # 按用户ID排序 online_users_list.sort(key=lambda x: x['user_id']) return create_api_response( code="200", message="获取在线用户列表成功", data={"users": online_users_list, "total": len(online_users_list)} ) except Exception as e: print(f"获取在线用户列表失败: {e}") return create_api_response(code="500", message=f"获取在线用户列表失败: {str(e)}") async def kick_user(user_id: int, current_user=None): """踢出用户(撤销该用户的所有 token)""" try: revoked_count = jwt_service.revoke_all_user_tokens(user_id) if revoked_count > 0: return create_api_response( code="200", message=f"已踢出用户,撤销了 {revoked_count} 个 token", data={"user_id": user_id, "revoked_count": revoked_count} ) else: return create_api_response( code="404", message="该用户当前不在线或未找到 token" ) except Exception as e: print(f"踢出用户失败: {e}") return create_api_response(code="500", message=f"踢出用户失败: {str(e)}") async def monitor_tasks( task_type: str = 'all', status: str = 'all', limit: int = 20, current_user=None ): """监控任务进度""" try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) tasks = [] status_condition = _build_status_condition(status) # 转录任务 if task_type in ['all', 'transcription']: query = f""" SELECT t.task_id, 'transcription' as task_type, t.meeting_id, m.title as meeting_title, t.status, t.progress, t.error_message, t.created_at, t.completed_at, u.username as creator_name FROM transcript_tasks t LEFT JOIN meetings m ON t.meeting_id = m.meeting_id LEFT JOIN sys_users u ON m.user_id = u.user_id WHERE 1=1 {status_condition} ORDER BY t.created_at DESC LIMIT %s """ cursor.execute(query, (limit,)) tasks.extend(cursor.fetchall()) # 总结任务 if task_type in ['all', 'summary']: query = f""" SELECT t.task_id, 'summary' as task_type, t.meeting_id, m.title as meeting_title, t.status, t.progress, t.result, t.error_message, t.created_at, t.completed_at, u.username as creator_name FROM llm_tasks t LEFT JOIN meetings m ON t.meeting_id = m.meeting_id LEFT JOIN sys_users u ON m.user_id = u.user_id WHERE 1=1 {status_condition} ORDER BY t.created_at DESC LIMIT %s """ cursor.execute(query, (limit,)) tasks.extend(cursor.fetchall()) # 知识库任务 if task_type in ['all', 'knowledge_base']: query = f""" SELECT t.task_id, 'knowledge_base' as task_type, t.kb_id as meeting_id, k.title as meeting_title, t.status, t.progress, t.error_message, t.created_at, t.updated_at, u.username as creator_name FROM knowledge_base_tasks t LEFT JOIN knowledge_bases k ON t.kb_id = k.kb_id LEFT JOIN sys_users u ON k.creator_id = u.user_id WHERE 1=1 {status_condition} ORDER BY t.created_at DESC LIMIT %s """ cursor.execute(query, (limit,)) tasks.extend(cursor.fetchall()) # 按创建时间排序并限制返回数量 tasks.sort(key=lambda x: x['created_at'], reverse=True) tasks = tasks[:limit] return create_api_response( code="200", message="获取任务监控数据成功", data={"tasks": tasks, "total": len(tasks)} ) except Exception as e: print(f"获取任务监控数据失败: {e}") import traceback traceback.print_exc() return create_api_response(code="500", message=f"获取任务监控数据失败: {str(e)}") def _parse_optional_int(value): if value in (None, "", "None"): return None try: return int(value) except (TypeError, ValueError): return None async def retry_task(task_type: str, task_id: str, current_user=None): """重试或恢复后台任务。""" try: normalized_type = (task_type or "").strip().lower() if normalized_type == "summary": return _retry_summary_task(task_id) if normalized_type == "transcription": return _retry_transcription_task(task_id) return create_api_response(code="400", message="不支持的任务类型") except Exception as e: print(f"重试任务失败: {e}") return create_api_response(code="500", message=f"重试任务失败: {str(e)}") def _retry_summary_task(task_id: str): task_data = async_meeting_service._get_task_from_db(task_id) if not task_data: return create_api_response(code="404", message="总结任务不存在") status = str(task_data.get("status") or "").lower() meeting_id = _parse_optional_int(task_data.get("meeting_id")) if not meeting_id: return create_api_response(code="400", message="总结任务缺少关联会议") if status in {"pending", "processing"}: async_meeting_service._resume_task_if_needed(task_id, task_data) status_info = async_meeting_service.get_task_status(task_id) return create_api_response( code="200", message="总结任务已尝试恢复", data={"task_id": task_id, "status": status_info.get("status"), "progress": status_info.get("progress", 0)}, ) if status == "completed": return create_api_response(code="400", message="总结任务已完成,无需重试") prompt_id = _parse_optional_int(task_data.get("prompt_id")) user_prompt = "" if task_data.get("user_prompt") in (None, "None") else str(task_data.get("user_prompt")) model_code = "" if task_data.get("model_code") in (None, "None") else str(task_data.get("model_code")) if not model_code: redis_task_data = async_meeting_service.redis_client.hgetall(f"llm_task:{task_id}") or {} model_code = redis_task_data.get("model_code") or "" new_task_id, _ = async_meeting_service.enqueue_summary_generation( meeting_id, user_prompt=user_prompt, prompt_id=prompt_id, model_code=model_code or None, ) return create_api_response( code="200", message="总结任务已重新提交", data={"task_id": new_task_id, "previous_task_id": task_id, "status": "pending", "meeting_id": meeting_id}, ) def _retry_transcription_task(task_id: str): task_data = transcription_service._get_task_from_db(task_id) if not task_data: return create_api_response(code="404", message="转录任务不存在") status = str(task_data.get("status") or "").lower() meeting_id = _parse_optional_int(task_data.get("meeting_id")) if not meeting_id: return create_api_response(code="400", message="转录任务缺少关联会议") if status in {"pending", "processing"}: status_info = transcription_service.get_task_status(task_id) return create_api_response( code="200", message="转录任务状态已刷新", data={"task_id": task_id, "status": status_info.get("status"), "progress": status_info.get("progress", 0)}, ) if status == "completed": return create_api_response(code="400", message="转录任务已完成,无需重试") with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,)) audio_file = cursor.fetchone() cursor.execute("SELECT prompt_id FROM meetings WHERE meeting_id = %s LIMIT 1", (meeting_id,)) meeting = cursor.fetchone() cursor.close() if not audio_file or not audio_file.get("file_path"): return create_api_response(code="400", message="会议缺少可用音频,无法重试转录") new_task_id = transcription_service.start_transcription(meeting_id, audio_file["file_path"]) async_meeting_service.enqueue_transcription_monitor( meeting_id, new_task_id, _parse_optional_int((meeting or {}).get("prompt_id")), None, ) return create_api_response( code="200", message="转录任务已重新提交", data={"task_id": new_task_id, "previous_task_id": task_id, "status": "pending", "meeting_id": meeting_id}, ) async def get_system_resources(current_user=None): """获取服务器资源使用情况""" try: import psutil # CPU 使用率 cpu_percent = psutil.cpu_percent(interval=1) cpu_count = psutil.cpu_count() # 内存使用情况 memory = psutil.virtual_memory() memory_total_gb = round(memory.total / BYTES_TO_GB, 2) memory_used_gb = round(memory.used / BYTES_TO_GB, 2) # 磁盘使用情况 disk = psutil.disk_usage('/') disk_total_gb = round(disk.total / BYTES_TO_GB, 2) disk_used_gb = round(disk.used / BYTES_TO_GB, 2) resources = { "cpu": { "percent": cpu_percent, "count": cpu_count }, "memory": { "total_gb": memory_total_gb, "used_gb": memory_used_gb, "percent": memory.percent }, "disk": { "total_gb": disk_total_gb, "used_gb": disk_used_gb, "percent": disk.percent }, "timestamp": datetime.now().isoformat() } return create_api_response(code="200", message="获取系统资源成功", data=resources) except ImportError: return create_api_response( code="500", message="psutil 库未安装,请运行: pip install psutil" ) except Exception as e: print(f"获取系统资源失败: {e}") return create_api_response(code="500", message=f"获取系统资源失败: {str(e)}") async def get_user_stats(current_user=None): """获取用户统计列表""" try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) # 查询所有用户及其会议统计和最后登录时间(排除没有会议的用户) query = """ SELECT u.user_id, u.username, u.caption, u.created_at, (SELECT MAX(created_at) FROM user_logs WHERE user_id = u.user_id AND action_type = 'login') as last_login_time, COUNT(DISTINCT m.meeting_id) as meeting_count, COALESCE(SUM(af.duration), 0) as total_duration_seconds FROM sys_users u INNER JOIN meetings m ON u.user_id = m.user_id LEFT JOIN audio_files af ON m.meeting_id = af.meeting_id GROUP BY u.user_id, u.username, u.caption, u.created_at HAVING meeting_count > 0 ORDER BY u.user_id ASC """ cursor.execute(query) users = cursor.fetchall() # 格式化返回数据 users_list = [] for user in users: total_seconds = int(user['total_duration_seconds']) if user['total_duration_seconds'] else 0 hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 users_list.append({ 'user_id': user['user_id'], 'username': user['username'], 'caption': user['caption'], 'created_at': user['created_at'].isoformat() if user['created_at'] else None, 'last_login_time': user['last_login_time'].isoformat() if user['last_login_time'] else None, 'meeting_count': user['meeting_count'], 'total_duration_seconds': total_seconds, 'total_duration_formatted': f"{hours}h {minutes}m" if total_seconds > 0 else '-' }) return create_api_response( code="200", message="获取用户统计成功", data={"users": users_list, "total": len(users_list)} ) except Exception as e: print(f"获取用户统计失败: {e}") import traceback traceback.print_exc() return create_api_response(code="500", message=f"获取用户统计失败: {str(e)}")