imetting_backend/test/test_worker_thread.py

37 lines
823 B
Python
Raw Normal View History

2025-09-09 03:35:56 +00:00
#!/usr/bin/env python3
"""
测试worker线程是否正常工作
"""
import sys
import os
import time
import threading
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.services.async_meeting_service import AsyncMeetingService
2025-09-09 03:35:56 +00:00
# 创建服务实例
service = AsyncMeetingService()
2025-09-09 03:35:56 +00:00
# 直接调用处理任务方法测试
print("测试直接调用_process_tasks方法...")
# 设置worker_running为True
service.worker_running = True
# 创建线程并启动
thread = threading.Thread(target=service._process_tasks)
thread.daemon = False # 不设置为daemon确保能看到输出
thread.start()
print(f"线程是否活动: {thread.is_alive()}")
print("等待5秒...")
# 等待一段时间
time.sleep(5)
# 停止worker
service.worker_running = False
thread.join(timeout=10)
print("测试完成")