- Base platform from base.git (ERP base: auth, core, config, message, workflow, plugin) - Created erp-diary module skeleton (lib.rs, dto.rs, error.rs, event.rs, state.rs) - Integrated erp-diary into workspace and erp-server - Added DiaryModule registration in main.rs - Added DiaryState FromRef in state.rs - Diary routes mounted (empty routes, ready for implementation) - Product design spec v1.2 preserved in docs/ - Implementation plan preserved in plans/ Cargo check: OK Cargo test: OK (78+ base tests passing)
256 lines
10 KiB
Python
256 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Miniprogram API comprehensive test - 6 endpoints."""
|
|
import json, urllib.request, urllib.error, sys
|
|
|
|
# Root URL that every request path in this script is appended to.
BASE = 'http://localhost:3000/api/v1'

# Read the bearer token from a local file; surrounding whitespace (such as a
# trailing newline) is stripped so it can be embedded in a header value.
with open('g:/hms/.test_token_fresh.txt') as token_file:
    TOKEN = token_file.read().strip()

# Fixed patient and tenant identifiers used by the requests below.
PATIENT_ID = '019dcd34-bc4d-72c1-8c19-77ce1f4839d6'
TENANT_ID = '019d80da-7a2c-7820-b0a3-3d5266a3a324'

# Headers sent with every request built by api_call().
headers = {
    'Authorization': f'Bearer {TOKEN}',
    'X-Tenant-Id': TENANT_ID,
    'X-Patient-Id': PATIENT_ID,
    'Content-Type': 'application/json',
}
|
|
|
|
|
|
def api_call(method, path, data=None):
    """Send one JSON request to the API and return ``(status, body)``.

    Args:
        method: HTTP verb, e.g. ``'GET'`` or ``'POST'``.
        path: Path (plus optional query string) appended to ``BASE``.
        data: JSON-serialisable payload. ``None`` sends no request body; any
            other value -- including an empty dict or list -- is serialised
            and sent.

    Returns:
        A ``(status, body)`` tuple:

        * success: the HTTP status and the decoded JSON body;
        * HTTP error: the error status and the decoded JSON body, or the raw
          response text when that body is not valid JSON;
        * any other failure (connection error, timeout, invalid JSON in a
          successful response, ...): ``0`` and the exception message.
    """
    url = f'{BASE}{path}'
    # Test against None rather than truthiness so that an intentionally empty
    # payload such as {} is still transmitted as a body.
    payload = None if data is None else json.dumps(data).encode()
    req = urllib.request.Request(url, data=payload, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status, json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # Error pages are not guaranteed to be valid UTF-8; decode leniently
        # so a malformed body cannot raise from inside this handler.
        text = e.read().decode(errors='replace')
        try:
            return e.code, json.loads(text)
        except ValueError:
            # Not JSON (json.JSONDecodeError is a ValueError): return raw text.
            return e.code, text
    except Exception as e:
        # Deliberate catch-all: the script reports failures instead of aborting.
        return 0, str(e)
|
|
|
|
|
|
def get_keys(obj, prefix=''):
    """Collect the dotted paths of all leaf fields in a nested dict.

    Nested dicts are walked recursively and their keys are joined to the
    parent path with ``.``. Values that are not dicts (including lists) are
    treated as leaves. A key whose value is an empty dict contributes no
    path, and a non-dict ``obj`` yields an empty list.

    Args:
        obj: The value to inspect.
        prefix: Path of ``obj`` within the enclosing structure; empty at the
            top level.

    Returns:
        A list of path strings in dict iteration order.
    """
    if not isinstance(obj, dict):
        return []

    paths = []
    for name, value in obj.items():
        path = f'{prefix}.{name}' if prefix else f'{prefix}{name}'
        if isinstance(value, dict):
            paths += get_keys(value, path)
        else:
            paths.append(path)
    return paths
|
|
|
|
|
|
def print_header(title, api_path):
    """Print a banner introducing one API test section.

    The banner is a blank line, a 70-character ``=`` rule, the title, the
    endpoint description, and a closing rule.

    Args:
        title: Heading shown for the section.
        api_path: Human-readable description of the endpoint being called.
    """
    rule = '=' * 70
    banner = ('', rule, f'{title}', f'Endpoint: {api_path}', rule)
    for line in banner:
        print(line)
|
|
|
|
|
|
def analyze_fields(name, data, expected_fields, status=200):
    """Print how a payload's top-level keys compare with the expected ones.

    Args:
        name: Label identifying the payload in the report.
        data: Decoded response payload. Anything that is not a dict is
            treated as having no fields at all.
        expected_fields: Iterable of field names the payload should contain.
        status: HTTP status shown in the report line. Defaults to 200, which
            keeps the output of existing three-argument calls unchanged;
            callers that accept other success codes can pass the real one.

    Returns:
        The set of expected field names that are absent from ``data``. An
        empty set means every expected field was present (extra fields do
        not count as a mismatch).
    """
    actual_keys = set(data) if isinstance(data, dict) else set()
    expected_set = set(expected_fields)
    matched = actual_keys & expected_set
    missing = expected_set - actual_keys
    extra = actual_keys - expected_set

    match_status = 'MISMATCH' if missing else 'MATCH'
    print(f'[{name}] status={status} | {match_status}')
    print(f' Actual fields: {sorted(actual_keys)}')
    print(f' Expected fields: {sorted(expected_set)}')
    if matched:
        print(f' Matched: {sorted(matched)}')
    if missing:
        print(f' ** MISSING: {sorted(missing)} **')
    if extra:
        print(f' Extra: {sorted(extra)}')
    return missing
|
|
|
|
|
|
# =====================================================
# API 1: GET /health/vital-signs/today
# =====================================================
print_header('API 1: Today Summary', f'GET /health/vital-signs/today?patient_id={PATIENT_ID}')
status, body = api_call('GET', f'/health/vital-signs/today?patient_id={PATIENT_ID}')
print(f'status={status}')
print(f'Response: {json.dumps(body, indent=2, ensure_ascii=False)}')

if status == 200:
    # The payload may arrive wrapped as {"data": ...} or bare; a body that is
    # not a dict has nothing to unwrap.
    data = body.get('data', body) if isinstance(body, dict) else body
    expected = ['blood_pressure', 'heart_rate', 'blood_sugar', 'weight']
    analyze_fields('Today Summary', data, expected)
    # Check nested structure: list the sub-fields of every indicator that came
    # back as an object. Skipped when the payload itself is not a dict
    # (for example {"data": null}).
    if isinstance(data, dict):
        for key in expected:
            if isinstance(data.get(key), dict):
                print(f' {key} sub-fields: {sorted(data[key].keys())}')
else:
    print(f' FAILED: {status}')
    # Try without patient_id
    print(' Retrying without patient_id param...')
    status2, body2 = api_call('GET', '/health/vital-signs/today')
    print(f' status={status2}')
    print(f' Response: {json.dumps(body2, indent=2, ensure_ascii=False)}')
|
|
|
|
# =====================================================
# API 2: POST /health/patients/{id}/vital-signs
# =====================================================
print_header('API 2: Create Vital Signs', f'POST /health/patients/{PATIENT_ID}/vital-signs')
create_data = {
    'record_date': '2026-04-27',
    'systolic_bp_morning': 130,
    'diastolic_bp_morning': 85,
    'heart_rate': 75,
    'weight': 69.0,
    'blood_sugar': 5.5,
}
status, body = api_call('POST', f'/health/patients/{PATIENT_ID}/vital-signs', create_data)
print(f'status={status}')
print(f'Response: {json.dumps(body, indent=2, ensure_ascii=False)}')

if status in (200, 201):
    # The created record may be wrapped as {"data": ...} or returned bare; a
    # body that is not a dict has nothing to unwrap.
    data = body.get('data', body) if isinstance(body, dict) else body
    expected_keys = {
        'id', 'patient_id', 'record_date', 'systolic_bp_morning', 'diastolic_bp_morning',
        'heart_rate', 'weight', 'blood_sugar', 'created_at', 'updated_at', 'version', 'tenant_id',
    }
    analyze_fields('Create Vital Signs', data, expected_keys)
|
|
|
|
# =====================================================
# API 3: GET /health/patients/{id}/vital-signs (paginated)
# =====================================================
print_header('API 3: Vital Signs History', f'GET /health/patients/{PATIENT_ID}/vital-signs?page=1&page_size=5')
status, body = api_call('GET', f'/health/patients/{PATIENT_ID}/vital-signs?page=1&page_size=5')
print(f'status={status}')
print(f'Response: {json.dumps(body, indent=2, ensure_ascii=False)}')

if status == 200:
    # Look at the "data" envelope when the body is an object; a bare body
    # (for example a top-level array) is inspected as-is.
    payload = body.get('data') if isinstance(body, dict) else body

    # Check pagination structure
    if isinstance(payload, dict) and 'data' in payload:
        # wrapped: {data: {data: [...], total: N}}
        print(f' Pagination structure: data={type(payload.get("data")).__name__}, total={payload.get("total")}')
        items = payload['data']
    elif isinstance(payload, list):
        items = payload
        print(f' Response is direct array, len={len(items)}')
    elif isinstance(payload, dict) and 'items' in payload:
        items = payload['items']
        print(f' Pagination via items: total={payload.get("total")}')
    else:
        items = []
        print(' Unknown pagination structure')

    # Only a non-empty list of objects can be field-checked; anything else
    # (null, an object, a list of scalars) is skipped instead of indexed.
    if isinstance(items, list) and items and isinstance(items[0], dict):
        expected_keys = {
            'id', 'record_date', 'systolic_bp_morning', 'diastolic_bp_morning',
            'heart_rate', 'weight', 'blood_sugar', 'created_at', 'updated_at', 'patient_id',
        }
        analyze_fields('Vital Signs Item', items[0], expected_keys)
|
|
|
|
# =====================================================
# API 4: GET /health/vital-signs/trend (mini trend)
# =====================================================
print_header('API 4: Vital Signs Trend', 'GET /health/vital-signs/trend?indicator=blood_pressure&range=7d')
status, body = api_call('GET', '/health/vital-signs/trend?indicator=blood_pressure&range=7d')
print(f'status={status}')
print(f'Response: {json.dumps(body, indent=2, ensure_ascii=False)}')

# Also try the task-specified path. Its result is only printed; the shape
# analysis below looks at the first call.
print()
print(' Also testing: GET /health/patients/{id}/vital-signs/trend')
status2, body2 = api_call('GET', f'/health/patients/{PATIENT_ID}/vital-signs/trend?start_date=2026-04-20&end_date=2026-04-27')
print(f' status={status2}')
print(f' Response: {json.dumps(body2, indent=2, ensure_ascii=False)}')

if status == 200:
    # The payload may be wrapped as {"data": ...} or returned bare; a body
    # that is not a dict (such as a top-level array) has nothing to unwrap.
    data = body.get('data', body) if isinstance(body, dict) else body
    print(f' Top-level type: {type(data).__name__}')
    if isinstance(data, dict):
        print(f' Keys: {sorted(data.keys())}')
    elif isinstance(data, list):
        print(f' Array length: {len(data)}')
        if data:
            # Show the keys of the first element, or the element itself when
            # it is not an object.
            print(f' First item keys: {sorted(data[0].keys()) if isinstance(data[0], dict) else data[0]}')
|
|
|
|
# =====================================================
# API 5: GET /health/appointments
# =====================================================
print_header('API 5: Appointments List', f'GET /health/appointments?patient_id={PATIENT_ID}&page=1&page_size=5')
status, body = api_call('GET', f'/health/appointments?patient_id={PATIENT_ID}&page=1&page_size=5')
print(f'status={status}')
print(f'Response: {json.dumps(body, indent=2, ensure_ascii=False)}')

if status == 200:
    # The payload may be wrapped as {"data": ...} or returned bare; a body
    # that is not a dict has nothing to unwrap.
    data = body.get('data', body) if isinstance(body, dict) else body
    if isinstance(data, dict) and 'data' in data:
        # A null "data" entry is treated as an empty page so len() is safe.
        items = data['data'] or []
        total = data.get('total', 'N/A')
        print(f' Pagination: total={total}, items={len(items)}')
    elif isinstance(data, list):
        items = data
    else:
        items = []

    # Only a non-empty list of objects can be field-checked.
    if isinstance(items, list) and items and isinstance(items[0], dict):
        # From miniprogram appointment.ts Appointment interface
        expected_keys = {
            'id', 'patient_name', 'doctor_name', 'department',
            'appointment_date', 'start_time', 'end_time', 'status', 'version',
        }
        analyze_fields('Appointment', items[0], expected_keys)
|
|
|
|
# =====================================================
# API 6: GET /health/follow-up-tasks
# =====================================================
print_header('API 6: Follow-Up Tasks', f'GET /health/follow-up-tasks?patient_id={PATIENT_ID}&status=pending&page=1&page_size=5')
status, body = api_call('GET', f'/health/follow-up-tasks?patient_id={PATIENT_ID}&status=pending&page=1&page_size=5')
print(f'status={status}')
print(f'Response: {json.dumps(body, indent=2, ensure_ascii=False)}')

if status == 200:
    # The payload may be wrapped as {"data": ...} or returned bare; a body
    # that is not a dict has nothing to unwrap.
    data = body.get('data', body) if isinstance(body, dict) else body
    if isinstance(data, dict) and 'data' in data:
        # A null "data" entry is treated as an empty page so len() is safe.
        items = data['data'] or []
        total = data.get('total', 'N/A')
        print(f' Pagination: total={total}, items={len(items)}')
    elif isinstance(data, list):
        items = data
    else:
        items = []

    # From miniprogram followup.ts FollowUpTask interface; shared by the
    # filtered request and the unfiltered retry below.
    expected_keys = {
        'id', 'patient_id', 'patient_name', 'follow_up_type',
        'content_template', 'status', 'planned_date', 'version',
    }

    if isinstance(items, list) and items and isinstance(items[0], dict):
        analyze_fields('FollowUpTask', items[0], expected_keys)
    elif not items:
        # Nothing pending: repeat the query without the status filter so the
        # field check can still run against whatever tasks exist.
        print(' No pending tasks. Retrying without status filter...')
        status2, body2 = api_call('GET', f'/health/follow-up-tasks?patient_id={PATIENT_ID}&page=1&page_size=5')
        print(f' status={status2}')
        data2 = body2.get('data', body2) if status2 == 200 and isinstance(body2, dict) else {}
        if isinstance(data2, dict) and 'data' in data2:
            items2 = data2['data'] or []
            print(f' Items without status filter: {len(items2)}')
            if isinstance(items2, list) and items2 and isinstance(items2[0], dict):
                analyze_fields('FollowUpTask (all)', items2[0], expected_keys)
|
|
|
|
# Closing banner: a blank line, then 'TEST COMPLETE' framed by two
# 70-character rules, marking the end of the run in the output.
footer_rule = '=' * 70
for footer_line in ('', footer_rule, 'TEST COMPLETE', footer_rule):
    print(footer_line)
|