#!/usr/bin/env python3
"""ERP Auth Module API Integration Test Suite"""

import json
import os
import sys
import time
import urllib.error
import urllib.request

BASE = "http://localhost:3000"

# Upper bound (seconds) for a single HTTP call so a hung server cannot
# block the whole run.
DEFAULT_TIMEOUT = 30

# One dict per logged check: id, name, status, code, ms, detail.
results = []

# IDs of everything the run creates, so the cleanup part can delete them.
created_resources = {
    "users": [],
    "roles": [],
    "organizations": [],
    "departments": [],
    "positions": [],
}


def _now_ms():
    """Return the current wall-clock time in whole milliseconds."""
    return int(time.time() * 1000)


def _encode_body(data):
    """Serialize a request payload to UTF-8 JSON bytes.

    Only ``None`` means "send no body"; an empty dict or list is still
    serialized, so a check that posts ``{}`` really sends ``{}``.
    """
    if data is None:
        return None
    return json.dumps(data).encode("utf-8")


def _decode_success(raw):
    """Decode the body of a successful response.

    Returns the parsed JSON, ``{}`` for an empty body (e.g. 204 No Content),
    or ``{"raw": text}`` when the body is not valid JSON.
    """
    text = raw.decode("utf-8", errors="replace")
    if not text.strip():
        return {}
    try:
        return json.loads(text)
    except ValueError:
        return {"raw": text}


def _items(data):
    """Return the item list from a paginated dict or a bare list, else []."""
    if isinstance(data, dict):
        return data.get("items", [])
    return data if isinstance(data, list) else []


def _section(title):
    """Print a dashed section banner with *title* between the rules."""
    print("\n" + "-" * 70)
    print(title)
    print("-" * 70)


def api(method, path, data=None, tok=None, timeout=DEFAULT_TIMEOUT):
    """Send one JSON request to ``BASE + path``.

    Args:
        method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        path: URL path (and query string) appended to ``BASE``.
        data: JSON-serializable payload, or ``None`` for no request body.
        tok: Bearer token; when falsy no ``Authorization`` header is sent.
        timeout: Seconds to wait before giving up on the request.

    Returns:
        A ``(status, body, elapsed_ms)`` tuple. ``status`` is the HTTP status
        code, or ``0`` when no HTTP response was obtained (connection error,
        timeout, ...), in which case ``body`` is ``{"error": message}``.
    """
    url = BASE + path
    headers = {"Content-Type": "application/json"}
    if tok:
        headers["Authorization"] = "Bearer " + tok
    req = urllib.request.Request(
        url, data=_encode_body(data), headers=headers, method=method
    )
    start = _now_ms()
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            elapsed = _now_ms() - start
            status = resp.status
            raw = resp.read()
    except urllib.error.HTTPError as e:
        elapsed = _now_ms() - start
        try:
            rbody = json.loads(e.read().decode("utf-8"))
        except Exception:
            # Best effort: an unreadable or non-JSON error body must not
            # hide the status code.
            rbody = {"raw": str(e)}
        return e.code, rbody, elapsed
    except Exception as e:
        elapsed = _now_ms() - start
        return 0, {"error": str(e)}, elapsed
    # Decoded outside the try block so an empty or non-JSON success body
    # (for example a 204 on DELETE) keeps its real status instead of being
    # reported as status 0.
    return status, _decode_success(raw), elapsed


def log(tid, name, code, ms, detail, expect_pass=True, status=None):
    """Print one result line and append it to ``results``.

    Args:
        tid: Check identifier, e.g. ``"2.3"``.
        name: Human-readable check name.
        code: HTTP status code returned by the call (0 if none).
        ms: Elapsed time in milliseconds.
        detail: Free-form detail text; truncated to 120 chars when printed.
        expect_pass: When true the check passes on a 2xx code; when false it
            is a negative check and passes on any code >= 400.
        status: Optional explicit ``"PASS"``/``"FAIL"`` that overrides the
            code-based verdict. Used when the response lacked the expected
            data, which is a failure whatever the status code was.
    """
    if status is None:
        if expect_pass:
            status = "PASS" if 200 <= code < 300 else "FAIL"
        else:
            status = "PASS" if code >= 400 else "FAIL"
    line = f" [{status}] {tid} {name}: HTTP {code}, {ms}ms | {detail[:120]}"
    print(line)
    results.append({
        "id": tid,
        "name": name,
        "status": status,
        "code": code,
        "ms": ms,
        "detail": detail,
    })


def login():
    """Log in as admin and return ``(access_token, refresh_token)``.

    Exits the process with status 1 when the login call does not return 200.
    """
    code, body, ms = api("POST", "/api/v1/auth/login", {
        "username": "admin",
        "password": "Admin@2026",
    })
    if code == 200:
        return body["data"]["access_token"], body["data"]["refresh_token"]
    print(f"LOGIN FAILED: {code} {body}")
    sys.exit(1)


# ============================================================
# PART 1: Auth Endpoints
# ============================================================
def _run_auth_checks(refresh_token):
    """Exercise login, refresh and logout, mostly with invalid input."""
    _section("PART 1: Auth Endpoints (Public + Protected)")

    # 1.1 Login with wrong password
    code, body, ms = api("POST", "/api/v1/auth/login",
                         {"username": "admin", "password": "wrong"})
    log("1.1", "Login - wrong password", code, ms,
        f"Expected 401, got {code}", expect_pass=False)

    # 1.2 Login with missing fields
    code, body, ms = api("POST", "/api/v1/auth/login", {"username": "admin"})
    log("1.2", "Login - missing password field", code, ms,
        f"Expected 422, got {code}", expect_pass=False)

    # 1.3 Login with empty body
    code, body, ms = api("POST", "/api/v1/auth/login", {})
    log("1.3", "Login - empty body", code, ms,
        f"Expected 422, got {code}", expect_pass=False)

    # 1.4 SQL injection attempt
    code, body, ms = api("POST", "/api/v1/auth/login",
                         {"username": "admin' OR 1=1 --", "password": "x"})
    log("1.4", "Login - SQL injection attempt", code, ms,
        f"Expected 401, got {code}", expect_pass=False)

    # 1.5 Refresh token
    code, body, ms = api("POST", "/api/v1/auth/refresh",
                         {"refresh_token": refresh_token})
    if code == 200:
        log("1.5", "Refresh token - valid", code, ms,
            f"Got new tokens, expires_in={body['data'].get('expires_in')}")
    else:
        log("1.5", "Refresh token - valid", code, ms, f"FAIL: {body}",
            status="FAIL")

    # 1.6 Reuse old refresh token
    code, body, ms = api("POST", "/api/v1/auth/refresh",
                         {"refresh_token": refresh_token})
    if code != 200:
        log("1.6", "Refresh - reuse old token", code, ms,
            f"Correctly rejected old token (HTTP {code})", expect_pass=False)
    else:
        log("1.6", "Refresh - reuse old token", code, ms,
            "SECURITY: Old refresh token still accepted!", expect_pass=False)

    # 1.7 Invalid refresh token
    code, body, ms = api("POST", "/api/v1/auth/refresh",
                         {"refresh_token": "invalid.token"})
    log("1.7", "Refresh - invalid token", code, ms,
        f"Expected 401, got {code}", expect_pass=False)

    # 1.8 Logout without token
    code, body, ms = api("POST", "/api/v1/auth/logout")
    log("1.8", "Logout - no token", code, ms,
        f"Expected 401, got {code}", expect_pass=False)


# ============================================================
# PART 2: User Management
# ============================================================
def _run_user_checks(token):
    """Exercise list/create/read/update/assign-role on /users."""
    _section("PART 2: User Management (CRUD)")

    # 2.1 GET /users list
    code, body, ms = api("GET", "/api/v1/users", tok=token)
    data = body.get("data") or {}  # tolerate "data": null
    log("2.1", "GET /users - list all", code, ms,
        f"total={data.get('total')}, page={data.get('page')}, items={len(data.get('items', []))}")

    # 2.2 GET /users with pagination
    code, body, ms = api("GET", "/api/v1/users?page=1&page_size=1", tok=token)
    data = body.get("data") or {}
    log("2.2", "GET /users?page=1&page_size=1", code, ms,
        f"total={data.get('total')}, page={data.get('page')}, page_size={data.get('page_size')}, items={len(data.get('items',[]))}")

    # 2.3 Create user
    code, body, ms = api("POST", "/api/v1/users", {
        "username": "test_user_api",
        "password": "Test@2026!",
        "display_name": "API Test User",
        "email": "test@api.com",
        "phone": "13800138000",
    }, tok=token)
    ud = body.get("data", {})
    test_user_id = ud.get("id", "") if ud else ""
    if test_user_id:
        created_resources["users"].append(test_user_id)
        pw_leaked = "password" in ud
        log("2.3", "POST /users - create user", code, ms,
            f"id={test_user_id[:20]}..., pw_in_resp={pw_leaked}, status={ud.get('status')}, version={ud.get('version')}")
    else:
        log("2.3", "POST /users - create user", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            status="FAIL")

    # 2.4 Duplicate username
    code, body, ms = api("POST", "/api/v1/users", {
        "username": "test_user_api",
        "password": "Test@2026!",
        "display_name": "Dup",
    }, tok=token)
    log("2.4", "POST /users - duplicate username", code, ms,
        f"Expected 400/409, got {code}", expect_pass=False)

    # 2.5 Missing required fields
    code, body, ms = api("POST", "/api/v1/users",
                         {"display_name": "No user"}, tok=token)
    log("2.5", "POST /users - missing fields", code, ms,
        f"Expected 422, got {code}", expect_pass=False)

    # 2.6 Short password
    code, body, ms = api("POST", "/api/v1/users", {
        "username": "shortpwd",
        "password": "12",
        "display_name": "Short",
    }, tok=token)
    log("2.6", "POST /users - short password", code, ms,
        f"Expected validation error, got {code}", expect_pass=False)

    if not test_user_id:
        print(" [SKIP] Cannot test user GET/PUT/DELETE - creation failed")
        return

    # 2.7 GET single user
    code, body, ms = api("GET", f"/api/v1/users/{test_user_id}", tok=token)
    ud = body.get("data", {})
    if ud:
        pw_leak = "password" in ud
        log("2.7", "GET /users/{id}", code, ms,
            f"found={ud.get('id')==test_user_id}, pw_leaked={pw_leak}, email={ud.get('email')}")
    else:
        log("2.7", "GET /users/{id}", code, ms, f"FAIL: {body}", status="FAIL")

    # 2.8 GET nonexistent user
    code, body, ms = api(
        "GET", "/api/v1/users/00000000-0000-0000-0000-000000000000", tok=token)
    log("2.8", "GET /users/nonexistent", code, ms,
        f"Expected 404, got {code}", expect_pass=False)

    # 2.9 PUT - update user
    code, body, ms = api("PUT", f"/api/v1/users/{test_user_id}", {
        "display_name": "Updated User",
        "email": "updated@api.com",
        "version": 1,
    }, tok=token)
    ud = body.get("data", {})
    if ud:
        log("2.9", "PUT /users/{id} - update", code, ms,
            f"email={ud.get('email')}, version={ud.get('version')}")
    else:
        log("2.9", "PUT /users/{id} - update", code, ms, f"FAIL: {body}",
            status="FAIL")

    # 2.10 Optimistic lock conflict
    code, body, ms = api("PUT", f"/api/v1/users/{test_user_id}", {
        "display_name": "Conflict",
        "version": 999,
    }, tok=token)
    log("2.10", "PUT /users/{id} - version conflict", code, ms,
        f"Expected 409/400, got {code}", expect_pass=False)

    # 2.11 Assign roles
    _, body_r, _ = api("GET", "/api/v1/roles", tok=token)
    roles = (body_r.get("data") or {}).get("items", [])
    if roles:
        rid = roles[0]["id"]
        code, body, ms = api("POST", f"/api/v1/users/{test_user_id}/roles",
                             {"role_ids": [rid]}, tok=token)
        log("2.11", "POST /users/{id}/roles - assign role", code, ms,
            f"Assigned role {rid[:20]}...")
    else:
        log("2.11", "POST /users/{id}/roles", 0, 0, "SKIP: no roles available")


# ============================================================
# PART 3: Role Management
# ============================================================
def _run_role_checks(token):
    """Exercise list/create/read/update and permission assignment on /roles."""
    _section("PART 3: Role Management (CRUD)")

    # 3.1 GET /roles list
    code, body, ms = api("GET", "/api/v1/roles", tok=token)
    data = body.get("data") or {}
    roles = data.get("items", [])
    log("3.1", "GET /roles - list all", code, ms,
        f"total={data.get('total')}, items={len(roles)}")
    if roles:
        print(f" Roles: {', '.join(r.get('code','?') for r in roles)}")

    # 3.2 Create role
    code, body, ms = api("POST", "/api/v1/roles", {
        "name": "Test Role API",
        "code": "test_role_api",
        "description": "Role created by API test",
    }, tok=token)
    rd = body.get("data", {})
    test_role_id = rd.get("id", "") if rd else ""
    if test_role_id:
        created_resources["roles"].append(test_role_id)
        log("3.2", "POST /roles - create role", code, ms,
            f"id={test_role_id[:20]}..., code={rd.get('code')}, is_system={rd.get('is_system')}")
    else:
        log("3.2", "POST /roles - create role", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            status="FAIL")

    # 3.3 Duplicate role code
    code, body, ms = api("POST", "/api/v1/roles", {
        "name": "Dup Role",
        "code": "test_role_api",
    }, tok=token)
    log("3.3", "POST /roles - duplicate code", code, ms,
        f"Expected 400/409, got {code}", expect_pass=False)

    # 3.4 Missing fields
    code, body, ms = api("POST", "/api/v1/roles", {"name": "No code"},
                         tok=token)
    log("3.4", "POST /roles - missing fields", code, ms,
        f"Expected 422, got {code}", expect_pass=False)

    if not test_role_id:
        print(" [SKIP] Cannot test role GET/PUT/DELETE - creation failed")
        return

    # 3.5 GET single role
    code, body, ms = api("GET", f"/api/v1/roles/{test_role_id}", tok=token)
    rd = body.get("data", {})
    if rd:
        log("3.5", "GET /roles/{id}", code, ms,
            f"code={rd.get('code')}, name={rd.get('name')}, is_system={rd.get('is_system')}")
    else:
        log("3.5", "GET /roles/{id}", code, ms, f"FAIL: {body}", status="FAIL")

    # 3.6 Update role
    code, body, ms = api("PUT", f"/api/v1/roles/{test_role_id}", {
        "name": "Updated Test Role",
        "description": "Updated description",
        "version": 1,
    }, tok=token)
    rd = body.get("data", {})
    if rd:
        log("3.6", "PUT /roles/{id} - update", code, ms,
            f"name={rd.get('name')}, version={rd.get('version')}")
    else:
        log("3.6", "PUT /roles/{id} - update", code, ms, f"FAIL: {body}",
            status="FAIL")

    # 3.7 GET role permissions
    code, body, ms = api("GET", f"/api/v1/roles/{test_role_id}/permissions",
                         tok=token)
    perms = body.get("data", [])
    log("3.7", "GET /roles/{id}/permissions", code, ms,
        f"permissions count={len(perms) if isinstance(perms, list) else 'N/A'}")

    # 3.8 Assign permissions to role
    _, body_p, _ = api("GET", "/api/v1/permissions", tok=token)
    all_perms = body_p.get("data", {})
    if isinstance(all_perms, dict):
        perm_items = all_perms.get("items", [])
    else:
        perm_items = all_perms
    if perm_items:
        perm_ids = [p["id"] for p in perm_items[:3]]
        code, body, ms = api(
            "POST", f"/api/v1/roles/{test_role_id}/permissions",
            {"permission_ids": perm_ids}, tok=token)
        log("3.8", "POST /roles/{id}/permissions - assign", code, ms,
            f"Assigned {len(perm_ids)} permissions")
    else:
        log("3.8", "POST /roles/{id}/permissions", 0, 0,
            "SKIP: no permissions found")


# ============================================================
# PART 4: Permissions
# ============================================================
def _run_permission_checks(token):
    """List /permissions, accepting a paginated dict or a bare list."""
    _section("PART 4: Permissions")

    code, body, ms = api("GET", "/api/v1/permissions", tok=token)
    data = body.get("data", {})
    perms = _items(data)
    if isinstance(data, dict):
        total = data.get("total", len(perms))
    else:
        total = len(perms)
    log("4.1", "GET /permissions - list all", code, ms,
        f"total={total}, items={len(perms)}")
    if perms:
        print(f" Sample: {perms[0].get('code', 'N/A')} - {perms[0].get('name', 'N/A')}")


# ============================================================
# PART 5: Organization Management
# ============================================================
def _run_organization_checks(token):
    """Exercise /organizations; return the created org id ('' on failure)."""
    _section("PART 5: Organization Management (CRUD)")

    # 5.1 GET /organizations list
    code, body, ms = api("GET", "/api/v1/organizations", tok=token)
    data = body.get("data", {})
    orgs = _items(data)
    log("5.1", "GET /organizations - list", code, ms,
        f"total={data.get('total') if isinstance(data, dict) else len(orgs)}, items={len(orgs)}")

    # 5.2 Create organization
    code, body, ms = api("POST", "/api/v1/organizations", {
        "name": "Test Org API",
        "code": "TEST_ORG_API",
        "description": "Organization created by API test",
    }, tok=token)
    od = body.get("data", {})
    test_org_id = od.get("id", "") if od else ""
    if test_org_id:
        created_resources["organizations"].append(test_org_id)
        log("5.2", "POST /organizations - create", code, ms,
            f"id={test_org_id[:20]}..., code={od.get('code')}")
    else:
        log("5.2", "POST /organizations - create", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            status="FAIL")

    # 5.3 Duplicate org code
    code, body, ms = api("POST", "/api/v1/organizations", {
        "name": "Dup Org",
        "code": "TEST_ORG_API",
    }, tok=token)
    log("5.3", "POST /organizations - duplicate code", code, ms,
        f"Expected error, got {code}", expect_pass=False)

    if test_org_id:
        # 5.4 Update organization
        code, body, ms = api("PUT", f"/api/v1/organizations/{test_org_id}", {
            "name": "Updated Org",
            "description": "Updated",
            "version": 1,
        }, tok=token)
        od = body.get("data", {})
        if od:
            log("5.4", "PUT /organizations/{id} - update", code, ms,
                f"name={od.get('name')}, version={od.get('version')}")
        else:
            log("5.4", "PUT /organizations/{id} - update", code, ms,
                f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
                status="FAIL")

    return test_org_id


# ============================================================
# PART 6: Department Management
# ============================================================
def _run_department_checks(token, test_org_id):
    """Exercise departments under *test_org_id*; return the created dept id.

    Returns '' when there is no organization or the creation failed.
    """
    _section("PART 6: Department Management (CRUD)")

    if not test_org_id:
        print(" [SKIP] Cannot test departments - no org created")
        return ""

    # 6.1 GET departments under org
    code, body, ms = api(
        "GET", f"/api/v1/organizations/{test_org_id}/departments", tok=token)
    depts = _items(body.get("data", {}))
    log("6.1", "GET /organizations/{id}/departments", code, ms,
        f"items={len(depts) if isinstance(depts, list) else 'N/A'}")

    # 6.2 Create department
    code, body, ms = api(
        "POST", f"/api/v1/organizations/{test_org_id}/departments", {
            "name": "Test Dept API",
            "code": "TEST_DEPT_API",
            "description": "Department by API test",
        }, tok=token)
    dd = body.get("data", {})
    test_dept_id = dd.get("id", "") if dd else ""
    if test_dept_id:
        created_resources["departments"].append(test_dept_id)
        log("6.2", "POST /organizations/{id}/departments - create", code, ms,
            f"id={test_dept_id[:20]}..., code={dd.get('code')}")
    else:
        log("6.2", "POST .../departments - create", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            status="FAIL")

    if test_dept_id:
        # 6.3 Update department
        code, body, ms = api("PUT", f"/api/v1/departments/{test_dept_id}", {
            "name": "Updated Dept",
            "description": "Updated",
            "version": 1,
        }, tok=token)
        dd = body.get("data", {})
        if dd:
            log("6.3", "PUT /departments/{id} - update", code, ms,
                f"name={dd.get('name')}, version={dd.get('version')}")
        else:
            log("6.3", "PUT /departments/{id} - update", code, ms,
                f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
                status="FAIL")

    # 6.4 GET departments with invalid org
    code, body, ms = api(
        "GET",
        "/api/v1/organizations/00000000-0000-0000-0000-000000000000/departments",
        tok=token)
    data = body.get("data", {})
    depts_inv = data.get("items", []) if isinstance(data, dict) else []
    log("6.4", "GET /organizations/invalid/departments", code, ms,
        f"items={len(depts_inv) if isinstance(depts_inv, list) else 'N/A'}, expected empty")

    return test_dept_id


# ============================================================
# PART 7: Position Management
# ============================================================
def _run_position_checks(token, test_dept_id):
    """Exercise positions under *test_dept_id*; skipped when it is empty."""
    _section("PART 7: Position Management (CRUD)")

    if not test_dept_id:
        print(" [SKIP] Cannot test positions - no department created")
        return

    # 7.1 GET positions under dept
    code, body, ms = api(
        "GET", f"/api/v1/departments/{test_dept_id}/positions", tok=token)
    positions = _items(body.get("data", {}))
    log("7.1", "GET /departments/{id}/positions", code, ms,
        f"items={len(positions) if isinstance(positions, list) else 'N/A'}")

    # 7.2 Create position
    code, body, ms = api(
        "POST", f"/api/v1/departments/{test_dept_id}/positions", {
            "name": "Test Position API",
            "code": "TEST_POS_API",
            "description": "Position by API test",
        }, tok=token)
    pd = body.get("data", {})
    test_pos_id = pd.get("id", "") if pd else ""
    if test_pos_id:
        created_resources["positions"].append(test_pos_id)
        log("7.2", "POST .../positions - create", code, ms,
            f"id={test_pos_id[:20]}..., code={pd.get('code')}")
    else:
        log("7.2", "POST .../positions - create", code, ms,
            f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
            status="FAIL")

    if test_pos_id:
        # 7.3 Update position
        code, body, ms = api("PUT", f"/api/v1/positions/{test_pos_id}", {
            "name": "Updated Position",
            "description": "Updated",
            "version": 1,
        }, tok=token)
        pd = body.get("data", {})
        if pd:
            log("7.3", "PUT /positions/{id} - update", code, ms,
                f"name={pd.get('name')}, version={pd.get('version')}")
        else:
            log("7.3", "PUT /positions/{id} - update", code, ms,
                f"FAIL: {json.dumps(body, ensure_ascii=False)[:200]}",
                status="FAIL")

    # 7.4 Duplicate position code
    code, body, ms = api(
        "POST", f"/api/v1/departments/{test_dept_id}/positions", {
            "name": "Dup Pos",
            "code": "TEST_POS_API",
        }, tok=token)
    log("7.4", "POST .../positions - duplicate code", code, ms,
        f"Expected error, got {code}", expect_pass=False)


# ============================================================
# PART 8: Security & Permission Validation
# ============================================================
def _run_security_checks(token):
    """Probe /users with missing, invalid and tampered credentials."""
    _section("PART 8: Security & Permission Validation")

    # 8.1 No token
    code, body, ms = api("GET", "/api/v1/users")
    log("8.1", "GET /users - no token", code, ms,
        f"Expected 401, got {code}", expect_pass=False)

    # 8.2 Invalid token
    code, body, ms = api("GET", "/api/v1/users", tok="invalid.jwt.token")
    log("8.2", "GET /users - invalid token", code, ms,
        f"Expected 401, got {code}", expect_pass=False)

    # 8.3 Tampered token
    tampered = token[:-5] + "XXXXX"
    code, body, ms = api("GET", "/api/v1/users", tok=tampered)
    log("8.3", "GET /users - tampered token", code, ms,
        f"Expected 401, got {code}", expect_pass=False)

    # 8.4 Access other tenant data (tenant_id in URL)
    code, body, ms = api(
        "GET",
        "/api/v1/users?tenant_id=00000000-0000-0000-0000-000000000000",
        tok=token)
    log("8.4", "GET /users - cross-tenant attempt", code, ms,
        f"Should ignore tenant_id param, got {code}")

    # 8.5 POST /users - XSS attempt
    # NOTE(review): display_name is an empty string here, so no XSS payload
    # is actually sent; the original payload may have been lost - confirm.
    code, body, ms = api("POST", "/api/v1/users", {
        "username": "xss_user",
        "password": "Test@2026!",
        "display_name": "",
    }, tok=token)
    # Any 2xx means the user now exists and must be registered for cleanup.
    if 200 <= code < 300:
        xss_user_id = (body.get("data") or {}).get("id", "")
        if xss_user_id:
            created_resources["users"].append(xss_user_id)
        log("8.5", "POST /users - XSS payload", code, ms,
            f"Created with XSS payload - check if sanitized (id={xss_user_id[:20] if xss_user_id else 'N/A'})")
    else:
        log("8.5", "POST /users - XSS payload", code, ms,
            f"Blocked or error: {code}")


# ============================================================
# PART 9: Cleanup - Delete Created Resources
# ============================================================
def _run_cleanup(token):
    """Delete everything recorded in ``created_resources``.

    Order: positions, departments, users, roles, organizations.
    """
    _section("PART 9: Cleanup - Deleting Created Resources (Soft Delete)")

    # (check id, key in created_resources, URL segment), in deletion order.
    steps = [
        ("9.1", "positions", "positions"),
        ("9.2", "departments", "departments"),
        ("9.3", "users", "users"),
        ("9.4", "roles", "roles"),
        ("9.5", "organizations", "organizations"),
    ]
    for tid, key, segment in steps:
        for rid in created_resources[key]:
            code, body, ms = api("DELETE", f"/api/v1/{segment}/{rid}",
                                 tok=token)
            log(tid, f"DELETE /{segment}/{rid[:20]}...", code, ms,
                f"code={code}")


# ============================================================
# FINAL SUMMARY
# ============================================================
def _print_summary():
    """Print totals, pass rate, response-time stats and the failed checks."""
    print("\n" + "=" * 70)
    print(" TEST SUMMARY")
    print("=" * 70)

    total = len(results)
    passed = sum(1 for r in results if r["status"] == "PASS")
    failed = sum(1 for r in results if r["status"] == "FAIL")
    # Guard the division so an empty result list cannot crash the summary.
    rate = passed / total * 100 if total else 0.0
    print(f"\n Total Tests: {total}")
    print(f" Passed: {passed}")
    print(f" Failed: {failed}")
    print(f" Pass Rate: {rate:.1f}%")

    # Performance stats (entries logged with ms == 0 are excluded)
    all_ms = [r["ms"] for r in results if r["ms"] > 0]
    if all_ms:
        avg_ms = sum(all_ms) / len(all_ms)
        print(f"\n Avg Response: {avg_ms:.0f}ms")
        print(f" Max Response: {max(all_ms)}ms")
        print(f" Min Response: {min(all_ms)}ms")

    # Failed tests detail
    if failed > 0:
        print("\n FAILED TESTS:")
        for r in results:
            if r["status"] == "FAIL":
                print(f" [{r['id']}] {r['name']}: HTTP {r['code']} | {r['detail'][:100]}")

    print("\n" + "=" * 70)


def main():
    """Run the whole suite against ``BASE`` and print the report."""
    print("=" * 70)
    print(" ERP Auth Module API Integration Test Report")
    print("=" * 70)

    # Login first
    print("\n[Auth] Logging in as admin...")
    token, refresh_token = login()
    log("AUTH-01", "POST /auth/login", 200, 0,
        "Obtained access + refresh tokens")
    print(f" Token length: {len(token)}, Refresh length: {len(refresh_token)}")

    _run_auth_checks(refresh_token)
    _run_user_checks(token)
    _run_role_checks(token)
    _run_permission_checks(token)
    test_org_id = _run_organization_checks(token)
    test_dept_id = _run_department_checks(token, test_org_id)
    _run_position_checks(token, test_dept_id)
    _run_security_checks(token)
    _run_cleanup(token)
    _print_summary()


if __name__ == "__main__":
    main()