- Base platform from base.git (ERP base: auth, core, config, message, workflow, plugin) - Created erp-diary module skeleton (lib.rs, dto.rs, error.rs, event.rs, state.rs) - Integrated erp-diary into workspace and erp-server - Added DiaryModule registration in main.rs - Added DiaryState FromRef in state.rs - Diary routes mounted (empty routes, ready for implementation) - Product design spec v1.2 preserved in docs/ - Implementation plan preserved in plans/ Cargo check: OK Cargo test: OK (78+ base tests passing)
478 lines
17 KiB
Python
478 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
HMS Health Management Platform -- Vital Signs to Alert Pipeline E2E API Test
|
|
|
|
Correct route structure (derived from actual source code):
|
|
- Patients: GET/POST /health/patients
|
|
- Vital Signs: GET/POST /health/patients/{id}/vital-signs
|
|
- Vital Signs DTO: { record_date, systolic_bp_morning, diastolic_bp_morning, heart_rate, ... }
|
|
- Trends: GET /health/patients/{id}/trends/{indicator}
|
|
- Lab Reports: GET/POST /health/patients/{id}/lab-reports
|
|
- Lab DTO: { report_date, report_type, items: [{name, value, unit, reference_low, reference_high, is_abnormal}] }
|
|
- Alerts: GET /health/alerts
|
|
- Alert Rules: GET/POST /health/alert-rules
|
|
- Critical Value Thresholds: GET /health/critical-value-thresholds
|
|
- Critical Alerts: GET /health/critical-alerts
|
|
"""
|
|
import json
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone, date
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import HTTPError
|
|
|
|
BASE_URL = "http://localhost:3000/api/v1"
|
|
results = []
|
|
|
|
|
|
def log_test(category, test_name, passed, detail="", response_code=None, response_time_ms=None):
|
|
status = "PASS" if passed else "FAIL"
|
|
entry = {"category": category, "test_name": test_name, "status": status, "detail": detail}
|
|
if response_code is not None:
|
|
entry["http_code"] = response_code
|
|
if response_time_ms is not None:
|
|
entry["response_time_ms"] = round(response_time_ms, 1)
|
|
results.append(entry)
|
|
icon = "[PASS]" if passed else "[FAIL]"
|
|
rt = f" ({response_time_ms:.0f}ms)" if response_time_ms else ""
|
|
code = f" HTTP {response_code}" if response_code else ""
|
|
print(f" {icon} {test_name}{rt}{code} -- {detail}")
|
|
return passed
|
|
|
|
|
|
def api_call(method, path, data=None, token=None, timeout=30):
    """Send one JSON request to the API and return its outcome.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        path: Request path (including any query string) appended to BASE_URL.
        data: JSON-serialisable payload, or None to send no body.
        token: Bearer token; when falsy no Authorization header is sent.
        timeout: Socket timeout in seconds passed to urlopen.

    Returns:
        A ``(status_code, resp_body, elapsed_ms)`` tuple. ``status_code`` is
        0 when no HTTP response was obtained; ``resp_body`` is the decoded
        JSON, or ``{"error": <message>}`` when the body could not be decoded
        or the request failed.
    """
    url = f"{BASE_URL}{path}"
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    # "is not None" so that an explicitly empty payload ({} or []) is still sent.
    body = json.dumps(data).encode("utf-8") if data is not None else None
    req = Request(url, data=body, headers=headers, method=method)

    # perf_counter is monotonic, so the latency cannot be skewed by clock changes.
    start = time.perf_counter()
    try:
        # The context manager closes the connection even if read() fails.
        with urlopen(req, timeout=timeout) as resp:
            elapsed = (time.perf_counter() - start) * 1000
            status_code = resp.status
            raw = resp.read()
    except HTTPError as e:
        # 4xx/5xx: the server answered, so keep its status and try its body.
        elapsed = (time.perf_counter() - start) * 1000
        status_code = e.code
        try:
            resp_body = json.loads(e.read().decode("utf-8"))
        except Exception:
            resp_body = {"error": str(e)}
    except Exception as e:
        # No HTTP response at all (refused, timeout, DNS, ...): status 0.
        elapsed = (time.perf_counter() - start) * 1000
        status_code = 0
        resp_body = {"error": str(e)}
    else:
        # Decode outside the request "try" so a non-JSON success body keeps
        # its real status code instead of being reported as status 0.
        try:
            resp_body = json.loads(raw.decode("utf-8"))
        except ValueError as exc:
            resp_body = {"error": f"Invalid JSON in response body: {exc}"}
    return status_code, resp_body, elapsed
|
|
|
|
|
|
def extract_items(data_node):
    """Return the list of records held by a (possibly paginated) response node.

    Accepts a bare list, or a dict that carries the records under ``"data"``
    or ``"items"`` (checked in that order). Anything else, including None,
    yields an empty list instead of raising.
    """
    if isinstance(data_node, list):
        return data_node
    if not isinstance(data_node, dict):
        return []
    for key in ("data", "items"):
        candidate = data_node.get(key)
        # Skip a key that is present but not a list (e.g. null) so the other
        # supported key still gets a chance.
        if isinstance(candidate, list):
            return candidate
    return []
|
|
|
|
|
|
def extract_total(data_node):
    """Return the record count reported by a (possibly paginated) response node.

    A bare list counts its own elements. A dict uses its integer ``"total"``
    field; when that field is missing or not an integer, the number of items
    found by extract_items() is used instead. Any other input yields 0.
    """
    if isinstance(data_node, list):
        return len(data_node)
    if not isinstance(data_node, dict):
        return 0
    total = data_node.get("total")
    # bool is a subclass of int; exclude it so True/False never pass as a count.
    if isinstance(total, int) and not isinstance(total, bool):
        return total
    # Only count the items when no usable total was supplied.
    return len(extract_items(data_node))
|
|
|
|
|
|
# ============================================================
# STEP 1: Authentication
# ============================================================
print("\n" + "=" * 70)
print("STEP 1: Authentication")
print("=" * 70)

# NOTE(review): the admin credentials are hard-coded here; consider supplying
# them from the environment if this script is run outside a local sandbox.
code, resp, rt = api_call("POST", "/auth/login", {"username": "admin", "password": "Admin@2026"})

# "data" can be present but null, in which case .get("data", {}) would return
# None; normalise it so a bad login is logged instead of raising.
login_data = resp.get("data")
TOKEN = login_data.get("access_token") if isinstance(login_data, dict) else None

if code == 200 and resp.get("success") and TOKEN:
    log_test("Auth", "Admin login", True, f"Token acquired ({len(TOKEN)} chars)", code, rt)
else:
    log_test("Auth", "Admin login", False, f"Failed: {json.dumps(resp, ensure_ascii=False)[:200]}", code, rt)
    # Every later step needs the token, so there is nothing left to run.
    sys.exit(1)
|
|
|
|
|
|
# ============================================================
# STEP 2: Get Patient ID
# ============================================================
print("\n" + "=" * 70)
print("STEP 2: Get test Patient ID")
print("=" * 70)

PATIENT_ID = None
code, resp, rt = api_call("GET", "/health/patients?page=1&page_size=3", token=TOKEN)
if code == 200 and resp.get("success"):
    # .get() plus "or {}": a success envelope with a missing or null "data"
    # must end in the FATAL message below, not in a KeyError traceback.
    page = resp.get("data") or {}
    items = extract_items(page)
    total = extract_total(page)
    # The first returned patient is used as the subject of all later steps.
    if items and isinstance(items[0], dict):
        PATIENT_ID = items[0].get("id")
    log_test("Patient", "Get patient list", True, f"Total {total}, using ID: {PATIENT_ID}", code, rt)
else:
    log_test("Patient", "Get patient list", False, f"Failed: {json.dumps(resp, ensure_ascii=False)[:200]}", code, rt)

if not PATIENT_ID:
    print("\nFATAL: No patient ID available, aborting")
    sys.exit(1)
|
|
|
|
|
|
# ============================================================
# STEP 3: Vital Signs Recording
# ============================================================
print("\n" + "=" * 70)
print("STEP 3: Vital Signs Recording")
print("=" * 70)

today = date.today().isoformat()
vs_path = f"/health/patients/{PATIENT_ID}/vital-signs"

# 3.1 - 3.4: readings posted with a valid token. Each check passes when the
# API answers 200/201 and the response's "success" field is truthy. The
# abnormal BP and HR readings are the ones meant to trigger alerts.
accepted_readings = [
    # 3.1 Normal BP 120/80 + HR 72
    ("Normal BP 120/80 + HR 72", {
        "record_date": today,
        "systolic_bp_morning": 120,
        "diastolic_bp_morning": 80,
        "heart_rate": 72,
        "source": "manual",
    }),
    # 3.2 Abnormal BP 200/130 (should trigger alert)
    ("Abnormal BP 200/130", {
        "record_date": today,
        "systolic_bp_morning": 200,
        "diastolic_bp_morning": 130,
        "heart_rate": 85,
        "source": "manual",
    }),
    # 3.3 Abnormal HR 150 (should trigger alert)
    ("Abnormal HR 150", {
        "record_date": today,
        "systolic_bp_morning": 125,
        "diastolic_bp_morning": 82,
        "heart_rate": 150,
        "source": "manual",
    }),
    # 3.4 Abnormal blood sugar 20.0
    ("Abnormal blood sugar 20.0", {
        "record_date": today,
        "blood_sugar": 20.0,
        "blood_sugar_type": "fasting",
        "source": "manual",
    }),
]

for label, payload in accepted_readings:
    vs_code, vs_resp, vs_rt = api_call("POST", vs_path, payload, token=TOKEN)
    if vs_code not in (200, 201):
        log_test("VitalSigns", label, False,
                 f"HTTP {vs_code}: {json.dumps(vs_resp, ensure_ascii=False)[:200]}", vs_code, vs_rt)
    else:
        accepted = vs_resp.get("success", False)
        log_test("VitalSigns", label, accepted,
                 f"success={accepted}", vs_code, vs_rt)
    time.sleep(0.5)

# 3.5 Validation -- missing required field (record_date)
vs_code, vs_resp, vs_rt = api_call("POST", vs_path, {
    "systolic_bp_morning": 120,
    "heart_rate": 72,
}, token=TOKEN)
log_test("VitalSigns", "Missing record_date rejected", vs_code in (400, 422),
         f"HTTP {vs_code}", vs_code, vs_rt)

time.sleep(0.5)

# 3.6 Security -- no token
vs_code, vs_resp, vs_rt = api_call("POST", vs_path, {
    "record_date": today,
    "heart_rate": 80,
}, token=None)
log_test("VitalSigns", "No token returns 401", vs_code == 401,
         f"HTTP {vs_code} (expected 401)", vs_code, vs_rt)
|
|
|
|
|
|
# ============================================================
# STEP 4: Query Vital Signs
# ============================================================
print("\n" + "=" * 70)
print("STEP 4: Query Vital Signs")
print("=" * 70)

time.sleep(1)

# 4.1 List by patient
code_q1, resp_q1, rt_q1 = api_call("GET", f"/health/patients/{PATIENT_ID}/vital-signs", token=TOKEN)
if code_q1 == 200 and resp_q1.get("success"):
    # .get() plus "or {}": a success envelope with a missing or null "data"
    # is reported as an empty listing instead of raising KeyError.
    vs_page = resp_q1.get("data") or {}
    total_vs = extract_total(vs_page)
    items_vs = extract_items(vs_page)
    log_test("VitalSigns Query", "List by patient", True,
             f"total={total_vs}, returned {len(items_vs)} records", code_q1, rt_q1)
else:
    log_test("VitalSigns Query", "List by patient", False,
             f"HTTP {code_q1}: {json.dumps(resp_q1, ensure_ascii=False)[:200]}", code_q1, rt_q1)

time.sleep(0.5)

# 4.2 Trend data (management path)
# NOTE(review): the module docstring only lists /trends/{indicator}; confirm
# that the bare /trends route queried here exists on the server.
code_q2, resp_q2, rt_q2 = api_call("GET",
                                   f"/health/patients/{PATIENT_ID}/trends", token=TOKEN)

if code_q2 == 200 and resp_q2.get("success"):
    trend_data = resp_q2.get("data", {})
    # Only the payload's type is reported; its contents are not inspected.
    log_test("VitalSigns Query", "Trend list", True,
             f"data type: {type(trend_data).__name__}", code_q2, rt_q2)
else:
    log_test("VitalSigns Query", "Trend list", False,
             f"HTTP {code_q2}: {json.dumps(resp_q2, ensure_ascii=False)[:200]}", code_q2, rt_q2)

time.sleep(0.5)

# 4.3 Specific indicator timeseries
code_q3, resp_q3, rt_q3 = api_call("GET",
                                   f"/health/patients/{PATIENT_ID}/trends/systolic_bp_morning", token=TOKEN)

if code_q3 == 200 and resp_q3.get("success"):
    log_test("VitalSigns Query", "BP timeseries", True,
             "success", code_q3, rt_q3)
else:
    log_test("VitalSigns Query", "BP timeseries", False,
             f"HTTP {code_q3}: {json.dumps(resp_q3, ensure_ascii=False)[:200]}", code_q3, rt_q3)
|
|
|
|
|
|
# ============================================================
# STEP 5: Lab Reports
# ============================================================
print("\n" + "=" * 70)
print("STEP 5: Lab Reports")
print("=" * 70)

time.sleep(0.5)

# 5.1 Create lab report (with abnormal items)
lab_path = f"/health/patients/{PATIENT_ID}/lab-reports"
code_l1, resp_l1, rt_l1 = api_call("POST", lab_path, {
    "report_date": today,
    "report_type": "blood_routine",
    "items": [
        {"name": "WBC", "value": "12.5", "unit": "10^9/L", "reference_low": "4.0", "reference_high": "10.0", "is_abnormal": True},
        {"name": "Hemoglobin", "value": "135", "unit": "g/L", "reference_low": "120", "reference_high": "160", "is_abnormal": False},
    ],
}, token=TOKEN)

if code_l1 in (200, 201) and resp_l1.get("success"):
    # "data" may be present but null; fall back to "N/A" rather than raising.
    created = resp_l1.get("data")
    lab_id = created.get("id", "N/A") if isinstance(created, dict) else "N/A"
    log_test("LabReport", "Create blood routine (with abnormal)", True,
             f"id={lab_id}", code_l1, rt_l1)
else:
    log_test("LabReport", "Create blood routine (with abnormal)", False,
             f"HTTP {code_l1}: {json.dumps(resp_l1, ensure_ascii=False)[:200]}", code_l1, rt_l1)

time.sleep(0.5)

# 5.2 List lab reports
code_l2, resp_l2, rt_l2 = api_call("GET", f"{lab_path}?page=1&page_size=10", token=TOKEN)
if code_l2 == 200 and resp_l2.get("success"):
    # .get() plus "or {}": a missing or null "data" counts as an empty listing.
    total_lab = extract_total(resp_l2.get("data") or {})
    log_test("LabReport", "List patient lab reports", True,
             f"total={total_lab}", code_l2, rt_l2)
else:
    log_test("LabReport", "List patient lab reports", False,
             f"HTTP {code_l2}: {json.dumps(resp_l2, ensure_ascii=False)[:200]}", code_l2, rt_l2)
|
|
|
|
|
|
# ============================================================
# STEP 6: Alert Management
# ============================================================
print("\n" + "=" * 70)
print("STEP 6: Alert Management")
print("=" * 70)


def _check_listing(test_name, path, describe=None, empty_note=None):
    """GET a listing endpoint, log the outcome and preview up to three items.

    Args:
        test_name: Name under which the check is logged (category "Alerts").
        path: Request path including the query string.
        describe: Optional callable mapping one item to a preview line.
        empty_note: Optional line printed when the reported total is 0.
    """
    status, reply, took = api_call("GET", path, token=TOKEN)
    if not (status == 200 and reply.get("success")):
        log_test("Alerts", test_name, False,
                 f"HTTP {status}: {json.dumps(reply, ensure_ascii=False)[:200]}", status, took)
        return
    # .get() plus "or {}": a missing or null "data" counts as an empty listing.
    # extract_total/extract_items also accept a bare list, so endpoints that
    # return the list directly need no special handling.
    page = reply.get("data") or {}
    total = extract_total(page)
    items = extract_items(page)
    log_test("Alerts", test_name, True,
             f"total={total}, returned {len(items)} records", status, took)
    if total > 0:
        if describe is not None:
            for item in items[:3]:
                print(describe(item))
    elif empty_note is not None:
        print(empty_note)


def _describe_alert(a):
    """Format one alert record; tolerates either field-name variant."""
    level = a.get("severity", a.get("level", "N/A"))
    status_a = a.get("status", "N/A")
    msg = a.get("message", a.get("title", "N/A"))
    return f" -> Alert: [{level}] {status_a} -- {str(msg)[:60]}"


def _describe_rule(r):
    """Format one alert-rule record; tolerates either field-name variant."""
    rname = r.get("name", "N/A")
    renabled = r.get("is_enabled", r.get("enabled", "N/A"))
    rtype = r.get("indicator_type", r.get("type", "N/A"))
    return f" -> Rule: {rname} | type: {rtype} | enabled: {renabled}"


def _describe_threshold(t):
    """Format one critical-value threshold; tolerates either field-name variant."""
    tname = t.get("name", t.get("indicator_name", "N/A"))
    ttype = t.get("indicator_type", "N/A")
    thigh = t.get("high_threshold", t.get("threshold_high", "N/A"))
    tlow = t.get("low_threshold", t.get("threshold_low", "N/A"))
    return f" -> Threshold: {tname} | type: {ttype} | high: {thigh} | low: {tlow}"


time.sleep(2)  # Wait for alert engine processing

# 6.1 Alert list
# An empty alert list is only noted, not failed: the check verifies the query.
_check_listing("Alert list query", "/health/alerts?page=1&page_size=20",
               describe=_describe_alert,
               empty_note=" -> Note: No alerts generated. Check alert rules configuration.")

time.sleep(0.5)

# 6.2 Alert rules list
_check_listing("Alert rules list", "/health/alert-rules?page=1&page_size=20",
               describe=_describe_rule)

time.sleep(0.5)

# 6.3 Critical value thresholds (may return list directly)
_check_listing("Critical value thresholds list",
               "/health/critical-value-thresholds?page=1&page_size=20",
               describe=_describe_threshold)

time.sleep(0.5)

# 6.4 Critical alerts list
_check_listing("Critical alerts list", "/health/critical-alerts?page=1&page_size=20")

time.sleep(0.5)

# 6.5 Security -- alerts without auth
code_a5, resp_a5, rt_a5 = api_call("GET", "/health/alerts", token=None)
log_test("Alerts", "No token on alerts returns 401", code_a5 == 401,
         f"HTTP {code_a5} (expected 401)", code_a5, rt_a5)
|
|
|
|
|
|
# ============================================================
# STEP 7: Performance Statistics
# ============================================================
print("\n" + "=" * 70)
print("STEP 7: Performance Statistics")
print("=" * 70)

# Keep every entry that recorded a latency. Compare against None rather than
# relying on truthiness, which would silently drop a sample stored as 0.0 ms.
response_times = [
    r["response_time_ms"] for r in results if r.get("response_time_ms") is not None
]
if response_times:
    sample_count = len(response_times)
    avg_rt = sum(response_times) / sample_count
    max_rt = max(response_times)
    min_rt = min(response_times)
    under_200ms = sum(1 for v in response_times if v < 200)
    pct = (under_200ms / sample_count) * 100
    print(f" Average response time: {avg_rt:.0f}ms")
    print(f" Max response time: {max_rt:.0f}ms")
    print(f" Min response time: {min_rt:.0f}ms")
    print(f" Under 200ms: {pct:.0f}% ({under_200ms}/{sample_count})")
    # The SLA verdict is informational: it is printed but not added to
    # `results`, so it does not affect the final pass/fail count.
    print(f" SLA (< 200ms 95%): {'PASS' if pct >= 95 else 'FAIL'}")
|
|
|
|
|
|
# ============================================================
# FINAL REPORT
# ============================================================
print("\n" + "=" * 70)
print("FINAL TEST REPORT")
print("=" * 70)

# Overall counts: anything not recorded as PASS counts as failed.
total_tests = len(results)
passed = len([r for r in results if r["status"] == "PASS"])
failed = total_tests - passed
pass_rate = 0 if total_tests <= 0 else passed / total_tests * 100

print(f"\n Total tests: {total_tests}")
print(f" Passed: {passed}")
print(f" Failed: {failed}")
print(f" Pass rate: {pass_rate:.1f}%")

# Per-category tally, kept in first-seen order.
print("\n Category summary:")
tally = {}
for r in results:
    bucket = tally.setdefault(r["category"], {"pass": 0, "fail": 0})
    bucket["pass" if r["status"] == "PASS" else "fail"] += 1

for name, bucket in tally.items():
    subtotal = bucket["pass"] + bucket["fail"]
    print(f" {name}: {bucket['pass']}/{subtotal} PASS ({bucket['fail']} FAIL)")

# List each failed check with the first 150 characters of its detail text.
if failed > 0:
    print("\n Failed test details:")
    for r in results:
        if r["status"] != "FAIL":
            continue
        print(f" [FAIL] {r['category']} / {r['test_name']}")
        print(f" {r['detail'][:150]}")

verdict = "HAS FAILURES" if failed else "ALL PASS"
print(f"\n Overall: {verdict}")
print("=" * 70)

# Exit status 0 only when every check passed.
sys.exit(1 if failed else 0)
|