imetting_backend/test/test_token_expiration.py

296 lines
11 KiB
Python
Raw Permalink Normal View History

2025-08-29 08:37:29 +00:00
#!/usr/bin/env python3
"""
JWT Token 过期测试脚本
用于测试JWT token的过期撤销机制可以模拟指定用户的token失效
运行方法:
cd /Users/jiliu/工作/projects/imeeting/backend
source venv/bin/activate # 激活虚拟环境
python test/test_token_expiration.py
功能
1. 登录指定用户并获取token
2. 验证token有效性
3. 撤销指定用户的所有token模拟失效
4. 验证撤销后token失效
期望结果在网页上登录的用户执行失效命令后网页会自动登出
"""
import sys
import os
import requests
import json
import time
from datetime import datetime
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
BASE_URL = "http://127.0.0.1:8000"
# 禁用代理以避免本地请求被代理
PROXIES = {'http': None, 'https': None}
def invalidate_user_tokens():
"""模拟指定用户的token失效"""
print("模拟用户Token失效工具")
print("=" * 40)
# 获取要失效的用户名
target_username = input("请输入要失效token的用户名 (默认: mula): ").strip()
if not target_username:
target_username = "mula"
# 获取管理员凭据来执行失效操作
admin_username = input("请输入管理员用户名 (默认: mula): ").strip()
admin_password = input("请输入管理员密码 (默认: 781126): ").strip()
if not admin_username:
admin_username = "mula"
if not admin_password:
admin_password = "781126"
try:
# 1. 管理员登录获取token
print(f"\n步骤1: 管理员登录 ({admin_username})")
admin_login_data = {
"username": admin_username,
"password": admin_password
}
response = requests.post(f"{BASE_URL}/api/auth/login", json=admin_login_data, proxies=PROXIES)
if response.status_code != 200:
print(f"❌ 管理员登录失败")
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.text}")
return
admin_data = response.json()
admin_token = admin_data["token"]
admin_headers = {"Authorization": f"Bearer {admin_token}"}
print(f"✅ 管理员登录成功: {admin_data['username']} ({admin_data['caption']})")
# 2. 如果目标用户不是管理员先登录目标用户验证token存在
if target_username != admin_username:
print(f"\n步骤2: 验证目标用户 ({target_username}) 是否存在")
target_password = input(f"请输入 {target_username} 的密码 (用于验证): ").strip()
if not target_password:
print("❌ 需要提供目标用户的密码来验证")
return
target_login_data = {
"username": target_username,
"password": target_password
}
response = requests.post(f"{BASE_URL}/api/auth/login", json=target_login_data, proxies=PROXIES)
if response.status_code != 200:
print(f"❌ 目标用户登录失败,无法验证用户存在")
return
target_data = response.json()
print(f"✅ 目标用户验证成功: {target_data['username']} ({target_data['caption']})")
target_user_id = target_data['user_id']
else:
target_user_id = admin_data['user_id']
# 3. 撤销目标用户的所有token
print(f"\n步骤3: 撤销用户 {target_username} (ID: {target_user_id}) 的所有token")
# 使用管理员权限调用新的admin API
response = requests.post(f"{BASE_URL}/api/auth/admin/revoke-user-tokens/{target_user_id}",
headers=admin_headers, proxies=PROXIES)
if response.status_code == 200:
result = response.json()
print(f"✅ Token撤销成功: {result.get('message', '已撤销所有token')}")
# 4. 验证token是否真的失效了
print(f"\n步骤4: 验证token失效")
if target_username != admin_username:
# 尝试使用目标用户的token访问protected API
target_token = target_data["token"]
target_headers = {"Authorization": f"Bearer {target_token}"}
response = requests.get(f"{BASE_URL}/api/auth/me", headers=target_headers, proxies=PROXIES)
if response.status_code == 401:
print(f"✅ 验证成功:用户 {target_username} 的token已失效")
else:
print(f"❌ 验证失败:用户 {target_username} 的token仍然有效")
else:
# 如果目标用户就是管理员验证当前管理员token是否失效
response = requests.get(f"{BASE_URL}/api/auth/me", headers=admin_headers, proxies=PROXIES)
if response.status_code == 401:
print(f"✅ 验证成功:用户 {target_username} 的token已失效")
else:
print(f"❌ 验证失败:用户 {target_username} 的token仍然有效")
print(f"\n🌟 操作完成!")
print(f"如果用户 {target_username} 在网页上已登录,现在应该会自动登出。")
print(f"你可以在网页上验证是否自动跳转到登录页面。")
else:
print(f"❌ Token撤销失败: {response.status_code}")
print(f"响应内容: {response.text}")
except requests.exceptions.ConnectionError:
print("❌ 无法连接到后端服务器,请确保服务器正在运行")
except Exception as e:
print(f"❌ 操作失败: {e}")
def test_token_expiration():
"""测试token过期机制"""
print("JWT Token 过期测试")
print("=" * 40)
# 1. 登录获取token
username = input("请输入用户名 (默认: test): ").strip()
password = input("请输入密码 (默认: test): ").strip()
# 使用默认值如果输入为空
if not username:
username = "test"
if not password:
password = "test"
login_data = {
"username": username,
"password": password
}
try:
# 登录
print(f"正在尝试登录用户: {login_data['username']}")
response = requests.post(f"{BASE_URL}/api/auth/login", json=login_data, proxies=PROXIES)
if response.status_code != 200:
print(f"❌ 登录失败")
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.text}")
print(f"请求URL: {BASE_URL}/api/auth/login")
print("请检查:")
print("1. 后端服务是否正在运行")
print("2. 用户名和密码是否正确")
print("3. 数据库连接是否正常")
return
user_data = response.json()
token = user_data["token"]
print(f"✅ 登录成功获得token: {token[:20]}...")
# 2. 测试token有效性
headers = {"Authorization": f"Bearer {token}"}
print("\n测试1: 验证token有效性")
response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES)
if response.status_code == 200:
user_info = response.json()
print(f"✅ Token有效用户: {user_info.get('username')}")
else:
print(f"❌ Token无效: {response.status_code}")
return
# 3. 测试受保护的API
print("\n测试2: 访问受保护的API")
response = requests.get(f"{BASE_URL}/api/meetings", headers=headers, proxies=PROXIES)
if response.status_code == 200:
print("✅ 成功访问会议列表API")
else:
print(f"❌ 访问受保护API失败: {response.status_code}")
# 4. 登出token
print("\n测试3: 登出token")
response = requests.post(f"{BASE_URL}/api/auth/logout", headers=headers, proxies=PROXIES)
if response.status_code == 200:
print("✅ 登出成功")
# 5. 验证登出后token失效
print("\n测试4: 验证登出后token失效")
response = requests.get(f"{BASE_URL}/api/auth/me", headers=headers, proxies=PROXIES)
if response.status_code == 401:
print("✅ Token已失效登出成功")
else:
print(f"❌ Token仍然有效登出失败: {response.status_code}")
else:
print(f"❌ 登出失败: {response.status_code}")
except requests.exceptions.ConnectionError:
print("❌ 无法连接到后端服务器,请确保服务器正在运行")
except Exception as e:
print(f"❌ 测试失败: {e}")
def check_token_format():
"""检查token格式是否为JWT"""
token = input("\n请粘贴JWT token (或按Enter跳过): ").strip()
if not token:
return
print(f"\nJWT格式检查:")
# JWT应该有三个部分用.分隔
parts = token.split('.')
if len(parts) != 3:
print("❌ 不是有效的JWT格式 (应该有3个部分用.分隔)")
return
try:
import base64
import json
# 解码header
header_padding = parts[0] + '=' * (4 - len(parts[0]) % 4)
header = json.loads(base64.urlsafe_b64decode(header_padding))
# 解码payload
payload_padding = parts[1] + '=' * (4 - len(parts[1]) % 4)
payload = json.loads(base64.urlsafe_b64decode(payload_padding))
print("✅ JWT格式有效")
print(f"算法: {header.get('alg')}")
print(f"类型: {header.get('typ')}")
print(f"用户ID: {payload.get('user_id')}")
print(f"用户名: {payload.get('username')}")
if 'exp' in payload:
exp_time = datetime.fromtimestamp(payload['exp'])
print(f"过期时间: {exp_time}")
if datetime.now() > exp_time:
print("❌ Token已过期")
else:
print("✅ Token未过期")
except Exception as e:
print(f"❌ JWT解码失败: {e}")
if __name__ == "__main__":
print("JWT Token 测试工具")
print("=" * 50)
print(f"工作目录: {os.getcwd()}")
print(f"测试脚本路径: {__file__}")
print()
print("请选择功能:")
print("1. 模拟指定用户Token失效 (推荐)")
print("2. 完整Token过期测试")
print("3. JWT格式检查")
choice = input("\n请输入选项 (1-3, 默认: 1): ").strip()
if choice == "2":
test_token_expiration()
check_token_format()
elif choice == "3":
check_token_format()
else:
# 默认选择1
invalidate_user_tokens()
print("\n=== 测试完成 ===")
print("如果测试失败,请检查:")
print("1. 确保后端服务正在运行: python main.py")
print("2. 确保在 backend 目录下运行测试")
print("3. 确保Redis服务正在运行")
print("4. 如果选择了选项1请在网页上验证用户是否自动登出")