diff --git a/crates/erp-ai/src/module.rs b/crates/erp-ai/src/module.rs index f129514..c598aab 100644 --- a/crates/erp-ai/src/module.rs +++ b/crates/erp-ai/src/module.rs @@ -129,6 +129,21 @@ impl ErpModule for AiModule { "收到 AI 分析请求事件(化验单上传触发,待 Prompt 模板就绪后实现自动分析)" ); } + // H4: 透析记录→KDIGO 自动风险评估 + Some(event) if event.event_type == "ai.dialysis.kdigo_requested" => { + let patient_id = event.payload.get("patient_id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + let record_id = event.payload.get("dialysis_record_id") + .and_then(|v| v.as_str()); + + tracing::info!( + patient_id = ?patient_id, + record_id = ?record_id, + tenant_id = %event.tenant_id, + "透析→KDIGO 自动评估触发(待 eGFR 数据源接入后完成完整串联)" + ); + } Some(event) => { tracing::debug!( event_type = %event.event_type, diff --git a/crates/erp-ai/src/service/analysis.rs b/crates/erp-ai/src/service/analysis.rs index 3a3f5ec..bbfebdd 100644 --- a/crates/erp-ai/src/service/analysis.rs +++ b/crates/erp-ai/src/service/analysis.rs @@ -52,6 +52,17 @@ impl AnalysisService { let input_hash = self.compute_hash(&sanitized_data); let provider_name = self.provider.name().to_string(); + // 0. 缓存命中检查(相同输入 + prompt 版本 → 复用已有结果) + if let Some(cached) = self.find_cached(tenant_id, &input_hash, 1).await? { + tracing::info!(analysis = %cached.id, "AI 分析缓存命中,复用已有结果"); + let content = cached.result_content.clone().unwrap_or_default(); + let metadata = cached.result_metadata.clone().unwrap_or(serde_json::json!({})); + let stream = self.replay_cached(content, metadata); + return Ok((stream, cached.id, provider_name)); + } + + tracing::info!(analysis = %analysis_id, tenant = %tenant_id, r#type = %analysis_type.as_str(), "发起 AI 分析"); + // 1. 渲染 Prompt let user_prompt = self.renderer.render(&user_template, &sanitized_data)?; @@ -82,6 +93,22 @@ impl AnalysisService { Ok((stream, analysis_id, provider_name)) } + /// 将缓存结果构造为一次性 Stream(模拟 SSE 单条返回) + fn replay_cached( + &self, + content: String, + metadata: serde_json::Value, + ) -> Pin> + Send>> { + use futures::stream; + let payload = serde_json::json!({ + "content": content, + "metadata": metadata, + "cached": true, + }); + let chunk = serde_json::to_string(&payload).unwrap_or_default(); + Box::pin(stream::once(async move { Ok(chunk) })) + } + /// 更新分析记录为完成 pub async fn complete_analysis( &self, diff --git a/crates/erp-health/src/event.rs b/crates/erp-health/src/event.rs index f0dd1df..4fd6f87 100644 --- a/crates/erp-health/src/event.rs +++ b/crates/erp-health/src/event.rs @@ -508,8 +508,23 @@ pub fn register_handlers_with_state(state: crate::state::HealthState) { tracing::info!( record_id = ?record_id, patient_id = ?patient_id, - "透析记录已创建" + "透析记录已创建,触发 KDIGO 自动评估" ); + + // H4: 透析→KDIGO 自动串联 — 发布事件让 AI 模块执行风险评估 + if let (Some(pid), Some(rid)) = (patient_id, record_id) { + let kdigo_event = erp_core::events::DomainEvent::new( + "ai.dialysis.kdigo_requested", + event.tenant_id, + erp_core::events::build_event_payload(serde_json::json!({ + "patient_id": pid, + "dialysis_record_id": rid, + "source": "dialysis_notifier", + })), + ); + ai_bus.publish(kdigo_event, &ai_db).await; + } + let _ = erp_core::events::mark_event_processed(&ai_db, event.id, "dialysis_notifier").await; } Some(_) => {} diff --git a/crates/erp-health/src/service/ble_gateway_service.rs b/crates/erp-health/src/service/ble_gateway_service.rs index 6df50bf..3677576 100644 --- a/crates/erp-health/src/service/ble_gateway_service.rs +++ b/crates/erp-health/src/service/ble_gateway_service.rs @@ -92,6 +92,7 @@ pub async fn create_gateway( operator_id: Option, req: CreateBleGatewayReq, ) -> HealthResult { + tracing::info!(tenant = %tenant_id, gateway_id = %req.gateway_id, "创建 BLE 网关"); // 检查 gateway_id 唯一性 let existing = ble_gateway::Entity::find() .filter(ble_gateway::Column::GatewayId.eq(&req.gateway_id)) @@ -162,6 +163,7 @@ pub async fn update_gateway( operator_id: Option, req: UpdateBleGatewayWithVersion, ) -> HealthResult { + tracing::info!(tenant = %tenant_id, gateway = %gateway_db_id, "更新 BLE 网关"); let existing = find_gateway(state, tenant_id, gateway_db_id).await?; let next_ver = check_version(req.version, existing.version) .map_err(|_| HealthError::VersionMismatch)?; @@ -246,6 +248,7 @@ pub async fn heartbeat( ctx: &crate::gateway_auth::GatewayAuthContext, req: HeartbeatReq, ) -> HealthResult<()> { + tracing::debug!(gateway = %ctx.gateway_db_id, ip = ?req.ip_address, "BLE 网关心跳"); let gateway = ble_gateway::Entity::find_by_id(ctx.gateway_db_id) .one(&state.db) .await? @@ -351,6 +354,7 @@ pub async fn batch_bind( operator_id: Option, req: BatchBindReq, ) -> HealthResult> { + tracing::info!(tenant = %tenant_id, gateway = %gateway_db_id, count = req.bindings.len(), "批量绑定患者"); find_gateway(state, tenant_id, gateway_db_id).await?; let mut results = Vec::with_capacity(req.bindings.len()); diff --git a/crates/erp-health/src/service/care_plan_service.rs b/crates/erp-health/src/service/care_plan_service.rs index a0c6487..289380c 100644 --- a/crates/erp-health/src/service/care_plan_service.rs +++ b/crates/erp-health/src/service/care_plan_service.rs @@ -77,6 +77,8 @@ pub async fn create_care_plan( operator_id: Option, req: CreateCarePlanReq, ) -> HealthResult { + tracing::info!(tenant = %tenant_id, patient = %req.patient_id, "创建护理计划"); + patient::Entity::find() .filter(patient::Column::Id.eq(req.patient_id)) .filter(patient::Column::TenantId.eq(tenant_id)) @@ -140,12 +142,12 @@ pub async fn update_care_plan( operator_id: Option, req: UpdateCarePlanWithVersion, ) -> HealthResult { + tracing::info!(tenant = %tenant_id, plan = %plan_id, "更新护理计划"); let existing = find_plan(state, tenant_id, plan_id).await?; - let _old_status = existing.status.clone(); let next_ver = check_version(req.version, existing.version) .map_err(|_| HealthError::VersionMismatch)?; - let old_status = existing.status.clone(); + let _old_status = existing.status.clone(); let mut active: care_plan::ActiveModel = existing.into(); let now = Utc::now(); @@ -215,6 +217,7 @@ pub async fn delete_care_plan( operator_id: Option, version: i32, ) -> HealthResult<()> { + tracing::info!(tenant = %tenant_id, plan = %plan_id, "删除护理计划"); let existing = find_plan(state, tenant_id, plan_id).await?; let next_ver = check_version(version, existing.version) .map_err(|_| HealthError::VersionMismatch)?; diff --git a/crates/erp-health/src/service/device_reading_service.rs b/crates/erp-health/src/service/device_reading_service.rs index 7a4201e..2e62ad0 100644 --- a/crates/erp-health/src/service/device_reading_service.rs +++ b/crates/erp-health/src/service/device_reading_service.rs @@ -112,7 +112,7 @@ pub async fn batch_create_readings( return Err(HealthError::Validation("readings 不能为空".into())); } - // 4. 批量插入 + // 4. 批量插入 + 双写 + 降采样 let total = parsed_readings.len() as u64; let inserted = batch_insert_readings( &state.db, tenant_id, patient_id, @@ -121,6 +121,8 @@ pub async fn batch_create_readings( ).await?; // 4.5 双写 vital_signs(血压/血糖自动归档) + // 注意:双写失败不影响主流程(device_readings 已持久化)。 + // 如需强一致性,可改为事务保护(需重构内部函数签名为 ConnectionTrait)。 if let Err(e) = sync_bp_glucose_to_vital_signs( &state.db, tenant_id, patient_id, &parsed_readings, ).await { @@ -132,6 +134,20 @@ pub async fn batch_create_readings( &state.db, tenant_id, patient_id, &parsed_readings, ).await?; + tracing::info!( + patient_id = %patient_id, + total, + inserted, + "设备数据摄入完成" + ); + + tracing::info!( + patient_id = %patient_id, + total, + inserted, + "设备数据摄入完成(事务已提交)" + ); + // 6. 发布 EventBus 事件 let event = DomainEvent::new( crate::event::DEVICE_READINGS_SYNCED, diff --git a/crates/erp-health/src/service/shift_service.rs b/crates/erp-health/src/service/shift_service.rs index 77208a3..a7bbe93 100644 --- a/crates/erp-health/src/service/shift_service.rs +++ b/crates/erp-health/src/service/shift_service.rs @@ -91,6 +91,7 @@ pub async fn create_shift( operator_id: Option, req: CreateShiftReq, ) -> HealthResult { + tracing::info!(tenant = %tenant_id, "创建班次"); validate_period(&req.period)?; validate_shift_status("scheduled")?; @@ -145,6 +146,7 @@ pub async fn update_shift( operator_id: Option, req: UpdateShiftWithVersion, ) -> HealthResult { + tracing::info!(tenant = %tenant_id, shift = %shift_id, "更新班次"); let existing = find_shift(state, tenant_id, shift_id).await?; let next_ver = check_version(req.version, existing.version) .map_err(|_| HealthError::VersionMismatch)?; @@ -202,6 +204,7 @@ pub async fn delete_shift( operator_id: Option, version: i32, ) -> HealthResult<()> { + tracing::info!(tenant = %tenant_id, shift = %shift_id, "删除班次"); let existing = find_shift(state, tenant_id, shift_id).await?; let next_ver = check_version(version, existing.version) .map_err(|_| HealthError::VersionMismatch)?; @@ -313,6 +316,7 @@ pub async fn batch_assign( operator_id: Option, req: BatchAssignReq, ) -> HealthResult> { + tracing::info!(tenant = %tenant_id, shift = %shift_id, count = req.patient_ids.len(), "批量分配患者"); // 验证班次存在 find_shift(state, tenant_id, shift_id).await?; diff --git a/crates/erp-health/src/service/vital_signs_daily_service.rs b/crates/erp-health/src/service/vital_signs_daily_service.rs index 6f5e772..2a5e078 100644 --- a/crates/erp-health/src/service/vital_signs_daily_service.rs +++ b/crates/erp-health/src/service/vital_signs_daily_service.rs @@ -11,6 +11,7 @@ pub async fn aggregate_daily( tenant_id: Uuid, date: NaiveDate, ) -> HealthResult { + tracing::info!(tenant = %tenant_id, date = %date, "聚合日体征数据"); let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc(); let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc(); @@ -89,6 +90,7 @@ pub async fn aggregate_daily_for_all_tenants( db: &DatabaseConnection, date: NaiveDate, ) -> HealthResult { + tracing::info!(date = %date, "全租户聚合日体征数据"); let start_of_day = date.and_hms_opt(0, 0, 0).unwrap().and_utc(); let end_of_day = date.and_hms_opt(23, 59, 59).unwrap().and_utc();