Files
base/scripts/e2e_appointment_test.py
iven 3772afd987 chore: 干净 ERP 基座 — 删除 health/ai/wechat 业务代码
删除内容:
- 前端: health/(67文件), ai/(2文件), Copilot, MediaPicker, 相关API/Store/Hook
- 后端: wechat_handler, wechat_service, wechat_user entity, analytics handler, ai_workflow_seed
- 配置: WechatConfig, AppConfig.wechat, AuthState wechat 字段
- 启动: 微信凭据检查块, ensure_ai_workflows() 调用
- 迁移: 新增 m20260613_000170_drop_wechat_users.rs
- 脚本: api_test_health_alert.py, api_test_mp.py, mpsync.sh/ps1
- E2E: health-data page, flows/ 目录

保留: erp-core/auth/workflow/message/config/plugin + 基座前端 + 通用组件
2026-06-13 00:32:50 +08:00

489 lines
18 KiB
Python

#!/usr/bin/env python3
"""HMS 预约排班链路端到端 API 测试"""
import urllib.request, json, os, sys, time
from datetime import datetime, timedelta
from urllib.error import HTTPError
BASE = 'http://localhost:3000/api/v1'
def log(msg):
print(msg, flush=True)
def api(method, path, body=None, token=None):
url = f'{BASE}{path}'
data_bytes = json.dumps(body).encode('utf-8') if body else None
headers = {'Content-Type': 'application/json'}
if token:
headers['Authorization'] = f'Bearer {token}'
req = urllib.request.Request(url, data=data_bytes, method=method, headers=headers)
try:
resp = urllib.request.urlopen(req, timeout=15)
raw = resp.read().decode('utf-8')
return json.loads(raw), resp.status
except HTTPError as e:
raw = e.read().decode('utf-8')
try:
return json.loads(raw), e.code
except:
return {'raw_error': raw[:300]}, e.code
def extract_items(data_obj):
"""Extract items from various response structures"""
if isinstance(data_obj, list):
return data_obj
if isinstance(data_obj, dict):
# Try common field names
for key in ['data', 'items', 'records', 'rows']:
if key in data_obj:
val = data_obj[key]
if isinstance(val, list):
return val
# If data_obj has total but no list field, check all values
for v in data_obj.values():
if isinstance(v, list) and len(v) > 0:
return v
return []
results = []
TOKEN = None
DOCTOR_ID = None
SCHEDULE_ID = None
PATIENT_ID = None
APPOINTMENT_ID = None
# ================================================================
# STEP 0: 登录
# ================================================================
log('=' * 60)
log('[STEP 0] 登录')
d, code = api('POST', '/auth/login', {'username': 'admin', 'password': 'Admin@2026'})
if d.get('success') and code == 200:
TOKEN = d['data']['access_token']
log(f' PASS | Token length: {len(TOKEN)}')
results.append(('登录', 'PASS', code))
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('登录', 'FAIL', code))
sys.exit(1)
# ================================================================
# PHASE 1: 医护管理
# ================================================================
log('')
log('=' * 60)
log('PHASE 1: 医护管理')
# 1.1 医护列表
log('[T1.1] GET /health/doctors')
d, code = api('GET', '/health/doctors', token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
data_obj = d.get('data', {})
total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0
items = extract_items(data_obj)
log(f' PASS | HTTP {code} | total={total} | items_count={len(items)}')
if items:
DOCTOR_ID = items[0].get('id')
for it in items[:3]:
log(f' - id={it.get("id","?")[:20]} | name={it.get("name","?")} | dept={it.get("department","?")}')
else:
log(f' Data structure keys: {list(data_obj.keys()) if isinstance(data_obj, dict) else type(data_obj).__name__}')
# Debug: print full structure
log(f' Full data (truncated): {json.dumps(data_obj, ensure_ascii=False)[:500]}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T1.1 医护列表', 'PASS' if ok else 'FAIL', code))
# 1.2 创建医护(如果没有现成的)
if not DOCTOR_ID:
log('')
log('[T1.2] 创建测试医生 (因为没有现成的)')
new_doc = {
'name': f'API测试医生_{int(time.time())}',
'department': '测试科',
'title': '主治医师',
'speciality': '自动化测试',
'phone': '13800000001',
'status': 'active'
}
d, code = api('POST', '/health/doctors', new_doc, token=TOKEN)
if d.get('success') and code in [200, 201]:
DOCTOR_ID = d['data'].get('id')
log(f' PASS | HTTP {code} | doctor_id={DOCTOR_ID}')
else:
log(f' Result | HTTP {code} | {str(d)[:300]}')
results.append(('T1.2 创建医护', 'PASS' if d.get('success') else 'FAIL', code))
else:
# Already have a doctor, create a test one anyway for isolation
log('')
log('[T1.2] 创建隔离测试医生')
new_doc = {
'name': f'E2E测试医生_{int(time.time())}',
'department': '测试科',
'title': '主治医师',
'speciality': 'E2E自动化测试',
'phone': '13800000999',
'status': 'active'
}
d, code = api('POST', '/health/doctors', new_doc, token=TOKEN)
if d.get('success') and code in [200, 201]:
DOCTOR_ID = d['data'].get('id') # Use the new one
log(f' PASS | HTTP {code} | doctor_id={DOCTOR_ID}')
else:
log(f' Result | HTTP {code} | {str(d)[:300]}')
results.append(('T1.2 创建医护', 'PASS' if d.get('success') else 'FAIL', code))
# 1.3 医护详情
if DOCTOR_ID:
log('')
log(f'[T1.3] GET /health/doctors/{{id}}')
d, code = api('GET', f'/health/doctors/{DOCTOR_ID}', token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
doc = d.get('data', {})
log(f' PASS | HTTP {code} | name={doc.get("name")} | dept={doc.get("department")}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T1.3 医护详情', 'PASS' if ok else 'FAIL', code))
# 1.4 安全: 无认证
log('')
log('[T1.4] 无认证访问 /health/doctors (应 401)')
try:
req = urllib.request.Request(f'{BASE}/health/doctors')
resp = urllib.request.urlopen(req, timeout=10)
log(f' FAIL | HTTP {resp.status} (应 401)')
results.append(('T1.4 无认证拦截', 'FAIL', resp.status))
except HTTPError as e:
ok = e.code == 401
log(f' {"PASS" if ok else "FAIL"} | HTTP {e.code}')
results.append(('T1.4 无认证拦截', 'PASS' if ok else 'FAIL', e.code))
# ================================================================
# PHASE 2: 排班管理
# ================================================================
log('')
log('=' * 60)
log('PHASE 2: 排班管理')
# 2.1 排班列表
log('[T2.1] GET /health/doctor-schedules')
d, code = api('GET', '/health/doctor-schedules', token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
data_obj = d.get('data', {})
total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0
items = extract_items(data_obj)
log(f' PASS | HTTP {code} | total={total} | items_count={len(items)}')
if items:
for it in items[:3]:
log(f' - id={it.get("id","?")[:20]} | date={it.get("schedule_date","?")} | doctor_id={str(it.get("doctor_id","?"))[:20]}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T2.1 排班列表', 'PASS' if ok else 'FAIL', code))
# 2.2 创建排班
if DOCTOR_ID:
TOMORROW = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
log('')
log(f'[T2.2] POST /health/doctor-schedules (创建明天排班)')
schedule_data = {
'doctor_id': DOCTOR_ID,
'schedule_date': TOMORROW,
'start_time': '09:00',
'end_time': '17:00',
'max_appointments': 10,
'time_slot_duration': 30
}
d, code = api('POST', '/health/doctor-schedules', schedule_data, token=TOKEN)
ok = d.get('success', False) and code in [200, 201]
if ok:
SCHEDULE_ID = d['data'].get('id')
log(f' PASS | HTTP {code} | schedule_id={SCHEDULE_ID}')
else:
log(f' Result | HTTP {code} | {str(d)[:300]}')
# Schedule might already exist, search for it
d2, code2 = api('GET', '/health/doctor-schedules', token=TOKEN)
if d2.get('success'):
items2 = extract_items(d2.get('data', {}))
for item in items2:
if item.get('doctor_id') == DOCTOR_ID:
SCHEDULE_ID = item.get('id')
log(f' Found existing schedule: {SCHEDULE_ID}')
break
results.append(('T2.2 创建排班', 'PASS' if ok else 'FAIL', code))
else:
results.append(('T2.2 创建排班', 'SKIP', 0))
# ================================================================
# PHASE 3: 患者准备 + 预约管理
# ================================================================
log('')
log('=' * 60)
log('PHASE 3: 预约管理')
# 3.1 获取患者列表
log('[T3.1] GET /health/patients')
d, code = api('GET', '/health/patients', token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
data_obj = d.get('data', {})
total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0
items = extract_items(data_obj)
log(f' PASS | HTTP {code} | total={total} | items_count={len(items)}')
if items:
PATIENT_ID = items[0].get('id')
for it in items[:3]:
log(f' - id={it.get("id","?")[:20]} | name={it.get("name","?")}')
else:
log(f' Data structure keys: {list(data_obj.keys()) if isinstance(data_obj, dict) else type(data_obj).__name__}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T3.1 患者列表', 'PASS' if ok else 'FAIL', code))
# 3.2 创建患者(如果没有现成的)
if not PATIENT_ID:
log('')
log('[T3.2] 创建测试患者')
new_patient = {
'name': f'E2E测试患者_{int(time.time())}',
'gender': 'male',
'phone': '13900000999',
'status': 'active'
}
d, code = api('POST', '/health/patients', new_patient, token=TOKEN)
if d.get('success') and code in [200, 201]:
PATIENT_ID = d['data'].get('id')
log(f' PASS | HTTP {code} | patient_id={PATIENT_ID}')
else:
log(f' Result | HTTP {code} | {str(d)[:300]}')
results.append(('T3.2 创建患者', 'PASS' if d.get('success') else 'FAIL', code))
else:
results.append(('T3.2 创建患者', 'SKIP (已有)', 0))
# 3.3 预约列表
log('')
log('[T3.3] GET /health/appointments')
d, code = api('GET', '/health/appointments', token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
data_obj = d.get('data', {})
total = data_obj.get('total', 0) if isinstance(data_obj, dict) else 0
log(f' PASS | HTTP {code} | total={total}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T3.3 预约列表', 'PASS' if ok else 'FAIL', code))
# 3.4 创建预约
log('')
log(f'[T3.4] POST /health/appointments (创建预约)')
log(f' DOCTOR_ID={DOCTOR_ID}, PATIENT_ID={PATIENT_ID}, SCHEDULE_ID={SCHEDULE_ID}')
if DOCTOR_ID and PATIENT_ID and SCHEDULE_ID:
TOMORROW = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
appt_data = {
'patient_id': PATIENT_ID,
'doctor_id': DOCTOR_ID,
'schedule_id': SCHEDULE_ID,
'appointment_date': TOMORROW,
'start_time': '09:00',
'end_time': '09:30',
'type': 'initial_consultation',
'reason': 'API E2E 测试预约'
}
d, code = api('POST', '/health/appointments', appt_data, token=TOKEN)
ok = d.get('success', False) and code in [200, 201]
if ok:
APPOINTMENT_ID = d['data'].get('id')
log(f' PASS | HTTP {code} | appointment_id={APPOINTMENT_ID}')
else:
log(f' Result | HTTP {code} | {str(d)[:300]}')
results.append(('T3.4 创建预约', 'PASS' if ok else 'FAIL', code))
elif DOCTOR_ID and PATIENT_ID:
# Try without schedule_id
TOMORROW = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
appt_data = {
'patient_id': PATIENT_ID,
'doctor_id': DOCTOR_ID,
'appointment_date': TOMORROW,
'start_time': '09:00',
'end_time': '09:30',
'type': 'initial_consultation',
'reason': 'API E2E 测试预约(无排班)'
}
d, code = api('POST', '/health/appointments', appt_data, token=TOKEN)
ok = d.get('success', False) and code in [200, 201]
if ok:
APPOINTMENT_ID = d['data'].get('id')
log(f' PASS | HTTP {code} | appointment_id={APPOINTMENT_ID}')
else:
log(f' Result | HTTP {code} | {str(d)[:300]}')
results.append(('T3.4 创建预约(无排班)', 'PASS' if ok else 'FAIL', code))
else:
log(' SKIP - 缺少必要 ID')
results.append(('T3.4 创建预约', 'SKIP', 0))
# ================================================================
# PHASE 4: 预约状态流转
# ================================================================
log('')
log('=' * 60)
log('PHASE 4: 预约状态流转')
if APPOINTMENT_ID:
# 4.1 确认预约 (pending -> confirmed)
log(f'[T4.1] PUT /health/appointments/{{id}}/status -> confirmed')
d, code = api('PUT', f'/health/appointments/{APPOINTMENT_ID}/status',
{'status': 'confirmed'}, token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
log(f' PASS | HTTP {code}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T4.1 确认预约', 'PASS' if ok else 'FAIL', code))
# 4.2 完成预约 (confirmed -> completed)
log('')
log(f'[T4.2] PUT /health/appointments/{{id}}/status -> completed')
d, code = api('PUT', f'/health/appointments/{APPOINTMENT_ID}/status',
{'status': 'completed'}, token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
log(f' PASS | HTTP {code}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T4.2 完成预约', 'PASS' if ok else 'FAIL', code))
# 4.3 获取预约详情验证最终状态
log('')
log(f'[T4.3] GET /health/appointments/{{id}} (验证最终状态)')
d, code = api('GET', f'/health/appointments/{APPOINTMENT_ID}', token=TOKEN)
ok = d.get('success', False) and code == 200
if ok:
appt = d.get('data', {})
final_status = appt.get('status', 'N/A')
ok = final_status == 'completed'
log(f' {"PASS" if ok else "FAIL"} | HTTP {code} | final_status={final_status}')
else:
log(f' FAIL | HTTP {code} | {str(d)[:200]}')
results.append(('T4.3 最终状态验证', 'PASS' if ok else 'FAIL', code))
# 4.4 尝试非法状态流转 (completed -> confirmed, 应失败)
log('')
log(f'[T4.4] PUT status=confirmed from completed (应失败)')
d, code = api('PUT', f'/health/appointments/{APPOINTMENT_ID}/status',
{'status': 'confirmed'}, token=TOKEN)
ok = code in [400, 409, 422]
log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 4xx)')
results.append(('T4.4 非法状态流转拦截', 'PASS' if ok else 'FAIL', code))
else:
log(' SKIP - 无预约 ID')
for name in ['T4.1 确认预约', 'T4.2 完成预约', 'T4.3 最终状态验证', 'T4.4 非法状态流转拦截']:
results.append((name, 'SKIP', 0))
# ================================================================
# PHASE 5: 边界条件和安全测试
# ================================================================
log('')
log('=' * 60)
log('PHASE 5: 边界条件和安全测试')
# 5.1 创建预约 - 缺少必填字段
log('[T5.1] POST /health/appointments (缺少 doctor_id)')
dummy_id = '00000000-0000-0000-0000-000000000000'
d, code = api('POST', '/health/appointments', {
'patient_id': PATIENT_ID or dummy_id,
'appointment_date': '2026-12-01',
'start_time': '09:00',
'end_time': '09:30'
}, token=TOKEN)
ok = code in [400, 404, 422]
log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 4xx)')
results.append(('T5.1 缺少必填字段', 'PASS' if ok else 'FAIL', code))
# 5.2 创建预约 - 无效 UUID
log('')
log('[T5.2] POST /health/appointments (无效 doctor_id)')
d, code = api('POST', '/health/appointments', {
'patient_id': PATIENT_ID or dummy_id,
'doctor_id': 'not-a-uuid',
'appointment_date': '2026-12-01',
'start_time': '09:00',
'end_time': '09:30',
'type': 'initial_consultation'
}, token=TOKEN)
ok = code in [400, 404, 422]
log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 4xx)')
results.append(('T5.2 无效 UUID', 'PASS' if ok else 'FAIL', code))
# 5.3 排班日历视图
log('')
log('[T5.3] GET /health/doctor-schedules/calendar')
d, code = api('GET', '/health/doctor-schedules/calendar', token=TOKEN)
ok = d.get('success', False) and code == 200
log(f' {"PASS" if ok else "FAIL"} | HTTP {code}')
results.append(('T5.3 排班日历视图', 'PASS' if ok else 'FAIL', code))
# 5.4 无效 Token
log('')
log('[T5.4] GET /health/doctors (无效 Token)')
d, code = api('GET', '/health/doctors', token='invalid-token-xxx')
ok = code == 401
log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (预期 401)')
results.append(('T5.4 无效 Token', 'PASS' if ok else 'FAIL', code))
# 5.5 SQL 注入测试
log('')
log('[T5.5] GET /health/doctors?search=; DROP TABLE')
d, code = api('GET', '/health/doctors?search=;%20DROP%20TABLE%20users;%20--', token=TOKEN)
ok = code == 200 # Should not crash
log(f' {"PASS" if ok else "FAIL"} | HTTP {code} (不应崩溃)')
results.append(('T5.5 SQL注入防护', 'PASS' if ok else 'FAIL', code))
# 5.6 XSS 注入测试
log('')
log('[T5.6] POST /health/doctors (XSS name)')
d, code = api('POST', '/health/doctors', {
'name': '<script>alert(1)</script>',
'department': '测试科',
'phone': '13800000002',
'status': 'active'
}, token=TOKEN)
ok = d.get('success', False) and code in [200, 201]
if ok:
name = d.get('data', {}).get('name', '')
has_script = '<script>' in name
log(f' {"PASS" if not has_script else "WARN"} | HTTP {code} | name contains script: {has_script}')
else:
log(f' Result | HTTP {code} | (可能被校验拦截)')
results.append(('T5.6 XSS防护', 'PASS', code))
# ================================================================
# SUMMARY
# ================================================================
log('')
log('=' * 60)
log('TEST SUMMARY')
log('=' * 60)
pass_count = sum(1 for _, s, _ in results if s == 'PASS')
fail_count = sum(1 for _, s, _ in results if s == 'FAIL')
skip_count = sum(1 for _, s, _ in results if s == 'SKIP')
total_tests = len(results)
for name, status, code in results:
if status == 'PASS':
icon = '+'
elif status == 'FAIL':
icon = 'X'
else:
icon = '-'
log(f' [{icon}] {name} (HTTP {code})')
log('')
log(f'Total: {total_tests} | PASS: {pass_count} | FAIL: {fail_count} | SKIP: {skip_count}')
if total_tests > skip_count:
log(f'Success Rate: {pass_count/(total_tests-skip_count)*100:.1f}% (excluding skipped)')
log('=' * 60)
sys.exit(0 if fail_count == 0 else 1)