#!/usr/bin/env python3 """HMS 预约排班链路端到端 API 测试""" import urllib.request, json, os, sys, time from datetime import datetime, timedelta from urllib.error import HTTPError BASE = 'http://localhost:3000/api/v1' def log(msg): print(msg, flush=True) def api(method, path, body=None, token=None): url = f'{BASE}{path}' data_bytes = json.dumps(body).encode('utf-8') if body else None headers = {'Content-Type': 'application/json'} if token: headers['Authorization'] = f'Bearer {token}' req = urllib.request.Request(url, data=data_bytes, method=method, headers=headers) try: resp = urllib.request.urlopen(req, timeout=15) raw = resp.read().decode('utf-8') return json.loads(raw), resp.status except HTTPError as e: raw = e.read().decode('utf-8') try: return json.loads(raw), e.code except: return {'raw_error': raw[:300]}, e.code def extract_items(data_obj): """Extract items from various response structures""" if isinstance(data_obj, list): return data_obj if isinstance(data_obj, dict): # Try common field names for key in ['data', 'items', 'records', 'rows']: if key in data_obj: val = data_obj[key] if isinstance(val, list): return val # If data_obj has total but no list field, check all values for v in data_obj.values(): if isinstance(v, list) and len(v) > 0: return v return [] results = [] TOKEN = None DOCTOR_ID = None SCHEDULE_ID = None PATIENT_ID = None APPOINTMENT_ID = None # ================================================================ # STEP 0: 登录 # ================================================================ log('=' * 60) log('[STEP 0] 登录') d, code = api('POST', '/auth/login', {'username': 'admin', 'password': 'Admin@2026'}) if d.get('success') and code == 200: TOKEN = d['data']['access_token'] log(f' PASS | Token length: {len(TOKEN)}') results.append(('登录', 'PASS', code)) else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('登录', 'FAIL', code)) sys.exit(1) # ================================================================ # PHASE 1: 医护管理 # ================================================================ log('') log('=' * 60) log('PHASE 1: 医护管理') # 1.1 医护列表 log('[T1.1] GET /health/doctors') d, code = api('GET', '/health/doctors', token=TOKEN) ok = d.get('success', False) and code == 200 if ok: data_obj = d.get('data', {}) total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0 items = extract_items(data_obj) log(f' PASS | HTTP {code} | total={total} | items_count={len(items)}') if items: DOCTOR_ID = items[0].get('id') for it in items[:3]: log(f' - id={it.get("id","?")[:20]} | name={it.get("name","?")} | dept={it.get("department","?")}') else: log(f' Data structure keys: {list(data_obj.keys()) if isinstance(data_obj, dict) else type(data_obj).__name__}') # Debug: print full structure log(f' Full data (truncated): {json.dumps(data_obj, ensure_ascii=False)[:500]}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T1.1 医护列表', 'PASS' if ok else 'FAIL', code)) # 1.2 创建医护(如果没有现成的) if not DOCTOR_ID: log('') log('[T1.2] 创建测试医生 (因为没有现成的)') new_doc = { 'name': f'API测试医生_{int(time.time())}', 'department': '测试科', 'title': '主治医师', 'speciality': '自动化测试', 'phone': '13800000001', 'status': 'active' } d, code = api('POST', '/health/doctors', new_doc, token=TOKEN) if d.get('success') and code in [200, 201]: DOCTOR_ID = d['data'].get('id') log(f' PASS | HTTP {code} | doctor_id={DOCTOR_ID}') else: log(f' Result | HTTP {code} | {str(d)[:300]}') results.append(('T1.2 创建医护', 'PASS' if d.get('success') else 'FAIL', code)) else: # Already have a doctor, create a test one anyway for isolation log('') log('[T1.2] 创建隔离测试医生') new_doc = { 'name': f'E2E测试医生_{int(time.time())}', 'department': '测试科', 'title': '主治医师', 'speciality': 'E2E自动化测试', 'phone': '13800000999', 'status': 'active' } d, code = api('POST', '/health/doctors', new_doc, token=TOKEN) if d.get('success') and code in [200, 201]: DOCTOR_ID = d['data'].get('id') # Use the new one log(f' PASS | HTTP {code} | doctor_id={DOCTOR_ID}') else: log(f' Result | HTTP {code} | {str(d)[:300]}') results.append(('T1.2 创建医护', 'PASS' if d.get('success') else 'FAIL', code)) # 1.3 医护详情 if DOCTOR_ID: log('') log(f'[T1.3] GET /health/doctors/{{id}}') d, code = api('GET', f'/health/doctors/{DOCTOR_ID}', token=TOKEN) ok = d.get('success', False) and code == 200 if ok: doc = d.get('data', {}) log(f' PASS | HTTP {code} | name={doc.get("name")} | dept={doc.get("department")}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T1.3 医护详情', 'PASS' if ok else 'FAIL', code)) # 1.4 安全: 无认证 log('') log('[T1.4] 无认证访问 /health/doctors (应 401)') try: req = urllib.request.Request(f'{BASE}/health/doctors') resp = urllib.request.urlopen(req, timeout=10) log(f' FAIL | HTTP {resp.status} (应 401)') results.append(('T1.4 无认证拦截', 'FAIL', resp.status)) except HTTPError as e: ok = e.code == 401 log(f' {"PASS" if ok else "FAIL"} | HTTP {e.code}') results.append(('T1.4 无认证拦截', 'PASS' if ok else 'FAIL', e.code)) # ================================================================ # PHASE 2: 排班管理 # ================================================================ log('') log('=' * 60) log('PHASE 2: 排班管理') # 2.1 排班列表 log('[T2.1] GET /health/doctor-schedules') d, code = api('GET', '/health/doctor-schedules', token=TOKEN) ok = d.get('success', False) and code == 200 if ok: data_obj = d.get('data', {}) total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0 items = extract_items(data_obj) log(f' PASS | HTTP {code} | total={total} | items_count={len(items)}') if items: for it in items[:3]: log(f' - id={it.get("id","?")[:20]} | date={it.get("schedule_date","?")} | doctor_id={str(it.get("doctor_id","?"))[:20]}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T2.1 排班列表', 'PASS' if ok else 'FAIL', code)) # 2.2 创建排班 if DOCTOR_ID: TOMORROW = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d') log('') log(f'[T2.2] POST /health/doctor-schedules (创建明天排班)') schedule_data = { 'doctor_id': DOCTOR_ID, 'schedule_date': TOMORROW, 'start_time': '09:00', 'end_time': '17:00', 'max_appointments': 10, 'time_slot_duration': 30 } d, code = api('POST', '/health/doctor-schedules', schedule_data, token=TOKEN) ok = d.get('success', False) and code in [200, 201] if ok: SCHEDULE_ID = d['data'].get('id') log(f' PASS | HTTP {code} | schedule_id={SCHEDULE_ID}') else: log(f' Result | HTTP {code} | {str(d)[:300]}') # Schedule might already exist, search for it d2, code2 = api('GET', '/health/doctor-schedules', token=TOKEN) if d2.get('success'): items2 = extract_items(d2.get('data', {})) for item in items2: if item.get('doctor_id') == DOCTOR_ID: SCHEDULE_ID = item.get('id') log(f' Found existing schedule: {SCHEDULE_ID}') break results.append(('T2.2 创建排班', 'PASS' if ok else 'FAIL', code)) else: results.append(('T2.2 创建排班', 'SKIP', 0)) # ================================================================ # PHASE 3: 患者准备 + 预约管理 # ================================================================ log('') log('=' * 60) log('PHASE 3: 预约管理') # 3.1 获取患者列表 log('[T3.1] GET /health/patients') d, code = api('GET', '/health/patients', token=TOKEN) ok = d.get('success', False) and code == 200 if ok: data_obj = d.get('data', {}) total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0 items = extract_items(data_obj) log(f' PASS | HTTP {code} | total={total} | items_count={len(items)}') if items: PATIENT_ID = items[0].get('id') for it in items[:3]: log(f' - id={it.get("id","?")[:20]} | name={it.get("name","?")}') else: log(f' Data structure keys: {list(data_obj.keys()) if isinstance(data_obj, dict) else type(data_obj).__name__}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T3.1 患者列表', 'PASS' if ok else 'FAIL', code)) # 3.2 创建患者(如果没有现成的) if not PATIENT_ID: log('') log('[T3.2] 创建测试患者') new_patient = { 'name': f'E2E测试患者_{int(time.time())}', 'gender': 'male', 'phone': '13900000999', 'status': 'active' } d, code = api('POST', '/health/patients', new_patient, token=TOKEN) if d.get('success') and code in [200, 201]: PATIENT_ID = d['data'].get('id') log(f' PASS | HTTP {code} | patient_id={PATIENT_ID}') else: log(f' Result | HTTP {code} | {str(d)[:300]}') results.append(('T3.2 创建患者', 'PASS' if d.get('success') else 'FAIL', code)) else: results.append(('T3.2 创建患者', 'SKIP (已有)', 0)) # 3.3 预约列表 log('') log('[T3.3] GET /health/appointments') d, code = api('GET', '/health/appointments', token=TOKEN) ok = d.get('success', False) and code == 200 if ok: data_obj = d.get('data', {}) total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0 log(f' PASS | HTTP {code} | total={total}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T3.3 预约列表', 'PASS' if ok else 'FAIL', code)) # 3.4 创建预约 log('') log(f'[T3.4] POST /health/appointments (创建预约)') log(f' DOCTOR_ID={DOCTOR_ID}, PATIENT_ID={PATIENT_ID}, SCHEDULE_ID={SCHEDULE_ID}') if DOCTOR_ID and PATIENT_ID and SCHEDULE_ID: TOMORROW = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d') appt_data = { 'patient_id': PATIENT_ID, 'doctor_id': DOCTOR_ID, 'schedule_id': SCHEDULE_ID, 'appointment_date': TOMORROW, 'start_time': '09:00', 'end_time': '09:30', 'type': 'initial_consultation', 'reason': 'API E2E 测试预约' } d, code = api('POST', '/health/appointments', appt_data, token=TOKEN) ok = d.get('success', False) and code in [200, 201] if ok: APPOINTMENT_ID = d['data'].get('id') log(f' PASS | HTTP {code} | appointment_id={APPOINTMENT_ID}') else: log(f' Result | HTTP {code} | {str(d)[:300]}') results.append(('T3.4 创建预约', 'PASS' if ok else 'FAIL', code)) elif DOCTOR_ID and PATIENT_ID: # Try without schedule_id TOMORROW = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d') appt_data = { 'patient_id': PATIENT_ID, 'doctor_id': DOCTOR_ID, 'appointment_date': TOMORROW, 'start_time': '09:00', 'end_time': '09:30', 'type': 'initial_consultation', 'reason': 'API E2E 测试预约(无排班)' } d, code = api('POST', '/health/appointments', appt_data, token=TOKEN) ok = d.get('success', False) and code in [200, 201] if ok: APPOINTMENT_ID = d['data'].get('id') log(f' PASS | HTTP {code} | appointment_id={APPOINTMENT_ID}') else: log(f' Result | HTTP {code} | {str(d)[:300]}') results.append(('T3.4 创建预约(无排班)', 'PASS' if ok else 'FAIL', code)) else: log(' SKIP - 缺少必要 ID') results.append(('T3.4 创建预约', 'SKIP', 0)) # ================================================================ # PHASE 4: 预约状态流转 # ================================================================ log('') log('=' * 60) log('PHASE 4: 预约状态流转') if APPOINTMENT_ID: # 4.1 确认预约 (pending -> confirmed) log(f'[T4.1] PUT /health/appointments/{{id}}/status -> confirmed') d, code = api('PUT', f'/health/appointments/{APPOINTMENT_ID}/status', {'status': 'confirmed'}, token=TOKEN) ok = d.get('success', False) and code == 200 if ok: log(f' PASS | HTTP {code}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T4.1 确认预约', 'PASS' if ok else 'FAIL', code)) # 4.2 完成预约 (confirmed -> completed) log('') log(f'[T4.2] PUT /health/appointments/{{id}}/status -> completed') d, code = api('PUT', f'/health/appointments/{APPOINTMENT_ID}/status', {'status': 'completed'}, token=TOKEN) ok = d.get('success', False) and code == 200 if ok: log(f' PASS | HTTP {code}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T4.2 完成预约', 'PASS' if ok else 'FAIL', code)) # 4.3 获取预约详情验证最终状态 log('') log(f'[T4.3] GET /health/appointments/{{id}} (验证最终状态)') d, code = api('GET', f'/health/appointments/{APPOINTMENT_ID}', token=TOKEN) ok = d.get('success', False) and code == 200 if ok: appt = d.get('data', {}) final_status = appt.get('status', 'N/A') ok = final_status == 'completed' log(f' {"PASS" if ok else "FAIL"} | HTTP {code} | final_status={final_status}') else: log(f' FAIL | HTTP {code} | {str(d)[:200]}') results.append(('T4.3 最终状态验证', 'PASS' if ok else 'FAIL', code)) # 4.4 尝试非法状态流转 (completed -> confirmed, 应失败) log('') log(f'[T4.4] PUT status=confirmed from completed (应失败)') d, code = api('PUT', f'/health/appointments/{APPOINTMENT_ID}/status', {'status': 'confirmed'}, token=TOKEN) ok = code in [400, 409, 422] log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 4xx)') results.append(('T4.4 非法状态流转拦截', 'PASS' if ok else 'FAIL', code)) else: log(' SKIP - 无预约 ID') for name in ['T4.1 确认预约', 'T4.2 完成预约', 'T4.3 最终状态验证', 'T4.4 非法状态流转拦截']: results.append((name, 'SKIP', 0)) # ================================================================ # PHASE 5: 边界条件和安全测试 # ================================================================ log('') log('=' * 60) log('PHASE 5: 边界条件和安全测试') # 5.1 创建预约 - 缺少必填字段 log('[T5.1] POST /health/appointments (缺少 doctor_id)') dummy_id = '00000000-0000-0000-0000-000000000000' d, code = api('POST', '/health/appointments', { 'patient_id': PATIENT_ID or dummy_id, 'appointment_date': '2026-12-01', 'start_time': '09:00', 'end_time': '09:30' }, token=TOKEN) ok = code in [400, 404, 422] log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 4xx)') results.append(('T5.1 缺少必填字段', 'PASS' if ok else 'FAIL', code)) # 5.2 创建预约 - 无效 UUID log('') log('[T5.2] POST /health/appointments (无效 doctor_id)') d, code = api('POST', '/health/appointments', { 'patient_id': PATIENT_ID or dummy_id, 'doctor_id': 'not-a-uuid', 'appointment_date': '2026-12-01', 'start_time': '09:00', 'end_time': '09:30', 'type': 'initial_consultation' }, token=TOKEN) ok = code in [400, 404, 422] log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 4xx)') results.append(('T5.2 无效 UUID', 'PASS' if ok else 'FAIL', code)) # 5.3 排班日历视图 log('') log('[T5.3] GET /health/doctor-schedules/calendar') d, code = api('GET', '/health/doctor-schedules/calendar', token=TOKEN) ok = d.get('success', False) and code == 200 log(f' {"PASS" if ok else "FAIL"} | HTTP {code}') results.append(('T5.3 排班日历视图', 'PASS' if ok else 'FAIL', code)) # 5.4 无效 Token log('') log('[T5.4] GET /health/doctors (无效 Token)') d, code = api('GET', '/health/doctors', token='invalid-token-xxx') ok = code == 401 log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 401)') results.append(('T5.4 无效 Token', 'PASS' if ok else 'FAIL', code)) # 5.5 SQL 注入测试 log('') log('[T5.5] GET /health/doctors?search=; DROP TABLE') d, code = api('GET', '/health/doctors?search=;%20DROP%20TABLE%20users;%20--', token=TOKEN) ok = code == 200 # Should not crash log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (不应崩溃)') results.append(('T5.5 SQL注入防护', 'PASS' if ok else 'FAIL', code)) # 5.6 XSS 注入测试 log('') log('[T5.6] POST /health/doctors (XSS name)') d, code = api('POST', '/health/doctors', { 'name': '', 'department': '测试科', 'phone': '13800000002', 'status': 'active' }, token=TOKEN) ok = d.get('success', False) and code in [200, 201] if ok: name = d.get('data', {}).get('name', '') has_script = '