Files
hms/crates/erp-plugin/src/host.rs
iven 6d5a711d2c
Some checks failed
CI / rust-check (push) Has been cancelled
CI / rust-test (push) Has been cancelled
CI / frontend-build (push) Has been cancelled
CI / security-audit (push) Has been cancelled
fix: 修复测试发现的 7 个问题 + 全 workspace clippy 清零
功能修复:
1. 患者创建空名称验证:后端添加 name.trim().is_empty() 检查
2. 仪表盘统计容错:单个查询失败返回零值而非 500
3. FHIR 路由修复:从 /fhir 移到 /api/v1/fhir 保持一致
4. 冻结模块后端中间件:新增 frozen_module_middleware 拦截冻结路径
5. 积分端点权限码:health.health-data.list → health.points.list
6. 角色权限迁移:护士补充 devices.list,运营补充 points.list/manage
7. 测试结果文档:R01-R05 角色测试 + T00/T10 结果归档

Clippy 全 workspace 清零(14→0 errors):
- erp-core: 修复 empty doc line、collapsible if、redundant closure 等 9 处
- erp-health: 修复 too_many_arguments、unused var、unnecessary parens 等 58 处
- erp-ai: 修复 dead_code、unused import 等 11 处
- erp-plugin: 修复 too_many_arguments、wildcard pattern 等 11 处
- erp-server-migration: 修复 enum_variant_names 5 处
- erp-auth/config/workflow/message: 各 1-3 处

工程改进:
- lint-staged 配置迁移到 .lintstagedrc.js(函数式避免文件列表传给 clippy)
- cargo fmt 统一格式化
2026-05-07 23:43:14 +08:00

439 lines
14 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::collections::HashMap;
use sea_orm::DatabaseConnection;
use uuid::Uuid;
use wasmtime::StoreLimits;
use crate::dynamic_table::DynamicTableManager;
use crate::engine::PluginEngine;
use crate::erp::plugin::host_api;
/// A write operation that has been queued by the host and is waiting to be flushed.
///
/// The `Host` API implementation pushes one of these onto `HostState::pending_ops` for every
/// insert / update / delete / event-publish call; `db_query` later hands the queue to
/// `PluginEngine::flush_ops`.
#[derive(Debug)]
pub enum PendingOp {
/// Queue the creation of a record. In `db_insert` the `id` is generated by the host
/// (`Uuid::now_v7`) at the moment the operation is queued.
Insert {
id: String,
entity: String,
// Raw payload bytes exactly as received from the plugin.
data: Vec<u8>,
},
/// Queue a modification of the record `id` of `entity`.
Update {
entity: String,
id: String,
// Raw payload bytes exactly as received from the plugin.
data: Vec<u8>,
// Version number supplied by the plugin; `db_update` reports `version + 1` back to it.
version: i64,
},
/// Queue the removal of the record `id` of `entity`.
Delete {
entity: String,
id: String,
},
/// Queue an event for publication.
PublishEvent {
event_type: String,
// Raw payload bytes exactly as received from the plugin.
payload: Vec<u8>,
},
}
/// Host-side state bound to each WASM `Store` instance.
///
/// Two execution modes are supported:
/// - **Pre-populated mode** (`db` is `None`): reads are answered from the pre-filled caches
///   (kept for backward compatibility).
/// - **Hybrid mode** (`db` is `Some`): reads run live SQL, while writes are still queued and
///   flushed later as a batch.
pub struct HostState {
    /// wasmtime store limits; `HostState::new` builds them with the builder's defaults.
    pub(crate) limits: StoreLimits,
    /// Tenant on whose behalf the plugin runs; scopes live queries and is passed to the flush.
    pub(crate) tenant_id: Uuid,
    /// User on whose behalf the plugin runs; passed to the flush of pending operations.
    pub(crate) user_id: Uuid,
    /// Permission codes consulted by `check_permission` (exact string match).
    pub(crate) permissions: Vec<String>,
    /// Identifier of the plugin that owns this store; used for table names and log tagging.
    pub(crate) plugin_id: String,
    // Pre-populated read caches (backward compatibility).
    /// Entity name → serialized query result, served by `db_query` when `db` is `None`.
    pub(crate) query_results: HashMap<String, Vec<u8>>,
    /// Configuration key → serialized value, served by `config_get`.
    pub(crate) config_cache: HashMap<String, Vec<u8>>,
    /// Serialized JSON object with the current user's `id` and `tenant_id`.
    pub(crate) current_user_json: Vec<u8>,
    /// Write operations waiting to be flushed.
    pub(crate) pending_ops: Vec<PendingOp>,
    /// Log lines emitted by the plugin as `(level, message)` pairs.
    pub(crate) logs: Vec<(String, String)>,
    // Hybrid mode: database connection and event bus.
    /// Live database connection; `None` selects pre-populated mode.
    pub(crate) db: Option<DatabaseConnection>,
    /// Event bus handed to the flush of pending operations in hybrid mode.
    pub(crate) event_bus: Option<erp_core::events::EventBus>,
    /// Cross-plugin entity mapping, e.g. "erp-crm.customer" → "plugin_erp_crm__customer".
    pub(crate) cross_plugin_entities: HashMap<String, String>,
    /// Numbering rules by key, e.g. "invoice" → "INV-{YEAR}-{SEQ:4}".
    pub(crate) numbering_rules: HashMap<String, NumberingRule>,
    /// Plugin configuration values; `setting_get` expects this to be a JSON object.
    pub(crate) plugin_config: serde_json::Value,
}
/// Cached numbering rule used by `numbering_generate` to render document numbers.
#[derive(Debug, Clone)]
pub struct NumberingRule {
// Text substituted for the `{PREFIX}` placeholder in `format`.
pub prefix: String,
// Template for the generated number. `numbering_generate` replaces `{PREFIX}`, `{YEAR}`,
// `{MONTH}`, `{DAY}`, `{SEQ}` and `{SEQ:N}` (only when N equals `seq_length`).
pub format: String,
// Minimum width of the sequence part; shorter values are left-padded with zeros.
pub seq_length: u32,
// When the counter restarts: "daily", "monthly" or "yearly"; any other value is treated
// as "never" (a single counter with an empty period key).
pub reset_rule: String,
}
impl HostState {
    /// Builds a host state in pre-populated mode (no database connection, no event bus).
    ///
    /// All caches, the pending-operation queue and the log buffer start out empty; the
    /// plugin configuration starts as an empty JSON object. The current-user snapshot is
    /// serialized once here from `user_id` and `tenant_id`.
    pub fn new(
        plugin_id: String,
        tenant_id: Uuid,
        user_id: Uuid,
        permissions: Vec<String>,
    ) -> Self {
        let identity = serde_json::json!({
            "id": user_id.to_string(),
            "tenant_id": tenant_id.to_string(),
        });
        // Falls back to an empty byte vector if serialization were ever to fail.
        let current_user_json = serde_json::to_vec(&identity).unwrap_or_default();
        let limits = wasmtime::StoreLimitsBuilder::new().build();

        Self {
            limits,
            tenant_id,
            user_id,
            permissions,
            plugin_id,
            query_results: HashMap::default(),
            config_cache: HashMap::default(),
            current_user_json,
            pending_ops: Vec::default(),
            logs: Vec::default(),
            db: None,
            event_bus: None,
            cross_plugin_entities: HashMap::default(),
            numbering_rules: HashMap::default(),
            plugin_config: serde_json::Value::Object(serde_json::Map::new()),
        }
    }

    /// Builds a host state in hybrid mode: same as [`HostState::new`], plus a live database
    /// connection and an event bus.
    pub fn new_with_db(
        plugin_id: String,
        tenant_id: Uuid,
        user_id: Uuid,
        permissions: Vec<String>,
        db: DatabaseConnection,
        event_bus: erp_core::events::EventBus,
    ) -> Self {
        let mut host = Self::new(plugin_id, tenant_id, user_id, permissions);
        host.event_bus = Some(event_bus);
        host.db = Some(db);
        host
    }
}
// 实现 bindgen 生成的 Host trait — 插件调用 Host API 的入口
impl host_api::Host for HostState {
/// Queues an insert of `data` into `entity` and returns a JSON acknowledgement.
///
/// The record id is generated here (`Uuid::now_v7`). Nothing is written yet: the
/// operation is only appended to `pending_ops`. The returned bytes are a JSON object with
/// the keys `id`, `entity` and `status` (`"queued"`).
fn db_insert(&mut self, entity: String, data: Vec<u8>) -> Result<Vec<u8>, String> {
    let new_id = Uuid::now_v7().to_string();
    // `json!` only borrows its operands, so both strings can still be moved into the op below.
    let ack = serde_json::json!({
        "id": new_id,
        "entity": entity,
        "status": "queued",
    });
    let op = PendingOp::Insert {
        id: new_id,
        entity,
        data,
    };
    self.pending_ops.push(op);
    serde_json::to_vec(&ack).map_err(|err| err.to_string())
}
/// Queries records of `entity` and returns them as a serialized JSON array.
///
/// * Pre-populated mode (`db` is `None`): returns the cached bytes stored for `entity`, or an
///   error if none were pre-filled. `filter` and `pagination` are not consulted.
/// * Hybrid mode: flushes any pending writes first (so the read observes them), then runs a
///   tenant-scoped SQL query against the plugin's table and returns each row's `data` column.
///
/// `filter` and `pagination` are optional JSON documents; an empty byte slice means "absent".
/// `pagination` may carry unsigned integer `limit` (default 50) and `offset` (default 0).
/// An entity name containing `.` is resolved through `cross_plugin_entities`.
///
/// # Errors
/// Returns a message when: the cache has no entry (pre-populated mode), the event bus is
/// missing, `filter`/`pagination` is not valid JSON, the flush fails, a cross-plugin entity
/// is not registered, or building/executing the query fails.
fn db_query(
    &mut self,
    entity: String,
    filter: Vec<u8>,
    pagination: Vec<u8>,
) -> Result<Vec<u8>, String> {
    /// Decodes an optional JSON argument: empty input is `None`, invalid JSON is an error.
    fn parse_optional_json(raw: &[u8], what: &str) -> Result<Option<serde_json::Value>, String> {
        if raw.is_empty() {
            return Ok(None);
        }
        serde_json::from_slice(raw)
            .map(Some)
            .map_err(|e| format!("{} 解析失败: {}", what, e))
    }

    // Pre-populated mode (backward compatible): no live connection, answer from the cache.
    let db = match self.db.clone() {
        Some(db) => db,
        None => {
            return self
                .query_results
                .get(&entity)
                .cloned()
                .ok_or_else(|| format!("实体 '{}' 的查询结果未预填充", entity));
        }
    };
    let event_bus = self.event_bus.clone().ok_or("事件总线不可用")?;

    // Decode the arguments before touching the write queue. A malformed filter must be
    // rejected rather than silently dropped, otherwise the caller would receive unfiltered
    // rows; rejecting it here also leaves the pending operations queued.
    let filter_val = parse_optional_json(filter.as_slice(), "filter")?;
    let pagination_val = parse_optional_json(pagination.as_slice(), "pagination")?;

    // Flush pending writes first so this read observes them.
    // NOTE(review): `ops` is moved into `flush_ops`; if the flush fails the queued
    // operations are not put back into `pending_ops`.
    let ops = std::mem::take(&mut self.pending_ops);
    if !ops.is_empty() {
        let rt = tokio::runtime::Handle::current();
        rt.block_on(PluginEngine::flush_ops(
            &db,
            &self.plugin_id,
            ops,
            self.tenant_id,
            self.user_id,
            &event_bus,
        ))
        .map_err(|e| format!("flush pending ops 失败: {}", e))?;
    }

    // Resolve the table; dotted names (e.g. "erp-crm.customer") address another plugin's
    // entity and must have been registered in `cross_plugin_entities`.
    let table_name = if entity.contains('.') {
        self.cross_plugin_entities
            .get(&entity)
            .cloned()
            .ok_or_else(|| format!("跨插件实体 '{}' 未注册", entity))?
    } else {
        DynamicTableManager::table_name(&self.plugin_id, &entity)
    };

    let limit = pagination_val
        .as_ref()
        .and_then(|p| p.get("limit"))
        .and_then(|v| v.as_u64())
        .unwrap_or(50);
    let offset = pagination_val
        .as_ref()
        .and_then(|p| p.get("offset"))
        .and_then(|v| v.as_u64())
        .unwrap_or(0);

    let (sql, values) = DynamicTableManager::build_filtered_query_sql(
        &table_name,
        self.tenant_id,
        limit,
        offset,
        filter_val,
        None,
        None,
        None,
    )
    .map_err(|e| format!("查询构建失败: {}", e))?;

    // Run the query synchronously on the current tokio runtime.
    let rt = tokio::runtime::Handle::current();
    let rows = rt.block_on(async {
        use sea_orm::{FromQueryResult, Statement};

        #[derive(Debug, FromQueryResult)]
        struct QueryRow {
            data: serde_json::Value,
        }

        let results = QueryRow::find_by_statement(Statement::from_sql_and_values(
            sea_orm::DatabaseBackend::Postgres,
            sql,
            values,
        ))
        .all(&db)
        .await
        .map_err(|e| format!("查询执行失败: {}", e))?;

        Ok::<Vec<serde_json::Value>, String>(results.into_iter().map(|r| r.data).collect())
    })?;

    serde_json::to_vec(&rows).map_err(|e| e.to_string())
}
/// Queues an update of record `id` of `entity` and returns a JSON acknowledgement.
///
/// Nothing is written yet: the operation (carrying the caller's `version`) is appended to
/// `pending_ops`. The returned bytes are a JSON object with the keys `id`, `entity`,
/// `version` (the supplied version plus one) and `status` (`"queued"`).
///
/// # Errors
/// Returns a message if `version + 1` does not fit in an `i64`, or if the acknowledgement
/// cannot be serialized.
fn db_update(
    &mut self,
    entity: String,
    id: String,
    data: Vec<u8>,
    version: i64,
) -> Result<Vec<u8>, String> {
    // `version` comes from the plugin; a plain `+ 1` would panic (debug) or wrap (release)
    // for `i64::MAX`, so reject that input explicitly.
    let next_version = version
        .checked_add(1)
        .ok_or_else(|| format!("版本号 {} 溢出", version))?;
    let response = serde_json::json!({
        "id": id,
        "entity": entity,
        "version": next_version,
        "status": "queued",
    });
    self.pending_ops.push(PendingOp::Update {
        entity,
        id,
        data,
        version,
    });
    serde_json::to_vec(&response).map_err(|e| e.to_string())
}
/// Queues the deletion of record `id` of `entity`; always succeeds.
fn db_delete(&mut self, entity: String, id: String) -> Result<(), String> {
    let op = PendingOp::Delete { entity, id };
    self.pending_ops.push(op);
    Ok(())
}
/// Queues an event of type `event_type` with the given raw `payload`; always succeeds.
fn event_publish(&mut self, event_type: String, payload: Vec<u8>) -> Result<(), String> {
    let op = PendingOp::PublishEvent {
        event_type,
        payload,
    };
    self.pending_ops.push(op);
    Ok(())
}
/// Returns a copy of the cached configuration bytes stored under `key`.
///
/// # Errors
/// Returns a message when `config_cache` has no entry for `key`.
fn config_get(&mut self, key: String) -> Result<Vec<u8>, String> {
    match self.config_cache.get(&key) {
        Some(bytes) => Ok(bytes.clone()),
        None => Err(format!("配置项 '{}' 未预填充", key)),
    }
}
/// Records a log line emitted by the plugin.
///
/// The line is forwarded to `tracing` (tagged with the plugin id and the supplied level) and
/// also appended to `self.logs` as a `(level, message)` pair.
fn log_write(&mut self, level: String, message: String) {
// NOTE(review): the tracing event is always emitted at INFO severity; the plugin-supplied
// `level` is only attached as a structured field. Confirm this is intentional.
tracing::info!(
plugin = %self.plugin_id,
level = %level,
"Plugin log: {}",
message
);
self.logs.push((level, message));
}
/// Returns a copy of the serialized current-user JSON captured when the state was built.
fn current_user(&mut self) -> Result<Vec<u8>, String> {
    let snapshot = self.current_user_json.clone();
    Ok(snapshot)
}
/// Reports whether `permission` is present in the permission list (exact string match).
fn check_permission(&mut self, permission: String) -> Result<bool, String> {
    let granted = self
        .permissions
        .iter()
        .any(|candidate| candidate == &permission);
    Ok(granted)
}
/// Generates the next number for the numbering rule `rule_key`.
///
/// The counter lives in a per-plugin table `plugin_numbering_seq_<plugin>` keyed by
/// `(rule_key, period_key)`, where the period key is derived from the rule's `reset_rule`
/// and the current UTC date ("daily" → `YYYY-MM-DD`, "monthly" → `YYYY-MM`, "yearly" →
/// `YYYY`, anything else → empty, i.e. the counter never resets). The table is created on
/// demand.
///
/// The counter is advanced with a single `INSERT … ON CONFLICT DO UPDATE … RETURNING`
/// statement, so concurrent callers each receive a distinct value without any explicit
/// locking. (A transaction-scoped advisory lock would not help here: statements issued
/// through `DatabaseConnection` outside an explicit transaction each run in their own
/// transaction, so such a lock is released before the next statement executes.)
///
/// The result is the rule's `format` with `{PREFIX}`, `{YEAR}`, `{MONTH}`, `{DAY}`,
/// `{SEQ:<seq_length>}` and `{SEQ}` substituted; the sequence is left-padded with zeros to
/// `seq_length` digits.
///
/// # Errors
/// Returns a message when the rule is not declared, no database connection is available,
/// or any SQL statement fails.
fn numbering_generate(&mut self, rule_key: String) -> Result<String, String> {
    let rule = self
        .numbering_rules
        .get(&rule_key)
        .ok_or_else(|| format!("编号规则 '{}' 未声明", rule_key))?
        .clone();
    let db = self.db.clone().ok_or("编号生成需要数据库连接")?;
    let plugin_id = self.plugin_id.clone();
    let rt = tokio::runtime::Handle::current();
    rt.block_on(async {
        use sea_orm::{ConnectionTrait, FromQueryResult, Statement};

        let now = chrono::Utc::now();
        let year = now.format("%Y").to_string();
        let month = now.format("%m").to_string();
        let day = now.format("%d").to_string();

        // Key of the current period; a new period starts a fresh counter row.
        let period_key = match rule.reset_rule.as_str() {
            "daily" => format!("{}-{}-{}", year, month, day),
            "monthly" => format!("{}-{}", year, month),
            "yearly" => year.clone(),
            _ => String::new(), // "never": a single counter, no period key needed
        };

        // Sequence table name; the plugin id goes through `sanitize_identifier` because it
        // is interpolated into SQL text.
        // NOTE(review): rows are keyed by (rule_key, period_key) only, so every tenant of a
        // plugin shares the same counter — confirm this is intended.
        let table_name = format!(
            "plugin_numbering_seq_{}",
            crate::dynamic_table::sanitize_identifier(&plugin_id)
        );

        // Make sure the sequence table exists.
        let create_sql = format!(
            "CREATE TABLE IF NOT EXISTS {} (\
             rule_key VARCHAR(255) NOT NULL, \
             period_key VARCHAR(64) NOT NULL DEFAULT '', \
             current_val BIGINT NOT NULL DEFAULT 0, \
             PRIMARY KEY (rule_key, period_key)\
             )",
            table_name
        );
        db.execute(Statement::from_string(
            sea_orm::DatabaseBackend::Postgres,
            create_sql,
        ))
        .await
        .map_err(|e| format!("创建序列表失败: {}", e))?;

        #[derive(Debug, FromQueryResult)]
        struct SeqRow {
            current_val: i64,
        }

        // Atomically create the counter at 1 or increment the existing one, and read the
        // new value back in the same statement.
        let bump_sql = format!(
            "INSERT INTO {} AS seq (rule_key, period_key, current_val) VALUES ($1, $2, 1) \
             ON CONFLICT (rule_key, period_key) \
             DO UPDATE SET current_val = seq.current_val + 1 \
             RETURNING current_val",
            table_name
        );
        let next_val = SeqRow::find_by_statement(Statement::from_sql_and_values(
            sea_orm::DatabaseBackend::Postgres,
            bump_sql,
            [rule_key.clone().into(), period_key.into()],
        ))
        .one(&db)
        .await
        .map_err(|e| format!("更新序列失败: {}", e))?
        .ok_or_else(|| "更新序列失败: 未返回序列值".to_string())?
        .current_val;

        let seq_str = format!("{:0>width$}", next_val, width = rule.seq_length as usize);
        let number = rule
            .format
            .replace("{PREFIX}", &rule.prefix)
            .replace("{YEAR}", &year)
            .replace("{MONTH}", &month)
            .replace("{DAY}", &day)
            .replace(&format!("{{SEQ:{}}}", rule.seq_length), &seq_str)
            .replace("{SEQ}", &seq_str);
        Ok(number)
    })
}
/// Returns the plugin setting stored under `key`, serialized as JSON.
///
/// A missing key yields JSON `null`.
///
/// # Errors
/// Returns a message when `plugin_config` is not a JSON object, or when the value cannot
/// be serialized.
fn setting_get(&mut self, key: String) -> Result<Vec<u8>, String> {
    let settings = match self.plugin_config.as_object() {
        Some(map) => map,
        None => return Err(String::from("插件配置不是有效对象")),
    };
    let value = match settings.get(&key) {
        Some(found) => found.clone(),
        None => serde_json::Value::Null,
    };
    serde_json::to_vec(&value).map_err(|err| err.to_string())
}
}