diff --git a/docs/superpowers/plans/2026-04-25-erp-ai-phase1-mvp.md b/docs/superpowers/plans/2026-04-25-erp-ai-phase1-mvp.md index 9a6db25..67e85d2 100644 --- a/docs/superpowers/plans/2026-04-25-erp-ai-phase1-mvp.md +++ b/docs/superpowers/plans/2026-04-25-erp-ai-phase1-mvp.md @@ -1340,3 +1340,354 @@ git commit -m "feat(ai): 数据脱敏服务 + Prompt 模板渲染引擎" ``` --- + +## Chunk 4: Service 层 — AnalysisService 核心编排 + +### Task 8: AnalysisService — 分析管道核心逻辑 + +**Files:** +- Create: `crates/erp-ai/src/service/mod.rs` +- Create: `crates/erp-ai/src/service/analysis.rs` +- Create: `crates/erp-ai/src/service/prompt.rs` +- Create: `crates/erp-ai/src/service/usage.rs` + +- [ ] **Step 1: 创建 service/mod.rs** + +```rust +// crates/erp-ai/src/service/mod.rs +pub mod analysis; +pub mod prompt; +pub mod usage; +``` + +- [ ] **Step 2: 创建 service/prompt.rs — Prompt CRUD** + +```rust +// crates/erp-ai/src/service/prompt.rs +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; +use uuid::Uuid; + +use crate::entity::ai_prompt; +use crate::error::{AiError, AiResult}; + +pub struct PromptService { + pub db: sea_orm::DatabaseConnection, +} + +impl PromptService { + pub fn new(db: sea_orm::DatabaseConnection) -> Self { + Self { db } + } + + /// 获取当前激活的 Prompt 模板 + pub async fn get_active_prompt( + &self, + tenant_id: Uuid, + name: &str, + ) -> AiResult { + ai_prompt::Entity::find() + .filter(ai_prompt::Column::TenantId.eq(tenant_id)) + .filter(ai_prompt::Column::Name.eq(name)) + .filter(ai_prompt::Column::IsActive.eq(true)) + .filter(ai_prompt::Column::DeletedAt.is_null()) + .one(&self.db) + .await? + .ok_or_else(|| AiError::PromptNotFound(name.into())) + } + + /// 新建 Prompt 版本 + pub async fn create_prompt( + &self, + tenant_id: Uuid, + user_id: Uuid, + name: String, + system_prompt: String, + user_prompt_template: String, + model_config: serde_json::Value, + category: String, + ) -> AiResult { + let id = Uuid::now_v7(); + let now = chrono::Utc::now(); + let active = ai_prompt::ActiveModel { + id: Set(id), + tenant_id: Set(tenant_id), + name: Set(name), + description: Set(String::new()), + system_prompt: Set(system_prompt), + user_prompt_template: Set(user_prompt_template), + variables_schema: Set(None), + model_config: Set(model_config), + version: Set(1), + is_active: Set(true), + category: Set(category), + tags: Set(None), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(Some(user_id)), + updated_by: Set(Some(user_id)), + deleted_at: Set(None), + version_lock: Set(1), + }; + Ok(active.insert(&self.db).await?) + } +} +``` + +- [ ] **Step 3: 创建 service/usage.rs — 用量记录** + +```rust +// crates/erp-ai/src/service/usage.rs +use sea_orm::ActiveModelTrait; +use uuid::Uuid; + +use crate::entity::ai_usage; +use crate::error::AiResult; + +pub struct UsageService { + pub db: sea_orm::DatabaseConnection, +} + +impl UsageService { + pub fn new(db: sea_orm::DatabaseConnection) -> Self { + Self { db } + } + + pub async fn log_usage( + &self, + tenant_id: Uuid, + provider: &str, + model: &str, + analysis_type: &str, + input_tokens: u32, + output_tokens: u32, + duration_ms: u64, + cost_cents: i32, + is_cache_hit: bool, + ) -> AiResult { + let id = Uuid::now_v7(); + let active = ai_usage::ActiveModel { + id: Set(id), + tenant_id: Set(tenant_id), + provider: Set(provider.into()), + model: Set(model.into()), + analysis_type: Set(analysis_type.into()), + input_tokens: Set(input_tokens as i32), + output_tokens: Set(output_tokens as i32), + duration_ms: Set(duration_ms as i32), + cost_cents: Set(cost_cents), + is_cache_hit: Set(is_cache_hit), + created_at: Set(chrono::Utc::now()), + }; + Ok(active.insert(&self.db).await?) + } +} +``` + +> `Set` 需要 `use sea_orm::Set;` + +- [ ] **Step 4: 创建 service/analysis.rs — 核心编排** + +```rust +// crates/erp-ai/src/service/analysis.rs +use async_stream::stream; +use futures::{Stream, StreamExt}; +use sha2::{Digest, Sha256}; +use std::pin::Pin; +use uuid::Uuid; + +use crate::dto::{AnalysisType, GenerateRequest, StreamChunk}; +use crate::entity::ai_analysis; +use crate::error::{AiError, AiResult}; +use crate::prompt::PromptRenderer; +use crate::provider::AiProvider; +use crate::sanitization::SanitizationService; + +pub struct AnalysisService { + pub provider: Box, + pub sanitizer: SanitizationService, + pub renderer: PromptRenderer, + pub db: sea_orm::DatabaseConnection, +} + +impl AnalysisService { + pub fn new( + provider: Box, + db: sea_orm::DatabaseConnection, + ) -> Self { + Self { + provider, + sanitizer: SanitizationService::new(), + renderer: PromptRenderer::new(), + db, + } + } + + /// 执行流式分析 — 返回 SSE 事件流 + pub async fn stream_analyze( + &self, + tenant_id: Uuid, + user_id: Uuid, + patient_id: Uuid, + analysis_type: AnalysisType, + source_ref: String, + system_prompt: String, + user_template: String, + sanitized_data: serde_json::Value, + model: String, + temperature: f32, + max_tokens: u32, + ) -> AiResult<( + Pin> + Send>>, + uuid::Uuid, + String, + )> { + let analysis_id = Uuid::now_v7(); + let input_hash = self.compute_hash(&sanitized_data); + let provider_name = self.provider.name().to_string(); + + // 1. 渲染 Prompt + let user_prompt = self.renderer.render(&user_template, &sanitized_data)?; + + // 2. 创建分析记录 + self.create_analysis_record( + analysis_id, tenant_id, patient_id, + analysis_type.as_str(), &source_ref, + &input_hash, &provider_name, &model, + ).await?; + + // 3. 调用 AI 流式生成 + let req = GenerateRequest { + system_prompt, + user_prompt, + model, + temperature, + max_tokens, + }; + let stream = self.provider.stream_generate(req).await?; + + Ok((stream, analysis_id, provider_name)) + } + + /// 更新分析记录为完成 + pub async fn complete_analysis( + &self, + analysis_id: Uuid, + content: String, + metadata: serde_json::Value, + ) -> AiResult<()> { + let entity: Option = ai_analysis::Entity::find_by_id(analysis_id) + .one(&self.db) + .await? + .ok_or_else(|| AiError::AnalysisNotFound(analysis_id.to_string()))?; + + let mut active: ai_analysis::ActiveModel = entity.into(); + active.status = sea_orm::Set("completed".into()); + active.result_content = sea_orm::Set(Some(content)); + active.result_metadata = sea_orm::Set(Some(metadata)); + active.updated_at = sea_orm::Set(chrono::Utc::now()); + active.update(&self.db).await?; + Ok(()) + } + + /// 标记分析失败 + pub async fn fail_analysis(&self, analysis_id: Uuid, error: String) -> AiResult<()> { + let entity: Option = ai_analysis::Entity::find_by_id(analysis_id) + .one(&self.db) + .await? + .ok_or_else(|| AiError::AnalysisNotFound(analysis_id.to_string()))?; + + let mut active: ai_analysis::ActiveModel = entity.into(); + active.status = sea_orm::Set("failed".into()); + active.error_message = sea_orm::Set(Some(error)); + active.updated_at = sea_orm::Set(chrono::Utc::now()); + active.update(&self.db).await?; + Ok(()) + } + + /// 查找缓存 + pub async fn find_cached( + &self, + tenant_id: Uuid, + input_hash: &str, + prompt_version: i32, + ) -> AiResult> { + let result = ai_analysis::Entity::find() + .filter(ai_analysis::Column::TenantId.eq(tenant_id)) + .filter(ai_analysis::Column::InputDataHash.eq(input_hash)) + .filter(ai_analysis::Column::PromptVersion.eq(prompt_version)) + .filter(ai_analysis::Column::Status.eq("completed")) + .filter(ai_analysis::Column::DeletedAt.is_null()) + .one(&self.db) + .await?; + Ok(result) + } + + fn compute_hash(&self, data: &serde_json::Value) -> String { + let canonical = serde_json::to_string(data).unwrap_or_default(); + let mut hasher = Sha256::new(); + hasher.update(canonical.as_bytes()); + hex::encode(hasher.finalize()) + } + + async fn create_analysis_record( + &self, + id: Uuid, + tenant_id: Uuid, + patient_id: Uuid, + analysis_type: &str, + source_ref: &str, + input_hash: &str, + provider: &str, + model: &str, + ) -> AiResult<()> { + let now = chrono::Utc::now(); + let active = ai_analysis::ActiveModel { + id: sea_orm::Set(id), + tenant_id: sea_orm::Set(tenant_id), + patient_id: sea_orm::Set(patient_id), + analysis_type: sea_orm::Set(analysis_type.into()), + source_ref: sea_orm::Set(source_ref.into()), + prompt_id: sea_orm::Set(Uuid::nil()), // Phase 1 填充 + prompt_version: sea_orm::Set(1), + model_used: sea_orm::Set(model.into()), + input_data_hash: sea_orm::Set(input_hash.into()), + sanitized_input: sea_orm::Set(None), + result_content: sea_orm::Set(None), + result_metadata: sea_orm::Set(None), + status: sea_orm::Set("streaming".into()), + error_message: sea_orm::Set(None), + created_at: sea_orm::Set(now), + updated_at: sea_orm::Set(now), + created_by: sea_orm::Set(None), + updated_by: sea_orm::Set(None), + deleted_at: sea_orm::Set(None), + version_lock: sea_orm::Set(1), + }; + active.insert(&self.db).await?; + Ok(()) + } +} +``` + +> 注意: 需要添加 `use sea_orm::{ActiveModelTrait, EntityTrait, QueryFilter, Set, ColumnTrait};` + +- [ ] **Step 5: 更新 lib.rs** + +```rust +pub mod service; +``` + +- [ ] **Step 6: 验证编译** + +```bash +cargo check -p erp-ai +``` + +- [ ] **Step 7: 提交** + +```bash +git add crates/erp-ai/src/ +git commit -m "feat(ai): AnalysisService 核心编排 + PromptService + UsageService" +``` + +---