imetting/backend/test/test_worker_thread.py

37 lines
823 B
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
测试worker线程是否正常工作
"""
import sys
import os
import time
import threading
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.services.async_meeting_service import AsyncMeetingService
# 创建服务实例
service = AsyncMeetingService()
# 直接调用处理任务方法测试
print("测试直接调用_process_tasks方法...")
# 设置worker_running为True
service.worker_running = True
# 创建线程并启动
thread = threading.Thread(target=service._process_tasks)
thread.daemon = False # 不设置为daemon确保能看到输出
thread.start()
print(f"线程是否活动: {thread.is_alive()}")
print("等待5秒...")
# 等待一段时间
time.sleep(5)
# 停止worker
service.worker_running = False
thread.join(timeout=10)
print("测试完成")