use std::collections::HashMap; use sea_orm::DatabaseConnection; use uuid::Uuid; use wasmtime::StoreLimits; use crate::dynamic_table::DynamicTableManager; use crate::engine::PluginEngine; use crate::erp::plugin::host_api; /// 待刷新的写操作 #[derive(Debug)] pub enum PendingOp { Insert { id: String, entity: String, data: Vec, }, Update { entity: String, id: String, data: Vec, version: i64, }, Delete { entity: String, id: String, }, PublishEvent { event_type: String, payload: Vec, }, } /// Host 端状态 — 绑定到每个 WASM Store 实例 /// /// 支持两种执行模式: /// - **预填充模式**(db = None):读操作从预填充缓存取,向后兼容 /// - **混合执行模式**(db = Some):读操作走实时 SQL + 写操作保持延迟批量 pub struct HostState { pub(crate) limits: StoreLimits, #[allow(dead_code)] pub(crate) tenant_id: Uuid, #[allow(dead_code)] pub(crate) user_id: Uuid, pub(crate) permissions: Vec, pub(crate) plugin_id: String, // 预填充的读取缓存(向后兼容) pub(crate) query_results: HashMap>, pub(crate) config_cache: HashMap>, pub(crate) current_user_json: Vec, // 待刷新的写操作 pub(crate) pending_ops: Vec, // 日志 pub(crate) logs: Vec<(String, String)>, // 混合执行模式:数据库连接和事件总线 pub(crate) db: Option, pub(crate) event_bus: Option, // 跨插件实体映射:"erp-crm.customer" → "plugin_erp_crm__customer" pub(crate) cross_plugin_entities: HashMap, // 编号规则映射:"invoice" → "INV-{YEAR}-{SEQ:4}" pub(crate) numbering_rules: HashMap, // 插件配置值 pub(crate) plugin_config: serde_json::Value, } /// 编号规则缓存 #[derive(Debug, Clone)] pub struct NumberingRule { pub prefix: String, pub format: String, pub seq_length: u32, pub reset_rule: String, } impl HostState { pub fn new( plugin_id: String, tenant_id: Uuid, user_id: Uuid, permissions: Vec, ) -> Self { let current_user = serde_json::json!({ "id": user_id.to_string(), "tenant_id": tenant_id.to_string(), }); Self { limits: wasmtime::StoreLimitsBuilder::new().build(), tenant_id, user_id, permissions, plugin_id, query_results: HashMap::new(), config_cache: HashMap::new(), current_user_json: serde_json::to_vec(¤t_user).unwrap_or_default(), pending_ops: Vec::new(), logs: Vec::new(), db: None, event_bus: None, cross_plugin_entities: HashMap::new(), numbering_rules: HashMap::new(), plugin_config: serde_json::json!({}), } } /// 创建带数据库连接的 HostState(混合执行模式) pub fn new_with_db( plugin_id: String, tenant_id: Uuid, user_id: Uuid, permissions: Vec, db: DatabaseConnection, event_bus: erp_core::events::EventBus, ) -> Self { let mut state = Self::new(plugin_id, tenant_id, user_id, permissions); state.db = Some(db); state.event_bus = Some(event_bus); state } } // 实现 bindgen 生成的 Host trait — 插件调用 Host API 的入口 impl host_api::Host for HostState { fn db_insert(&mut self, entity: String, data: Vec) -> Result, String> { let id = Uuid::now_v7().to_string(); let response = serde_json::json!({ "id": id, "entity": entity, "status": "queued", }); self.pending_ops.push(PendingOp::Insert { id: id.clone(), entity, data, }); serde_json::to_vec(&response).map_err(|e| e.to_string()) } fn db_query( &mut self, entity: String, filter: Vec, pagination: Vec, ) -> Result, String> { // 预填充模式(向后兼容) if self.db.is_none() { return self .query_results .get(&entity) .cloned() .ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity)); } let db = self.db.clone().ok_or("数据库连接不可用")?; let event_bus = self.event_bus.clone().ok_or("事件总线不可用")?; // 先 flush pending writes(确保读后写一致性) let ops = std::mem::take(&mut self.pending_ops); if !ops.is_empty() { let rt = tokio::runtime::Handle::current(); rt.block_on(PluginEngine::flush_ops( &db, &self.plugin_id, ops, self.tenant_id, self.user_id, &event_bus, )) .map_err(|e| format!("flush pending ops 失败: {}", e))?; } // 解析 filter 和 pagination let filter_val: Option = if filter.is_empty() { None } else { serde_json::from_slice(&filter).ok() }; let pagination_val: Option = if pagination.is_empty() { None } else { serde_json::from_slice(&pagination).ok() }; // 构建查询 — 支持点分记号跨插件查询(如 "erp-crm.customer") let table_name = if entity.contains('.') { self.cross_plugin_entities .get(&entity) .cloned() .ok_or_else(|| format!("跨插件实体 '{}' 未注册", entity))? } else { DynamicTableManager::table_name(&self.plugin_id, &entity) }; let limit = pagination_val .as_ref() .and_then(|p| p.get("limit")) .and_then(|v| v.as_u64()) .unwrap_or(50); let offset = pagination_val .as_ref() .and_then(|p| p.get("offset")) .and_then(|v| v.as_u64()) .unwrap_or(0); let (sql, values) = DynamicTableManager::build_filtered_query_sql( &table_name, self.tenant_id, limit, offset, filter_val, None, None, None, ) .map_err(|e| format!("查询构建失败: {}", e))?; // 执行查询 let rt = tokio::runtime::Handle::current(); let rows = rt .block_on(async { use sea_orm::{FromQueryResult, Statement}; #[derive(Debug, FromQueryResult)] struct QueryRow { data: serde_json::Value, } let results = QueryRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, sql, values, )) .all(&db) .await .map_err(|e| format!("查询执行失败: {}", e))?; let items: Vec = results.into_iter().map(|r| r.data).collect(); Ok::, String>(items) }) .map_err(|e: String| e)?; serde_json::to_vec(&rows).map_err(|e| e.to_string()) } fn db_update( &mut self, entity: String, id: String, data: Vec, version: i64, ) -> Result, String> { let response = serde_json::json!({ "id": id, "entity": entity, "version": version + 1, "status": "queued", }); self.pending_ops.push(PendingOp::Update { entity, id, data, version, }); serde_json::to_vec(&response).map_err(|e| e.to_string()) } fn db_delete(&mut self, entity: String, id: String) -> Result<(), String> { self.pending_ops.push(PendingOp::Delete { entity, id }); Ok(()) } fn event_publish(&mut self, event_type: String, payload: Vec) -> Result<(), String> { self.pending_ops.push(PendingOp::PublishEvent { event_type, payload, }); Ok(()) } fn config_get(&mut self, key: String) -> Result, String> { self.config_cache .get(&key) .cloned() .ok_or_else(|| format!("配置项 '{}' 未预填充", key)) } fn log_write(&mut self, level: String, message: String) { tracing::info!( plugin = %self.plugin_id, level = %level, "Plugin log: {}", message ); self.logs.push((level, message)); } fn current_user(&mut self) -> Result, String> { Ok(self.current_user_json.clone()) } fn check_permission(&mut self, permission: String) -> Result { Ok(self.permissions.contains(&permission)) } fn numbering_generate(&mut self, rule_key: String) -> Result { let rule = self .numbering_rules .get(&rule_key) .ok_or_else(|| format!("编号规则 '{}' 未声明", rule_key))? .clone(); let db = self.db.clone().ok_or("编号生成需要数据库连接")?; let _tenant_id = self.tenant_id; let plugin_id = self.plugin_id.clone(); let rt = tokio::runtime::Handle::current(); rt.block_on(async { use sea_orm::{ConnectionTrait, FromQueryResult, Statement}; let now = chrono::Utc::now(); let year = now.format("%Y").to_string(); let month = now.format("%m").to_string(); let day = now.format("%d").to_string(); // 计算当前周期的 key(用于 reset_rule 判断) let period_key = match rule.reset_rule.as_str() { "daily" => format!("{}-{}-{}", year, month, day), "monthly" => format!("{}-{}", year, month), "yearly" => year.clone(), _ => String::new(), // "never" — 不需要周期 key }; // 序列表名(使用 sanitize_identifier 防注入) let table_name = format!( "plugin_numbering_seq_{}", crate::dynamic_table::sanitize_identifier(&plugin_id) ); // 确保序列表存在 let create_sql = format!( "CREATE TABLE IF NOT EXISTS {} (\ rule_key VARCHAR(255) NOT NULL, \ period_key VARCHAR(64) NOT NULL DEFAULT '', \ current_val BIGINT NOT NULL DEFAULT 0, \ PRIMARY KEY (rule_key, period_key)\ )", table_name ); db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, create_sql, )) .await .map_err(|e| format!("创建序列表失败: {}", e))?; // 使用 advisory lock 保证并发安全 // lock_id 基于规则名哈希 let lock_id: i64 = { use std::hash::{Hash, Hasher}; let mut hasher = std::collections::hash_map::DefaultHasher::new(); (plugin_id.clone() + &rule_key).hash(&mut hasher); (hasher.finish() as i64).abs() }; let lock_sql = format!("SELECT pg_advisory_xact_lock({})", lock_id); db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, lock_sql, )) .await .map_err(|e| format!("获取锁失败: {}", e))?; // 读取当前值 #[derive(Debug, FromQueryResult)] struct SeqRow { current_val: i64, } let read_sql = format!( "SELECT current_val FROM {} WHERE rule_key = $1 AND period_key = $2", table_name ); let current = SeqRow::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, read_sql, [rule_key.clone().into(), period_key.clone().into()], )) .one(&db) .await .map_err(|e| format!("读取序列失败: {}", e))?; let next_val = current.map(|r| r.current_val + 1).unwrap_or(1); // UPSERT 新值 let upsert_sql = format!( "INSERT INTO {} (rule_key, period_key, current_val) VALUES ($1, $2, $3) \ ON CONFLICT (rule_key, period_key) DO UPDATE SET current_val = $3", table_name ); db.execute(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, upsert_sql, [ rule_key.clone().into(), period_key.clone().into(), next_val.into(), ], )) .await .map_err(|e| format!("更新序列失败: {}", e))?; let seq_str = format!("{:0>width$}", next_val, width = rule.seq_length as usize); let number = rule .format .replace("{PREFIX}", &rule.prefix) .replace("{YEAR}", &year) .replace("{MONTH}", &month) .replace("{DAY}", &day) .replace(&format!("{{SEQ:{}}}", rule.seq_length), &seq_str) .replace("{SEQ}", &seq_str); Ok(number) }) } fn setting_get(&mut self, key: String) -> Result, String> { let config = self .plugin_config .as_object() .ok_or("插件配置不是有效对象")?; let value = config.get(&key).cloned().unwrap_or(serde_json::Value::Null); serde_json::to_vec(&value).map_err(|e| e.to_string()) } }