use chrono::Utc; use sea_orm::{ActiveModelTrait, ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, Set}; use uuid::Uuid; use sha2::{Sha256, Digest}; use erp_core::error::AppResult; use crate::dto::{ PluginEntityResp, PluginHealthResp, PluginPermissionResp, PluginResp, }; use crate::dynamic_table::DynamicTableManager; use crate::engine::PluginEngine; use crate::entity::{plugin, plugin_entity, plugin_event_subscription}; use crate::error::PluginError; use crate::manifest::{parse_manifest, PluginManifest}; pub struct PluginService; impl PluginService { /// 上传插件: 解析 manifest + 存储 wasm_binary + status=uploaded pub async fn upload( tenant_id: Uuid, operator_id: Uuid, wasm_binary: Vec, manifest_toml: &str, db: &sea_orm::DatabaseConnection, ) -> AppResult { // 解析 manifest let manifest = parse_manifest(manifest_toml)?; // 计算 WASM hash let mut hasher = Sha256::new(); hasher.update(&wasm_binary); let wasm_hash = format!("{:x}", hasher.finalize()); let now = Utc::now(); let plugin_id = Uuid::now_v7(); // 序列化 manifest 为 JSON let manifest_json = serde_json::to_value(&manifest).map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let model = plugin::ActiveModel { id: Set(plugin_id), tenant_id: Set(tenant_id), name: Set(manifest.metadata.name.clone()), plugin_version: Set(manifest.metadata.version.clone()), description: Set(if manifest.metadata.description.is_empty() { None } else { Some(manifest.metadata.description.clone()) }), author: Set(if manifest.metadata.author.is_empty() { None } else { Some(manifest.metadata.author.clone()) }), status: Set("uploaded".to_string()), manifest_json: Set(manifest_json), wasm_binary: Set(wasm_binary), wasm_hash: Set(wasm_hash), config_json: Set(serde_json::json!({})), error_message: Set(None), installed_at: Set(None), enabled_at: Set(None), created_at: Set(now), updated_at: Set(now), created_by: Set(Some(operator_id)), updated_by: Set(Some(operator_id)), deleted_at: Set(None), version: Set(1), }; let model = model.insert(db).await?; Ok(plugin_model_to_resp(&model, &manifest, vec![])) } /// 安装插件: 创建动态表 + 注册 entity 记录 + 注册事件订阅 + 注册权限 + status=installed pub async fn install( plugin_id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, engine: &PluginEngine, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; validate_status(&model.status, "uploaded")?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let now = Utc::now(); // 创建动态表 + 注册 entity 记录 let mut entity_resps = Vec::new(); if let Some(schema) = &manifest.schema { for entity_def in &schema.entities { let table_name = DynamicTableManager::table_name(&manifest.metadata.id, &entity_def.name); // 创建动态表 DynamicTableManager::create_table(db, &manifest.metadata.id, entity_def).await?; // 注册 entity 记录 let entity_id = Uuid::now_v7(); let entity_model = plugin_entity::ActiveModel { id: Set(entity_id), tenant_id: Set(tenant_id), plugin_id: Set(plugin_id), entity_name: Set(entity_def.name.clone()), table_name: Set(table_name.clone()), schema_json: Set(serde_json::to_value(entity_def).unwrap_or_default()), created_at: Set(now), updated_at: Set(now), created_by: Set(Some(operator_id)), updated_by: Set(Some(operator_id)), deleted_at: Set(None), version: Set(1), }; entity_model.insert(db).await?; entity_resps.push(PluginEntityResp { name: entity_def.name.clone(), display_name: entity_def.display_name.clone(), table_name, }); } } // 注册事件订阅 if let Some(events) = &manifest.events { for pattern in &events.subscribe { let sub_id = Uuid::now_v7(); let sub_model = plugin_event_subscription::ActiveModel { id: Set(sub_id), plugin_id: Set(plugin_id), event_pattern: Set(pattern.clone()), created_at: Set(now), }; sub_model.insert(db).await?; } } // 注册插件声明的权限到 permissions 表 if let Some(perms) = &manifest.permissions { register_plugin_permissions( db, tenant_id, operator_id, &manifest.metadata.id, perms, &now, ) .await?; } // 加载到内存 engine .load( &manifest.metadata.id, &model.wasm_binary, manifest.clone(), ) .await?; // 更新状态 let mut active: plugin::ActiveModel = model.into(); active.status = Set("installed".to_string()); active.installed_at = Set(Some(now)); active.updated_at = Set(now); active.updated_by = Set(Some(operator_id)); let model = active.update(db).await?; Ok(plugin_model_to_resp(&model, &manifest, entity_resps)) } /// 启用插件: engine.initialize + start_event_listener + status=running pub async fn enable( plugin_id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, engine: &PluginEngine, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; validate_status_any(&model.status, &["installed", "disabled"])?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let plugin_manifest_id = &manifest.metadata.id; // 如果之前是 disabled 状态,需要先卸载再重新加载到内存 // (disable 只改内存状态但不从 DashMap 移除) if model.status == "disabled" { engine.unload(plugin_manifest_id).await.ok(); engine .load(plugin_manifest_id, &model.wasm_binary, manifest.clone()) .await?; } // 初始化 engine.initialize(plugin_manifest_id).await?; // 启动事件监听 engine.start_event_listener(plugin_manifest_id).await?; let now = Utc::now(); let mut active: plugin::ActiveModel = model.into(); active.status = Set("running".to_string()); active.enabled_at = Set(Some(now)); active.updated_at = Set(now); active.updated_by = Set(Some(operator_id)); active.error_message = Set(None); let model = active.update(db).await?; let entity_resps = find_plugin_entities(plugin_id, tenant_id, db).await?; Ok(plugin_model_to_resp(&model, &manifest, entity_resps)) } /// 禁用插件: engine.disable + cancel 事件订阅 + status=disabled pub async fn disable( plugin_id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, engine: &PluginEngine, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; validate_status_any(&model.status, &["running", "enabled"])?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; // 禁用引擎 engine.disable(&manifest.metadata.id).await?; let now = Utc::now(); let mut active: plugin::ActiveModel = model.into(); active.status = Set("disabled".to_string()); active.updated_at = Set(now); active.updated_by = Set(Some(operator_id)); let model = active.update(db).await?; let entity_resps = find_plugin_entities(plugin_id, tenant_id, db).await?; Ok(plugin_model_to_resp(&model, &manifest, entity_resps)) } /// 卸载插件: unload + 有条件地 drop 动态表 + 清理权限 + status=uninstalled pub async fn uninstall( plugin_id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, engine: &PluginEngine, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; validate_status_any(&model.status, &["installed", "disabled"])?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; // 卸载(如果 disabled 状态,engine 可能仍在内存中) engine.unload(&manifest.metadata.id).await.ok(); // 软删除当前租户的 entity 记录 let now = Utc::now(); let tenant_entities = plugin_entity::Entity::find() .filter(plugin_entity::Column::PluginId.eq(plugin_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::DeletedAt.is_null()) .all(db) .await?; for entity in &tenant_entities { let mut active: plugin_entity::ActiveModel = entity.clone().into(); active.deleted_at = Set(Some(now)); active.updated_at = Set(now); active.updated_by = Set(Some(operator_id)); active.update(db).await?; } // 仅当没有其他租户的活跃 entity 记录引用相同的 table_name 时才 drop 表 if let Some(schema) = &manifest.schema { for entity_def in &schema.entities { let table_name = DynamicTableManager::table_name(&manifest.metadata.id, &entity_def.name); // 检查是否还有其他租户的活跃 entity 记录引用此表 let other_tenants_count = plugin_entity::Entity::find() .filter(plugin_entity::Column::TableName.eq(&table_name)) .filter(plugin_entity::Column::TenantId.ne(tenant_id)) .filter(plugin_entity::Column::DeletedAt.is_null()) .count(db) .await?; if other_tenants_count == 0 { // 没有其他租户使用,安全删除 DynamicTableManager::drop_table(db, &manifest.metadata.id, &entity_def.name) .await .ok(); } } } // 清理此插件注册的权限 unregister_plugin_permissions(db, tenant_id, &manifest.metadata.id).await?; let mut active: plugin::ActiveModel = model.into(); active.status = Set("uninstalled".to_string()); active.updated_at = Set(now); active.updated_by = Set(Some(operator_id)); let model = active.update(db).await?; Ok(plugin_model_to_resp(&model, &manifest, vec![])) } /// 列表查询 pub async fn list( tenant_id: Uuid, page: u64, page_size: u64, status: Option<&str>, search: Option<&str>, db: &sea_orm::DatabaseConnection, ) -> AppResult<(Vec, u64)> { let mut query = plugin::Entity::find() .filter(plugin::Column::TenantId.eq(tenant_id)) .filter(plugin::Column::DeletedAt.is_null()); if let Some(s) = status { query = query.filter(plugin::Column::Status.eq(s)); } if let Some(q) = search { query = query.filter( plugin::Column::Name.contains(q) .or(plugin::Column::Description.contains(q)), ); } let paginator = query .clone() .paginate(db, page_size); let total = paginator.num_items().await?; let models = paginator .fetch_page(page.saturating_sub(1)) .await?; let mut resps = Vec::with_capacity(models.len()); for model in models { let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()).unwrap_or_else(|_| { PluginManifest { metadata: crate::manifest::PluginMetadata { id: String::new(), name: String::new(), version: String::new(), description: String::new(), author: String::new(), min_platform_version: None, dependencies: vec![], }, schema: None, events: None, ui: None, permissions: None, } }); let entities = find_plugin_entities(model.id, tenant_id, db).await.unwrap_or_default(); resps.push(plugin_model_to_resp(&model, &manifest, entities)); } Ok((resps, total)) } /// 按 ID 获取详情 pub async fn get_by_id( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let entities = find_plugin_entities(plugin_id, tenant_id, db).await?; Ok(plugin_model_to_resp(&model, &manifest, entities)) } /// 更新配置 pub async fn update_config( plugin_id: Uuid, tenant_id: Uuid, operator_id: Uuid, config: serde_json::Value, expected_version: i32, db: &sea_orm::DatabaseConnection, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; erp_core::error::check_version(expected_version, model.version)?; let now = Utc::now(); let mut active: plugin::ActiveModel = model.into(); active.config_json = Set(config); active.updated_at = Set(now); active.updated_by = Set(Some(operator_id)); let model = active.update(db).await?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let entities = find_plugin_entities(plugin_id, tenant_id, db).await.unwrap_or_default(); Ok(plugin_model_to_resp(&model, &manifest, entities)) } /// 健康检查 pub async fn health_check( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, engine: &PluginEngine, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; let details = engine.health_check(&manifest.metadata.id).await?; Ok(PluginHealthResp { plugin_id, status: details .get("status") .and_then(|v| v.as_str()) .unwrap_or("unknown") .to_string(), details, }) } /// 获取插件 Schema pub async fn get_schema( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult { let model = find_plugin(plugin_id, tenant_id, db).await?; let manifest: PluginManifest = serde_json::from_value(model.manifest_json.clone()) .map_err(|e| PluginError::InvalidManifest(e.to_string()))?; Ok(serde_json::to_value(&manifest.schema).unwrap_or_default()) } /// 清除插件记录(软删除,仅限已卸载状态) pub async fn purge( plugin_id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult<()> { let model = find_plugin(plugin_id, tenant_id, db).await?; validate_status(&model.status, "uninstalled")?; let now = Utc::now(); let mut active: plugin::ActiveModel = model.into(); active.deleted_at = Set(Some(now)); active.updated_at = Set(now); active.updated_by = Set(Some(operator_id)); active.update(db).await?; Ok(()) } } // ---- 内部辅助 ---- fn find_plugin( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> impl std::future::Future> + Send { async move { plugin::Entity::find_by_id(plugin_id) .one(db) .await? .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none()) .ok_or_else(|| { erp_core::error::AppError::NotFound(format!("插件 {} 不存在", plugin_id)) }) } } async fn find_plugin_entities( plugin_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AppResult> { let entities = plugin_entity::Entity::find() .filter(plugin_entity::Column::PluginId.eq(plugin_id)) .filter(plugin_entity::Column::TenantId.eq(tenant_id)) .filter(plugin_entity::Column::DeletedAt.is_null()) .all(db) .await?; Ok(entities .into_iter() .map(|e| PluginEntityResp { name: e.entity_name.clone(), display_name: e.entity_name, table_name: e.table_name, }) .collect()) } fn validate_status(actual: &str, expected: &str) -> AppResult<()> { if actual != expected { return Err(PluginError::InvalidState { expected: expected.to_string(), actual: actual.to_string(), } .into()); } Ok(()) } fn validate_status_any(actual: &str, expected: &[&str]) -> AppResult<()> { if !expected.contains(&actual) { return Err(PluginError::InvalidState { expected: expected.join(" 或 "), actual: actual.to_string(), } .into()); } Ok(()) } fn plugin_model_to_resp( model: &plugin::Model, manifest: &PluginManifest, entities: Vec, ) -> PluginResp { let permissions = manifest.permissions.as_ref().map(|perms| { perms .iter() .map(|p| PluginPermissionResp { code: p.code.clone(), name: p.name.clone(), description: p.description.clone(), }) .collect() }); PluginResp { id: model.id, name: model.name.clone(), version: model.plugin_version.clone(), description: model.description.clone(), author: model.author.clone(), status: model.status.clone(), config: model.config_json.clone(), installed_at: model.installed_at, enabled_at: model.enabled_at, entities, permissions, record_version: model.version, } } /// 将插件声明的权限注册到 permissions 表。 /// /// 使用 raw SQL 避免依赖 erp-auth 的 entity 类型。 /// 权限码格式:`{plugin_manifest_id}.{code}`(如 `erp-crm.customer.list`)。 /// 使用 `ON CONFLICT DO NOTHING` 保证幂等。 async fn register_plugin_permissions( db: &sea_orm::DatabaseConnection, tenant_id: Uuid, operator_id: Uuid, plugin_manifest_id: &str, perms: &[crate::manifest::PluginPermission], now: &chrono::DateTime, ) -> AppResult<()> { for perm in perms { let full_code = format!("{}.{}", plugin_manifest_id, perm.code); // resource 使用插件 manifest id,action 使用权限的 code 字段 let resource = plugin_manifest_id.to_string(); let action = perm.code.clone(); let description_sql = if perm.description.is_empty() { "NULL".to_string() } else { format!("'{}'", perm.description.replace('\'', "''")) }; let sql = format!( r#" INSERT INTO permissions (id, tenant_id, code, name, resource, action, description, created_at, updated_at, created_by, updated_by, deleted_at, version) VALUES (gen_random_uuid(), '{tenant_id}', '{full_code}', '{name}', '{resource}', '{action}', {desc}, '{now}', '{now}', '{operator_id}', '{operator_id}', NULL, 1) ON CONFLICT (tenant_id, code) WHERE deleted_at IS NULL DO NOTHING "#, tenant_id = tenant_id, full_code = full_code.replace('\'', "''"), name = perm.name.replace('\'', "''"), resource = resource.replace('\'', "''"), action = action.replace('\'', "''"), desc = description_sql, now = now.to_rfc3339(), operator_id = operator_id, ); db.execute(sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, sql, )) .await .map_err(|e| { tracing::error!( plugin = plugin_manifest_id, permission = %full_code, error = %e, "注册插件权限失败" ); PluginError::DatabaseError(format!("注册插件权限 {} 失败: {}", full_code, e)) })?; } tracing::info!( plugin = plugin_manifest_id, count = perms.len(), tenant_id = %tenant_id, "插件权限注册完成" ); Ok(()) } /// 清理插件注册的权限(软删除)。 /// /// 使用 raw SQL 按前缀匹配清理:`{plugin_manifest_id}.%`。 /// 同时清理 role_permissions 中对这些权限的关联。 async fn unregister_plugin_permissions( db: &sea_orm::DatabaseConnection, tenant_id: Uuid, plugin_manifest_id: &str, ) -> AppResult<()> { let prefix = format!("{}.%", plugin_manifest_id); let now = chrono::Utc::now().to_rfc3339(); // 先软删除 role_permissions 中的关联 let rp_sql = format!( r#" UPDATE role_permissions SET deleted_at = '{now}', updated_at = '{now}' WHERE permission_id IN ( SELECT id FROM permissions WHERE tenant_id = '{tenant_id}' AND code LIKE '{prefix}' AND deleted_at IS NULL ) AND deleted_at IS NULL "#, now = now, tenant_id = tenant_id, prefix = prefix.replace('\'', "''"), ); db.execute(sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, rp_sql, )) .await .map_err(|e| { tracing::error!( plugin = plugin_manifest_id, error = %e, "清理插件权限角色关联失败" ); PluginError::DatabaseError(format!("清理插件权限角色关联失败: {}", e)) })?; // 再软删除 permissions let perm_sql = format!( r#" UPDATE permissions SET deleted_at = '{now}', updated_at = '{now}' WHERE tenant_id = '{tenant_id}' AND code LIKE '{prefix}' AND deleted_at IS NULL "#, now = now, tenant_id = tenant_id, prefix = prefix.replace('\'', "''"), ); db.execute(sea_orm::Statement::from_string( sea_orm::DatabaseBackend::Postgres, perm_sql, )) .await .map_err(|e| { tracing::error!( plugin = plugin_manifest_id, error = %e, "清理插件权限失败" ); PluginError::DatabaseError(format!("清理插件权限失败: {}", e)) })?; tracing::info!( plugin = plugin_manifest_id, tenant_id = %tenant_id, "插件权限清理完成" ); Ok(()) }