Files
erp/test_api_auth.py
iven 841766b168
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix(用户管理): 修复用户列表页面加载失败问题
修复用户列表页面加载失败导致测试超时的问题,确保页面元素正确渲染
2026-04-19 08:46:28 +08:00

581 lines
24 KiB
Python

#!/usr/bin/env python3
"""ERP Auth Module API Integration Test Suite"""
import urllib.request
import json
import time
import sys
import os
# Root URL of the backend under test; api() prepends it to every request path.
BASE = "http://localhost:3000"
def api(method, path, data=None, tok=None, timeout=30):
    """Send one JSON request to the backend at ``BASE`` and time it.

    Args:
        method: HTTP verb handed to ``urllib.request.Request``.
        path: Path (and optional query string) appended to ``BASE``.
        data: JSON-serialisable payload. ``None`` sends no body; any other
            value, including an empty dict, is serialised and sent.
        tok: Bearer token; when truthy it is sent in the ``Authorization``
            header.
        timeout: Seconds to wait on the socket before giving up, so that an
            unresponsive server cannot hang the whole run.

    Returns:
        A ``(status, body, elapsed_ms)`` tuple. ``status`` is the HTTP status
        code, or ``0`` when no HTTP response could be read (connection
        refused, timeout, ...). ``body`` is the decoded JSON document, ``{}``
        for an empty 2xx body, ``{"raw": ...}`` when the body is not valid
        JSON, and ``{"error": ...}`` when ``status`` is ``0``.
    """
    url = BASE + path
    headers = {"Content-Type": "application/json"}
    if tok:
        headers["Authorization"] = "Bearer " + tok
    # Compare against None: an explicit empty payload ({}) must still go out
    # as "{}" instead of silently turning into a body-less request.
    body = None if data is None else json.dumps(data).encode("utf-8")
    req = urllib.request.Request(url, data=body, headers=headers, method=method)

    # Monotonic clock: the measurement is unaffected by wall-clock adjustments.
    start = time.monotonic()

    def elapsed_ms():
        return int((time.monotonic() - start) * 1000)

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            elapsed = elapsed_ms()
            status = resp.status
            raw = resp.read()
    except urllib.error.HTTPError as e:
        elapsed = elapsed_ms()
        # Best effort: error responses normally carry a JSON body, but fall
        # back to the exception text when they do not.
        try:
            rbody = json.loads(e.read().decode("utf-8"))
        except Exception:
            rbody = {"raw": str(e)}
        return e.code, rbody, elapsed
    except Exception as e:
        # No usable HTTP response at all; 0 is the "no status" marker.
        return 0, {"error": str(e)}, elapsed_ms()

    # A successful response may legitimately have no body (e.g. 204) or a
    # non-JSON one; keep the real status code instead of reporting 0.
    text = raw.decode("utf-8", errors="replace")
    if not text.strip():
        return status, {}, elapsed
    try:
        return status, json.loads(text), elapsed
    except ValueError:
        return status, {"raw": text}, elapsed
# One dict per recorded check, appended by log() and read by the final summary.
results = []
# Ids of everything the run creates, grouped by resource type, so the cleanup
# section can delete them again.
created_resources = {"users": [], "roles": [], "organizations": [], "departments": [], "positions": []}


def log(tid, name, code, ms, detail, expect_pass=True):
    """Print one result line and record it in ``results``.

    Args:
        tid: Test identifier shown in the report (e.g. ``"2.3"``).
        name: Human-readable test name.
        code: HTTP status code of the call (``0`` when no response arrived).
        ms: Elapsed time in milliseconds.
        detail: Free-form detail text; the printed line shows at most its
            first 120 characters, the full text is stored.
        expect_pass: ``True`` for a positive test, which passes on a 2xx
            status. ``False`` for a negative test, which passes only when
            the server rejects the request with a 4xx status.
    """
    if expect_pass:
        ok = 200 <= code < 300
    else:
        # A negative test expects a client error. A 5xx means the server
        # failed on the bad input rather than rejecting it, and 0 means no
        # response arrived, so neither counts as a pass.
        ok = 400 <= code < 500
    status = "PASS" if ok else "FAIL"
    detail = str(detail)
    line = f" [{status}] {tid} {name}: HTTP {code}, {ms}ms | {detail[:120]}"
    print(line)
    results.append({"id": tid, "name": name, "status": status, "code": code, "ms": ms, "detail": detail})
def login():
    """Log in as the admin test account and return its tokens.

    Returns:
        An ``(access_token, refresh_token)`` tuple taken from the ``data``
        object of the login response.

    Exits the process with status 1 (after printing the status code and
    body) when the login is not answered with HTTP 200 or when the response
    does not contain both tokens, because nothing else in the run can work
    without them.
    """
    code, body, _ = api("POST", "/api/v1/auth/login", {
        "username": "admin", "password": "Admin@2026"
    })
    data = body.get("data") if isinstance(body, dict) else None
    if code == 200 and isinstance(data, dict):
        access = data.get("access_token")
        refresh = data.get("refresh_token")
        # A 200 without both tokens is treated as a failed login rather than
        # crashing with a KeyError further down.
        if access and refresh:
            return access, refresh
    print(f"LOGIN FAILED: {code} {body}")
    sys.exit(1)
# ============================================================
# Report banner and initial admin login
# ============================================================
print("=" * 70, " ERP Auth Module API Integration Test Report", "=" * 70, sep="\n")

# Login first; login() exits the process on failure, so both tokens are
# available to everything below.
print("\n[Auth] Logging in as admin...")
token, refresh_token = login()
# login() returns neither status nor timing, hence the fixed 200 / 0 ms entry.
log("AUTH-01", "POST /auth/login", 200, 0, "Obtained access + refresh tokens")
print(f" Token length: {len(token)}, Refresh length: {len(refresh_token)}")
# ============================================================
# PART 1: Auth Endpoints
# ============================================================
print("\n" + "-" * 70)
print("PART 1: Auth Endpoints (Public + Protected)")
print("-" * 70)

# 1.1 Login with wrong password
code, body, ms = api("POST", "/api/v1/auth/login", {"username": "admin", "password": "wrong"})
log("1.1", "Login - wrong password", code, ms, f"Expected 401, got {code}", expect_pass=False)

# 1.2 Login with missing fields
code, body, ms = api("POST", "/api/v1/auth/login", {"username": "admin"})
log("1.2", "Login - missing password field", code, ms, f"Expected 422, got {code}", expect_pass=False)

# 1.3 Login with empty body
code, body, ms = api("POST", "/api/v1/auth/login", {})
log("1.3", "Login - empty body", code, ms, f"Expected 422, got {code}", expect_pass=False)

# 1.4 SQL injection attempt
code, body, ms = api("POST", "/api/v1/auth/login", {"username": "admin' OR 1=1 --", "password": "x"})
log("1.4", "Login - SQL injection attempt", code, ms, f"Expected 401, got {code}", expect_pass=False)

# 1.5 Refresh token
code, body, ms = api("POST", "/api/v1/auth/refresh", {"refresh_token": refresh_token})
# `or {}` also covers a response whose "data" is JSON null.
data = body.get("data") or {}
if code == 200 and data.get("refresh_token"):
    new_refresh = data["refresh_token"]
    log("1.5", "Refresh token - valid", code, ms, f"Got new tokens, expires_in={data.get('expires_in')}")
else:
    new_refresh = refresh_token
    # This branch must always be recorded as FAIL. log() passes a positive
    # test only on 2xx and a negative test only on an error status, so pick
    # whichever mode yields FAIL for the status actually received.
    log("1.5", "Refresh token - valid", code, ms, f"FAIL: {body}",
        expect_pass=not (200 <= code < 300))

# 1.6 Reuse old refresh token (the one already exchanged in 1.5)
code, body, ms = api("POST", "/api/v1/auth/refresh", {"refresh_token": refresh_token})
if code != 200:
    log("1.6", "Refresh - reuse old token", code, ms, f"Correctly rejected old token (HTTP {code})", expect_pass=False)
else:
    log("1.6", "Refresh - reuse old token", code, ms, "SECURITY: Old refresh token still accepted!", expect_pass=False)

# 1.7 Invalid refresh token
code, body, ms = api("POST", "/api/v1/auth/refresh", {"refresh_token": "invalid.token"})
log("1.7", "Refresh - invalid token", code, ms, f"Expected 401, got {code}", expect_pass=False)

# 1.8 Logout without token
code, body, ms = api("POST", "/api/v1/auth/logout")
log("1.8", "Logout - no token", code, ms, f"Expected 401, got {code}", expect_pass=False)
# ============================================================
# PART 2: User Management
# ============================================================
print("\n" + "-" * 70)
print("PART 2: User Management (CRUD)")
print("-" * 70)

# Conventions used throughout this part:
# * `body.get("data") or {}` also covers a response whose "data" is JSON
#   null, which would otherwise crash the run on the next `.get`.
# * Failure branches pass `expect_pass=not (200 <= code < 300)`. log() passes
#   a positive test only on 2xx and a negative test only on an error status,
#   so this picks whichever mode yields FAIL for the status actually received.

# 2.1 GET /users list
code, body, ms = api("GET", "/api/v1/users", tok=token)
data = body.get("data") or {}
log("2.1", "GET /users - list all", code, ms,
    f"total={data.get('total')}, page={data.get('page')}, items={len(data.get('items') or [])}")

# 2.2 GET /users with pagination
code, body, ms = api("GET", "/api/v1/users?page=1&page_size=1", tok=token)
data = body.get("data") or {}
log("2.2", "GET /users?page=1&page_size=1", code, ms,
    f"total={data.get('total')}, page={data.get('page')}, page_size={data.get('page_size')}, items={len(data.get('items') or [])}")

# 2.3 Create user
code, body, ms = api("POST", "/api/v1/users", {
    "username": "test_user_api",
    "password": "Test@2026!",
    "display_name": "API Test User",
    "email": "test@api.com",
    "phone": "13800138000"
}, tok=token)
ud = body.get("data") or {}
test_user_id = ud.get("id") or ""
if test_user_id:
    # Remember the id so the cleanup section deletes the user again.
    created_resources["users"].append(test_user_id)
    pw_leaked = "password" in ud
    log("2.3", "POST /users - create user", code, ms,
        f"id={test_user_id[:20]}..., pw_in_resp={pw_leaked}, status={ud.get('status')}, version={ud.get('version')}")
else:
    log("2.3", "POST /users - create user", code, ms,
        f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
        expect_pass=not (200 <= code < 300))

# 2.4 Duplicate username
code, body, ms = api("POST", "/api/v1/users", {
    "username": "test_user_api",
    "password": "Test@2026!",
    "display_name": "Dup"
}, tok=token)
log("2.4", "POST /users - duplicate username", code, ms, f"Expected 400/409, got {code}", expect_pass=False)

# 2.5 Missing required fields
code, body, ms = api("POST", "/api/v1/users", {"display_name": "No user"}, tok=token)
log("2.5", "POST /users - missing fields", code, ms, f"Expected 422, got {code}", expect_pass=False)

# 2.6 Short password
code, body, ms = api("POST", "/api/v1/users", {
    "username": "shortpwd", "password": "12", "display_name": "Short"
}, tok=token)
log("2.6", "POST /users - short password", code, ms, f"Expected validation error, got {code}", expect_pass=False)

if test_user_id:
    # 2.7 GET single user
    code, body, ms = api("GET", f"/api/v1/users/{test_user_id}", tok=token)
    ud = body.get("data") or {}
    if ud:
        pw_leak = "password" in ud
        log("2.7", "GET /users/{id}", code, ms,
            f"found={ud.get('id')==test_user_id}, pw_leaked={pw_leak}, email={ud.get('email')}")
    else:
        log("2.7", "GET /users/{id}", code, ms, f"FAIL: {body}",
            expect_pass=not (200 <= code < 300))

    # 2.8 GET nonexistent user
    code, body, ms = api("GET", "/api/v1/users/00000000-0000-0000-0000-000000000000", tok=token)
    log("2.8", "GET /users/nonexistent", code, ms, f"Expected 404, got {code}", expect_pass=False)

    # 2.9 PUT - update user
    code, body, ms = api("PUT", f"/api/v1/users/{test_user_id}", {
        "display_name": "Updated User",
        "email": "updated@api.com",
        "version": 1
    }, tok=token)
    ud = body.get("data") or {}
    if ud:
        log("2.9", "PUT /users/{id} - update", code, ms,
            f"email={ud.get('email')}, version={ud.get('version')}")
    else:
        log("2.9", "PUT /users/{id} - update", code, ms, f"FAIL: {body}",
            expect_pass=not (200 <= code < 300))

    # 2.10 Optimistic lock conflict
    code, body, ms = api("PUT", f"/api/v1/users/{test_user_id}", {
        "display_name": "Conflict", "version": 999
    }, tok=token)
    log("2.10", "PUT /users/{id} - version conflict", code, ms,
        f"Expected 409/400, got {code}", expect_pass=False)

    # 2.11 Assign roles (uses the first role the server lists)
    code_r, body_r, _ = api("GET", "/api/v1/roles", tok=token)
    roles = (body_r.get("data") or {}).get("items") or []
    if roles:
        rid = roles[0]["id"]
        code, body, ms = api("POST", f"/api/v1/users/{test_user_id}/roles", {"role_ids": [rid]}, tok=token)
        log("2.11", "POST /users/{id}/roles - assign role", code, ms, f"Assigned role {rid[:20]}...")
    else:
        # A skipped check is only printed, like the other skips in this
        # script; recording it through log() with HTTP 0 counted it as FAIL.
        print(" [SKIP] 2.11 POST /users/{id}/roles - no roles available")
else:
    print(" [SKIP] Cannot test user GET/PUT/DELETE - creation failed")
# ============================================================
# PART 3: Role Management
# ============================================================
print("\n" + "-" * 70)
print("PART 3: Role Management (CRUD)")
print("-" * 70)

# Conventions used throughout this part:
# * `body.get("data") or {}` also covers a response whose "data" is JSON
#   null, which would otherwise crash the run on the next `.get`.
# * Failure branches pass `expect_pass=not (200 <= code < 300)`. log() passes
#   a positive test only on 2xx and a negative test only on an error status,
#   so this picks whichever mode yields FAIL for the status actually received.

# 3.1 GET /roles list
code, body, ms = api("GET", "/api/v1/roles", tok=token)
data = body.get("data") or {}
roles = data.get("items") or []
log("3.1", "GET /roles - list all", code, ms,
    f"total={data.get('total')}, items={len(roles)}")
if roles:
    print(f" Roles: {', '.join(r.get('code','?') for r in roles)}")

# 3.2 Create role
code, body, ms = api("POST", "/api/v1/roles", {
    "name": "Test Role API",
    "code": "test_role_api",
    "description": "Role created by API test"
}, tok=token)
rd = body.get("data") or {}
test_role_id = rd.get("id") or ""
if test_role_id:
    # Remember the id so the cleanup section deletes the role again.
    created_resources["roles"].append(test_role_id)
    log("3.2", "POST /roles - create role", code, ms,
        f"id={test_role_id[:20]}..., code={rd.get('code')}, is_system={rd.get('is_system')}")
else:
    log("3.2", "POST /roles - create role", code, ms,
        f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
        expect_pass=not (200 <= code < 300))

# 3.3 Duplicate role code
code, body, ms = api("POST", "/api/v1/roles", {
    "name": "Dup Role", "code": "test_role_api"
}, tok=token)
log("3.3", "POST /roles - duplicate code", code, ms, f"Expected 400/409, got {code}", expect_pass=False)

# 3.4 Missing fields
code, body, ms = api("POST", "/api/v1/roles", {"name": "No code"}, tok=token)
log("3.4", "POST /roles - missing fields", code, ms, f"Expected 422, got {code}", expect_pass=False)

if test_role_id:
    # 3.5 GET single role
    code, body, ms = api("GET", f"/api/v1/roles/{test_role_id}", tok=token)
    rd = body.get("data") or {}
    if rd:
        log("3.5", "GET /roles/{id}", code, ms,
            f"code={rd.get('code')}, name={rd.get('name')}, is_system={rd.get('is_system')}")
    else:
        log("3.5", "GET /roles/{id}", code, ms, f"FAIL: {body}",
            expect_pass=not (200 <= code < 300))

    # 3.6 Update role
    code, body, ms = api("PUT", f"/api/v1/roles/{test_role_id}", {
        "name": "Updated Test Role",
        "description": "Updated description",
        "version": 1
    }, tok=token)
    rd = body.get("data") or {}
    if rd:
        log("3.6", "PUT /roles/{id} - update", code, ms,
            f"name={rd.get('name')}, version={rd.get('version')}")
    else:
        log("3.6", "PUT /roles/{id} - update", code, ms, f"FAIL: {body}",
            expect_pass=not (200 <= code < 300))

    # 3.7 GET role permissions
    code, body, ms = api("GET", f"/api/v1/roles/{test_role_id}/permissions", tok=token)
    perms = body.get("data", [])
    log("3.7", "GET /roles/{id}/permissions", code, ms,
        f"permissions count={len(perms) if isinstance(perms, list) else 'N/A'}")

    # 3.8 Assign permissions to role (first three the server lists).
    # The listing is accepted either as {"items": [...]} or as a bare list.
    code_p, body_p, _ = api("GET", "/api/v1/permissions", tok=token)
    all_perms = body_p.get("data", {})
    perm_items = all_perms.get("items", []) if isinstance(all_perms, dict) else all_perms
    if perm_items:
        perm_ids = [p["id"] for p in perm_items[:3]]
        code, body, ms = api("POST", f"/api/v1/roles/{test_role_id}/permissions", {
            "permission_ids": perm_ids
        }, tok=token)
        log("3.8", "POST /roles/{id}/permissions - assign", code, ms,
            f"Assigned {len(perm_ids)} permissions")
    else:
        # A skipped check is only printed, like the other skips in this
        # script; recording it through log() with HTTP 0 counted it as FAIL.
        print(" [SKIP] 3.8 POST /roles/{id}/permissions - no permissions found")
else:
    print(" [SKIP] Cannot test role GET/PUT/DELETE - creation failed")
# ============================================================
# PART 4: Permissions
# ============================================================
print("\n" + "-" * 70)
print("PART 4: Permissions")
print("-" * 70)

# 4.1 List permissions. The payload is accepted in two shapes: a paged object
# with "items"/"total", or a bare list. Anything else counts as empty.
code, body, ms = api("GET", "/api/v1/permissions", tok=token)
data = body.get("data", {})
if isinstance(data, dict):
    perms = data.get("items", [])
    total = data.get("total", len(perms))
elif isinstance(data, list):
    perms = data
    total = len(perms)
else:
    perms = []
    total = len(perms)
log("4.1", "GET /permissions - list all", code, ms, f"total={total}, items={len(perms)}")

# Show the first entry as a sample of what the listing contains.
if perms:
    print(f" Sample: {perms[0].get('code', 'N/A')} - {perms[0].get('name', 'N/A')}")
# ============================================================
# PART 5: Organization Management
# ============================================================
print("\n" + "-" * 70)
print("PART 5: Organization Management (CRUD)")
print("-" * 70)

# Conventions used throughout this part:
# * `body.get("data") or {}` also covers a response whose "data" is JSON
#   null, which would otherwise crash the run on the next `.get`.
# * Failure branches pass `expect_pass=not (200 <= code < 300)`. log() passes
#   a positive test only on 2xx and a negative test only on an error status,
#   so this picks whichever mode yields FAIL for the status actually received.

# 5.1 GET /organizations list (paged object or bare list are both accepted)
code, body, ms = api("GET", "/api/v1/organizations", tok=token)
data = body.get("data", {})
orgs = data.get("items", []) if isinstance(data, dict) else (data if isinstance(data, list) else [])
log("5.1", "GET /organizations - list", code, ms,
    f"total={data.get('total') if isinstance(data, dict) else len(orgs)}, items={len(orgs)}")

# 5.2 Create organization
code, body, ms = api("POST", "/api/v1/organizations", {
    "name": "Test Org API",
    "code": "TEST_ORG_API",
    "description": "Organization created by API test"
}, tok=token)
od = body.get("data") or {}
test_org_id = od.get("id") or ""
if test_org_id:
    # Remember the id so the cleanup section deletes the organization again.
    created_resources["organizations"].append(test_org_id)
    log("5.2", "POST /organizations - create", code, ms,
        f"id={test_org_id[:20]}..., code={od.get('code')}")
else:
    log("5.2", "POST /organizations - create", code, ms,
        f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
        expect_pass=not (200 <= code < 300))

# 5.3 Duplicate org code
code, body, ms = api("POST", "/api/v1/organizations", {
    "name": "Dup Org", "code": "TEST_ORG_API"
}, tok=token)
log("5.3", "POST /organizations - duplicate code", code, ms, f"Expected error, got {code}", expect_pass=False)

if test_org_id:
    # 5.4 Update organization
    code, body, ms = api("PUT", f"/api/v1/organizations/{test_org_id}", {
        "name": "Updated Org",
        "description": "Updated",
        "version": 1
    }, tok=token)
    od = body.get("data") or {}
    if od:
        log("5.4", "PUT /organizations/{id} - update", code, ms,
            f"name={od.get('name')}, version={od.get('version')}")
    else:
        log("5.4", "PUT /organizations/{id} - update", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            expect_pass=not (200 <= code < 300))
# ============================================================
# PART 6: Department Management
# ============================================================
print("\n" + "-" * 70)
print("PART 6: Department Management (CRUD)")
print("-" * 70)

# Conventions used throughout this part:
# * `body.get("data") or {}` also covers a response whose "data" is JSON
#   null, which would otherwise crash the run on the next `.get`.
# * Failure branches pass `expect_pass=not (200 <= code < 300)`. log() passes
#   a positive test only on 2xx and a negative test only on an error status,
#   so this picks whichever mode yields FAIL for the status actually received.

if test_org_id:
    # 6.1 GET departments under org
    code, body, ms = api("GET", f"/api/v1/organizations/{test_org_id}/departments", tok=token)
    data = body.get("data", {})
    depts = data.get("items", []) if isinstance(data, dict) else (data if isinstance(data, list) else [])
    log("6.1", "GET /organizations/{id}/departments", code, ms,
        f"items={len(depts) if isinstance(depts, list) else 'N/A'}")

    # 6.2 Create department
    code, body, ms = api("POST", f"/api/v1/organizations/{test_org_id}/departments", {
        "name": "Test Dept API",
        "code": "TEST_DEPT_API",
        "description": "Department by API test"
    }, tok=token)
    dd = body.get("data") or {}
    test_dept_id = dd.get("id") or ""
    if test_dept_id:
        # Remember the id so the cleanup section deletes the department again.
        created_resources["departments"].append(test_dept_id)
        log("6.2", "POST /organizations/{id}/departments - create", code, ms,
            f"id={test_dept_id[:20]}..., code={dd.get('code')}")

        # 6.3 Update department (only possible when 6.2 produced an id)
        code, body, ms = api("PUT", f"/api/v1/departments/{test_dept_id}", {
            "name": "Updated Dept",
            "description": "Updated",
            "version": 1
        }, tok=token)
        dd = body.get("data") or {}
        if dd:
            log("6.3", "PUT /departments/{id} - update", code, ms,
                f"name={dd.get('name')}, version={dd.get('version')}")
        else:
            log("6.3", "PUT /departments/{id} - update", code, ms,
                f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
                expect_pass=not (200 <= code < 300))
    else:
        log("6.2", "POST .../departments - create", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            expect_pass=not (200 <= code < 300))

    # 6.4 GET departments with invalid org
    code, body, ms = api("GET", "/api/v1/organizations/00000000-0000-0000-0000-000000000000/departments", tok=token)
    data = body.get("data", {})
    depts_inv = data.get("items", []) if isinstance(data, dict) else []
    log("6.4", "GET /organizations/invalid/departments", code, ms,
        f"items={len(depts_inv) if isinstance(depts_inv, list) else 'N/A'}, expected empty")
else:
    print(" [SKIP] Cannot test departments - no org created")
    # Keep the name defined: the position tests check it to decide whether to run.
    test_dept_id = ""
# ============================================================
# PART 7: Position Management
# ============================================================
print("\n" + "-" * 70)
print("PART 7: Position Management (CRUD)")
print("-" * 70)

# Conventions used throughout this part:
# * `body.get("data") or {}` also covers a response whose "data" is JSON
#   null, which would otherwise crash the run on the next `.get`.
# * Failure branches pass `expect_pass=not (200 <= code < 300)`. log() passes
#   a positive test only on 2xx and a negative test only on an error status,
#   so this picks whichever mode yields FAIL for the status actually received.

if test_dept_id:
    # 7.1 GET positions under dept
    code, body, ms = api("GET", f"/api/v1/departments/{test_dept_id}/positions", tok=token)
    data = body.get("data", {})
    positions = data.get("items", []) if isinstance(data, dict) else (data if isinstance(data, list) else [])
    log("7.1", "GET /departments/{id}/positions", code, ms,
        f"items={len(positions) if isinstance(positions, list) else 'N/A'}")

    # 7.2 Create position
    code, body, ms = api("POST", f"/api/v1/departments/{test_dept_id}/positions", {
        "name": "Test Position API",
        "code": "TEST_POS_API",
        "description": "Position by API test"
    }, tok=token)
    pd = body.get("data") or {}
    test_pos_id = pd.get("id") or ""
    if test_pos_id:
        # Remember the id so the cleanup section deletes the position again.
        created_resources["positions"].append(test_pos_id)
        log("7.2", "POST .../positions - create", code, ms,
            f"id={test_pos_id[:20]}..., code={pd.get('code')}")

        # 7.3 Update position (only possible when 7.2 produced an id)
        code, body, ms = api("PUT", f"/api/v1/positions/{test_pos_id}", {
            "name": "Updated Position",
            "description": "Updated",
            "version": 1
        }, tok=token)
        pd = body.get("data") or {}
        if pd:
            log("7.3", "PUT /positions/{id} - update", code, ms,
                f"name={pd.get('name')}, version={pd.get('version')}")
        else:
            log("7.3", "PUT /positions/{id} - update", code, ms,
                f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
                expect_pass=not (200 <= code < 300))
    else:
        log("7.2", "POST .../positions - create", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            expect_pass=not (200 <= code < 300))

    # 7.4 Duplicate position code
    code, body, ms = api("POST", f"/api/v1/departments/{test_dept_id}/positions", {
        "name": "Dup Pos", "code": "TEST_POS_API"
    }, tok=token)
    log("7.4", "POST .../positions - duplicate code", code, ms, f"Expected error, got {code}", expect_pass=False)
else:
    print(" [SKIP] Cannot test positions - no department created")
# ============================================================
# PART 8: Security & Permission Validation
# ============================================================
print("\n" + "-" * 70)
print("PART 8: Security & Permission Validation")
print("-" * 70)

# 8.1 No token
code, body, ms = api("GET", "/api/v1/users")
log("8.1", "GET /users - no token", code, ms, f"Expected 401, got {code}", expect_pass=False)

# 8.2 Invalid token
code, body, ms = api("GET", "/api/v1/users", tok="invalid.jwt.token")
log("8.2", "GET /users - invalid token", code, ms, f"Expected 401, got {code}", expect_pass=False)

# 8.3 Tampered token: the real token with its last five characters replaced
tampered = token[:-5] + "XXXXX"
code, body, ms = api("GET", "/api/v1/users", tok=tampered)
log("8.3", "GET /users - tampered token", code, ms, f"Expected 401, got {code}", expect_pass=False)

# 8.4 Access other tenant data (tenant_id in URL)
code, body, ms = api("GET", "/api/v1/users?tenant_id=00000000-0000-0000-0000-000000000000", tok=token)
log("8.4", "GET /users - cross-tenant attempt", code, ms, f"Should ignore tenant_id param, got {code}")

# 8.5 POST /users - XSS attempt
code, body, ms = api("POST", "/api/v1/users", {
    "username": "xss_user",
    "password": "Test@2026!",
    "display_name": "<script>alert('xss')</script>"
}, tok=token)
# Any 2xx means the user was created. Checking for 201 only would leave the
# account behind (never queued for cleanup) whenever the server answers 200.
if 200 <= code < 300:
    # `or {}` also covers a response whose "data" is JSON null.
    xss_user_id = (body.get("data") or {}).get("id") or ""
    if xss_user_id:
        created_resources["users"].append(xss_user_id)
    log("8.5", "POST /users - XSS payload", code, ms,
        f"Created with XSS payload - check if sanitized (id={xss_user_id[:20] if xss_user_id else 'N/A'})")
else:
    log("8.5", "POST /users - XSS payload", code, ms, f"Blocked or error: {code}")
# ============================================================
# PART 9: Cleanup - Delete Created Resources
# ============================================================
print("\n" + "-" * 70)
print("PART 9: Cleanup - Deleting Created Resources (Soft Delete)")
print("-" * 70)

# (test id, resource type) in deletion order. The resource type is both the
# key in created_resources and the URL segment of the DELETE endpoint.
# Positions and departments go before organizations - presumably so that
# dependents are removed before their parents.
# Only ids recorded in created_resources are deleted, so roles that existed
# before the run (e.g. system roles) are never touched.
cleanup_plan = (
    ("9.1", "positions"),
    ("9.2", "departments"),
    ("9.3", "users"),
    ("9.4", "roles"),
    ("9.5", "organizations"),
)
for tid, kind in cleanup_plan:
    for res_id in created_resources[kind]:
        code, body, ms = api("DELETE", f"/api/v1/{kind}/{res_id}", tok=token)
        log(tid, f"DELETE /{kind}/{res_id[:20]}...", code, ms, f"code={code}")
# ============================================================
# FINAL SUMMARY
# ============================================================
print("\n" + "=" * 70)
print(" TEST SUMMARY")
print("=" * 70)

# Tally the recorded statuses.
statuses = [r["status"] for r in results]
total = len(statuses)
passed = statuses.count("PASS")
failed = statuses.count("FAIL")
print(f"\n Total Tests: {total}")
print(f" Passed: {passed}")
print(f" Failed: {failed}")
print(f" Pass Rate: {passed/total*100:.1f}%")

# Performance stats: entries recorded with 0 ms are left out of the timing
# figures.
all_ms = [r["ms"] for r in results if r["ms"] > 0]
if all_ms:
    avg_ms = sum(all_ms) / len(all_ms)
    max_ms = max(all_ms)
    min_ms = min(all_ms)
    print(f"\n Avg Response: {avg_ms:.0f}ms")
    print(f" Max Response: {max_ms}ms")
    print(f" Min Response: {min_ms}ms")

# Failed tests detail
if failed > 0:
    print(f"\n FAILED TESTS:")
    for r in results:
        if r["status"] != "FAIL":
            continue
        print(f" [{r['id']}] {r['name']}: HTTP {r['code']} | {r['detail'][:100]}")

print("\n" + "=" * 70)