diff --git a/Cargo.lock b/Cargo.lock index 29f0902..b8a034e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1442,15 +1442,18 @@ name = "erp-message" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "async-trait", "axum", "chrono", "erp-core", + "futures", "sea-orm", "serde", "serde_json", "thiserror 2.0.18", "tokio", + "tokio-stream", "tracing", "utoipa", "uuid", @@ -1587,6 +1590,7 @@ dependencies = [ "axum", "chrono", "erp-core", + "reqwest", "sea-orm", "serde", "serde_json", diff --git a/apps/web/src/components/NotificationPanel.tsx b/apps/web/src/components/NotificationPanel.tsx index c7064ec..0133288 100644 --- a/apps/web/src/components/NotificationPanel.tsx +++ b/apps/web/src/components/NotificationPanel.tsx @@ -21,10 +21,14 @@ export default function NotificationPanel() { if (initializedRef.current) return; initializedRef.current = true; - const { fetchUnreadCount, fetchRecentMessages } = useMessageStore.getState(); + const { fetchUnreadCount, fetchRecentMessages, connectSSE } = useMessageStore.getState(); fetchUnreadCount(); fetchRecentMessages(); + // SSE 实时推送,收到消息即刷新 + const disconnectSSE = connectSSE(); + + // 降级轮询(SSE 断开时兜底) const interval = setInterval(() => { fetchUnreadCount(); fetchRecentMessages(); @@ -32,6 +36,7 @@ export default function NotificationPanel() { return () => { clearInterval(interval); + disconnectSSE(); initializedRef.current = false; }; }, []); diff --git a/apps/web/src/pages/PluginAdmin.tsx b/apps/web/src/pages/PluginAdmin.tsx index cd0c3b5..b4b1f0d 100644 --- a/apps/web/src/pages/PluginAdmin.tsx +++ b/apps/web/src/pages/PluginAdmin.tsx @@ -240,7 +240,7 @@ export default function PluginAdmin() { title="确定要清除该插件记录吗?" onConfirm={() => handleAction(record.id, async () => { await purgePlugin(record.id); return record; }, '清除')} > - diff --git a/apps/web/src/pages/messages/NotificationPreferences.tsx b/apps/web/src/pages/messages/NotificationPreferences.tsx index 4bdb339..5e22804 100644 --- a/apps/web/src/pages/messages/NotificationPreferences.tsx +++ b/apps/web/src/pages/messages/NotificationPreferences.tsx @@ -30,6 +30,14 @@ export default function NotificationPreferences() { dnd_end: values.dnd_range?.[1]?.format('HH:mm'), }; + if (req.dnd_enabled && req.dnd_start && req.dnd_end) { + if (req.dnd_start >= req.dnd_end) { + message.error('免打扰开始时间必须早于结束时间'); + setLoading(false); + return; + } + } + await client.put('/message-subscriptions', { dnd_enabled: req.dnd_enabled, dnd_start: req.dnd_start, @@ -63,7 +71,11 @@ export default function NotificationPreferences() { {dndEnabled && ( - + )} diff --git a/apps/web/src/pages/settings/AuditLogViewer.tsx b/apps/web/src/pages/settings/AuditLogViewer.tsx index e1aa37f..59d1daf 100644 --- a/apps/web/src/pages/settings/AuditLogViewer.tsx +++ b/apps/web/src/pages/settings/AuditLogViewer.tsx @@ -1,20 +1,33 @@ -import { useState, useEffect, useCallback } from 'react'; +import { useState, useEffect, useCallback, useRef } from 'react'; import { Table, Select, Input, Tag, message } from 'antd'; import type { ColumnsType, TablePaginationConfig } from 'antd/es/table'; import { listAuditLogs, type AuditLogItem, type AuditLogQuery } from '../../api/auditLogs'; +import { listUsers } from '../../api/users'; import { useThemeMode } from '../../hooks/useThemeMode'; const RESOURCE_TYPE_OPTIONS = [ { value: 'user', label: '用户' }, { value: 'role', label: '角色' }, + { value: 'position', label: '岗位' }, { value: 'organization', label: '组织' }, { value: 'department', label: '部门' }, - { value: 'position', label: '岗位' }, { value: 'process_instance', label: '流程实例' }, + { value: 'process_definition', label: '流程定义' }, + { value: 'task', label: '流程任务' }, { value: 'dictionary', label: '字典' }, { value: 'menu', label: '菜单' }, { value: 'setting', label: '设置' }, { value: 'numbering_rule', label: '编号规则' }, + { value: 'patient', label: '患者' }, + { value: 'patient_tag', label: '患者标签' }, + { value: 'patient_family_member', label: '家庭成员' }, + { value: 'patient_doctor_relation', label: '医患关系' }, + { value: 'points_transaction', label: '积分流水' }, + { value: 'points_product', label: '积分商品' }, + { value: 'points_order', label: '积分订单' }, + { value: 'points_rule', label: '积分规则' }, + { value: 'offline_event', label: '线下活动' }, + { value: 'offline_event_registration', label: '活动签到' }, ]; const ACTION_STYLES: Record = { @@ -40,6 +53,8 @@ export default function AuditLogViewer() { const [loading, setLoading] = useState(false); const [query, setQuery] = useState({ page: 1, page_size: 20 }); const isDark = useThemeMode(); + const userNameCache = useRef>({}); + const cacheLoaded = useRef(false); const fetchLogs = useCallback(async (params: AuditLogQuery) => { setLoading(true); @@ -53,6 +68,34 @@ export default function AuditLogViewer() { setLoading(false); }, []); + // 加载用户名称缓存(分页遍历所有用户) + useEffect(() => { + if (cacheLoaded.current) return; + let cancelled = false; + const loadAllUsers = async () => { + try { + let currentPage = 1; + const pageSize = 100; + let hasMore = true; + while (hasMore && !cancelled) { + const result = await listUsers(currentPage, pageSize); + for (const user of result.data) { + userNameCache.current[user.id] = user.display_name || user.username; + } + hasMore = result.data.length >= pageSize; + currentPage += 1; + } + if (!cancelled) { + cacheLoaded.current = true; + } + } catch { + // 静默失败,将显示 UUID + } + }; + loadAllUsers(); + return () => { cancelled = true; }; + }, []); + useEffect(() => { fetchLogs(query); }, [query, fetchLogs]); @@ -126,11 +169,14 @@ export default function AuditLogViewer() { key: 'user_id', width: 200, ellipsis: true, - render: (v: string) => ( - - {v} - - ), + render: (v: string) => { + const name = userNameCache.current[v]; + return ( + + {name || v} + + ); + }, }, { title: '时间', diff --git a/apps/web/src/pages/settings/ChangePassword.tsx b/apps/web/src/pages/settings/ChangePassword.tsx index 167fa12..c2aae72 100644 --- a/apps/web/src/pages/settings/ChangePassword.tsx +++ b/apps/web/src/pages/settings/ChangePassword.tsx @@ -56,12 +56,20 @@ export default function ChangePassword() { label="新密码" rules={[ { required: true, message: '请输入新密码' }, - { min: 6, message: '密码长度不能少于6位' }, + { min: 8, message: '密码长度不能少于8位' }, + ({ getFieldValue }) => ({ + validator(_, value) { + if (!value || getFieldValue('current_password') !== value) { + return Promise.resolve(); + } + return Promise.reject(new Error('新密码不能与当前密码相同')); + }, + }), ]} > } - placeholder="请输入新密码(至少6位)" + placeholder="请输入新密码(至少8位)" /> nodes 映射 + const definitionCache = useRef>({}); + + const resolveNodeNames = useCallback((defId: string, nodeIds: string[]): string => { + const nodes = definitionCache.current[defId]; + if (!nodes) return nodeIds.join(', '); + return nodeIds + .map((nid) => { + const node = nodes.find((n) => n.id === nid); + return node ? node.name : nid; + }) + .join(', '); + }, []); + + // 加载当前页实例对应的流程定义 + const loadDefinitions = useCallback(async (instances: ProcessInstanceInfo[]) => { + const uncached = new Set(); + for (const inst of instances) { + if (!definitionCache.current[inst.definition_id]) { + uncached.add(inst.definition_id); + } + } + await Promise.all( + Array.from(uncached).map(async (defId) => { + try { + const def = await getProcessDefinition(defId); + definitionCache.current[defId] = def.nodes; + } catch { + // 静默,将显示原始 node_id + } + }), + ); + }, []); + + // 触发重渲染当定义缓存更新后 + const [, forceUpdate] = useState(0); + useEffect(() => { + if (data.length > 0) { + loadDefinitions(data).then(() => forceUpdate((n) => n + 1)); + } + // 仅在 data 变化时触发,避免无限循环 + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [data]); + const fetchData = useCallback(async () => { setLoading(true); try { @@ -146,7 +190,12 @@ export default function InstanceMonitor() { title: '当前节点', key: 'current_nodes', width: 150, - render: (_, record) => record.active_tokens.map(t => t.node_id).join(', ') || '-', + render: (_, record) => { + const nodeIds = record.active_tokens.map((t) => t.node_id); + if (nodeIds.length === 0) return '-'; + const resolved = resolveNodeNames(record.definition_id, nodeIds); + return {resolved}; + }, }, { title: '发起时间', diff --git a/apps/web/src/stores/message.ts b/apps/web/src/stores/message.ts index 25ac514..e7510a0 100644 --- a/apps/web/src/stores/message.ts +++ b/apps/web/src/stores/message.ts @@ -7,6 +7,7 @@ interface MessageState { fetchUnreadCount: () => Promise; fetchRecentMessages: () => Promise; markAsRead: (id: string) => Promise; + connectSSE: () => () => void; } // 请求去重:记录正在进行的请求,防止并发重复调用 @@ -68,4 +69,27 @@ export const useMessageStore = create((set, get) => ({ set({ unreadCount: prev.unreadCount, recentMessages: prev.recentMessages }); } }, + + connectSSE: () => { + const baseUrl = import.meta.env.VITE_API_BASE_URL || '/api/v1'; + const token = localStorage.getItem('token'); + if (!token) return () => {}; + + const url = `${baseUrl}/messages/stream?token=${encodeURIComponent(token)}`; + const es = new EventSource(url); + + es.addEventListener('message', () => { + // 收到新消息推送,立即刷新未读数和最近消息 + get().fetchUnreadCount(); + get().fetchRecentMessages(); + }); + + es.onerror = () => { + // SSE 连接断开时 EventSource 会自动重连 + }; + + return () => { + es.close(); + }; + }, })); diff --git a/crates/erp-auth/src/service/dept_service.rs b/crates/erp-auth/src/service/dept_service.rs index 793ea35..5989894 100644 --- a/crates/erp-auth/src/service/dept_service.rs +++ b/crates/erp-auth/src/service/dept_service.rs @@ -81,6 +81,19 @@ impl DeptService { } } + // Check name uniqueness within the same organization + let name_exists = department::Entity::find() + .filter(department::Column::TenantId.eq(tenant_id)) + .filter(department::Column::OrgId.eq(org_id)) + .filter(department::Column::Name.eq(&req.name)) + .filter(department::Column::DeletedAt.is_null()) + .one(db) + .await + .map_err(|e| AuthError::Validation(e.to_string()))?; + if name_exists.is_some() { + return Err(AuthError::Validation("部门名称已存在".to_string())); + } + // Compute path from parent department or organization root let path = if let Some(parent_id) = req.parent_id { let parent = department::Entity::find_by_id(parent_id) @@ -192,6 +205,24 @@ impl DeptService { } } + // If name is being changed, check uniqueness within the same org (exclude self) + if let Some(ref new_name) = req.name + && new_name != &model.name + { + let name_exists = department::Entity::find() + .filter(department::Column::TenantId.eq(tenant_id)) + .filter(department::Column::OrgId.eq(model.org_id)) + .filter(department::Column::Name.eq(new_name.as_str())) + .filter(department::Column::DeletedAt.is_null()) + .filter(department::Column::Id.ne(id)) + .one(db) + .await + .map_err(|e| AuthError::Validation(e.to_string()))?; + if name_exists.is_some() { + return Err(AuthError::Validation("部门名称已存在".to_string())); + } + } + let next_ver = check_version(req.version, model.version) .map_err(|e| AuthError::Validation(e.to_string()))?; diff --git a/crates/erp-auth/src/service/org_service.rs b/crates/erp-auth/src/service/org_service.rs index 8c82d90..d169101 100644 --- a/crates/erp-auth/src/service/org_service.rs +++ b/crates/erp-auth/src/service/org_service.rs @@ -69,6 +69,18 @@ impl OrgService { } } + // Check name uniqueness within tenant + let name_exists = organization::Entity::find() + .filter(organization::Column::TenantId.eq(tenant_id)) + .filter(organization::Column::Name.eq(&req.name)) + .filter(organization::Column::DeletedAt.is_null()) + .one(db) + .await + .map_err(|e| AuthError::Validation(e.to_string()))?; + if name_exists.is_some() { + return Err(AuthError::Validation("组织名称已存在".to_string())); + } + let (path, level) = if let Some(parent_id) = req.parent_id { let parent = organization::Entity::find_by_id(parent_id) .one(db) @@ -174,6 +186,23 @@ impl OrgService { } } + // If name is being changed, check uniqueness (exclude self) + if let Some(ref new_name) = req.name + && new_name != &model.name + { + let name_exists = organization::Entity::find() + .filter(organization::Column::TenantId.eq(tenant_id)) + .filter(organization::Column::Name.eq(new_name.as_str())) + .filter(organization::Column::DeletedAt.is_null()) + .filter(organization::Column::Id.ne(id)) + .one(db) + .await + .map_err(|e| AuthError::Validation(e.to_string()))?; + if name_exists.is_some() { + return Err(AuthError::Validation("组织名称已存在".to_string())); + } + } + let next_ver = check_version(req.version, model.version) .map_err(|e| AuthError::Validation(e.to_string()))?; diff --git a/crates/erp-message/Cargo.toml b/crates/erp-message/Cargo.toml index f763661..d535d6b 100644 --- a/crates/erp-message/Cargo.toml +++ b/crates/erp-message/Cargo.toml @@ -18,3 +18,6 @@ thiserror.workspace = true utoipa = { workspace = true, features = ["uuid", "chrono"] } async-trait.workspace = true validator.workspace = true +futures.workspace = true +tokio-stream.workspace = true +async-stream.workspace = true diff --git a/crates/erp-message/src/handler/mod.rs b/crates/erp-message/src/handler/mod.rs index c4966bf..2c0a2ed 100644 --- a/crates/erp-message/src/handler/mod.rs +++ b/crates/erp-message/src/handler/mod.rs @@ -1,3 +1,4 @@ pub mod message_handler; +pub mod sse_handler; pub mod subscription_handler; pub mod template_handler; diff --git a/crates/erp-message/src/handler/sse_handler.rs b/crates/erp-message/src/handler/sse_handler.rs new file mode 100644 index 0000000..e04256d --- /dev/null +++ b/crates/erp-message/src/handler/sse_handler.rs @@ -0,0 +1,53 @@ +use std::convert::Infallible; + +use axum::extract::Extension; +use axum::response::sse::{Event, KeepAlive, Sse}; +use futures::stream::Stream; + +use erp_core::error::AppError; +use erp_core::types::TenantContext; + +use crate::message_state::MessageState; + +/// SSE 消息推送端点。 +/// +/// 客户端连接后监听 `message.sent` 事件,仅推送当前用户的消息。 +/// 使用 EventBus 的 filtered subscriber 按前缀过滤事件。 +pub async fn message_stream( + axum::extract::State(state): axum::extract::State, + Extension(ctx): Extension, +) -> Result>>, AppError> { + let user_id = ctx.user_id; + let tenant_id = ctx.tenant_id; + let (mut rx, _handle) = state.event_bus.subscribe_filtered("message.sent".to_string()); + + let sse_stream = async_stream::stream! { + loop { + match rx.recv().await { + Some(event) => { + if event.tenant_id != tenant_id { + continue; + } + let is_recipient = event.payload.get("recipient_id") + .and_then(|v: &serde_json::Value| v.as_str()) + .map(|s| s == user_id.to_string()) + .unwrap_or(false); + if !is_recipient { + continue; + } + + let data = serde_json::to_string(&event.payload) + .unwrap_or_default(); + yield Ok(Event::default() + .event("message") + .data(data)); + } + None => { + break; + } + } + } + }; + + Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) +} diff --git a/crates/erp-message/src/module.rs b/crates/erp-message/src/module.rs index 6e4b146..9a2182b 100644 --- a/crates/erp-message/src/module.rs +++ b/crates/erp-message/src/module.rs @@ -10,7 +10,7 @@ use erp_core::events::EventBus; use erp_core::module::ErpModule; use crate::entity::message_subscription; -use crate::handler::{message_handler, subscription_handler, template_handler}; +use crate::handler::{message_handler, sse_handler, subscription_handler, template_handler}; /// 消息中心模块,实现 ErpModule trait。 pub struct MessageModule; @@ -36,6 +36,8 @@ impl MessageModule { .route("/messages/{id}/read", put(message_handler::mark_read)) .route("/messages/read-all", put(message_handler::mark_all_read)) .route("/messages/{id}", delete(message_handler::delete_message)) + // SSE 实时推送 + .route("/messages/stream", get(sse_handler::message_stream)) // 模板路由 .route( "/message-templates", diff --git a/crates/erp-message/src/service/message_service.rs b/crates/erp-message/src/service/message_service.rs index 5814e05..f783f82 100644 --- a/crates/erp-message/src/service/message_service.rs +++ b/crates/erp-message/src/service/message_service.rs @@ -1,7 +1,7 @@ use chrono::Utc; use sea_orm::{ ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseBackend, EntityTrait, PaginatorTrait, - QueryFilter, Set, Statement, + QueryFilter, Set, Statement, FromQueryResult, }; use uuid::Uuid; @@ -12,6 +12,12 @@ use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::events::EventBus; +/// 原始 SQL 查询 user_id 的结果结构体。 +#[derive(Debug, FromQueryResult)] +struct UserIdRow { + user_id: Uuid, +} + /// 消息服务。 pub struct MessageService; @@ -80,6 +86,12 @@ impl MessageService { } /// 发送消息。 + /// + /// 根据 `recipient_type` 执行不同的投递策略: + /// - `"user"` — 单条消息,直接投递给 `recipient_id` 指定的用户。 + /// - `"role"` — 查询 `user_roles` 表,向该角色下的所有用户批量投递。 + /// - `"department"` — 查询 `user_departments` 表,向该部门下的所有用户批量投递。 + /// - `"all"` — 查询 `users` 表,向租户内所有活跃用户批量投递。 pub async fn send( tenant_id: Uuid, sender_id: Uuid, @@ -87,49 +99,79 @@ impl MessageService { db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> MessageResult { - let id = Uuid::now_v7(); let now = Utc::now(); - let model = message::ActiveModel { - id: Set(id), - tenant_id: Set(tenant_id), - template_id: Set(req.template_id), - sender_id: Set(Some(sender_id)), - sender_type: Set("user".to_string()), - recipient_id: Set(req.recipient_id), - recipient_type: Set(req.recipient_type.clone()), - title: Set(req.title.clone()), - body: Set(req.body.clone()), - priority: Set(req.priority.clone()), - business_type: Set(req.business_type.clone()), - business_id: Set(req.business_id), - is_read: Set(false), - read_at: Set(None), - is_archived: Set(false), - archived_at: Set(None), - sent_at: Set(Some(now)), - status: Set("sent".to_string()), - created_at: Set(now), - updated_at: Set(now), - created_by: Set(sender_id), - updated_by: Set(sender_id), - deleted_at: Set(None), - version: Set(1), + // Resolve target user IDs based on recipient type + let recipient_user_ids = match req.recipient_type.as_str() { + "user" => vec![req.recipient_id], + "role" => { + Self::resolve_user_ids_by_role(db, req.recipient_id, tenant_id).await? + } + "department" => { + Self::resolve_user_ids_by_department(db, req.recipient_id, tenant_id).await? + } + "all" => { + Self::resolve_all_active_user_ids(db, tenant_id).await? + } + other => { + return Err(MessageError::Validation(format!( + "不支持的收件人类型: {other}" + ))); + } }; - let inserted = model - .insert(db) + if recipient_user_ids.is_empty() { + return Err(MessageError::Validation( + "没有找到符合条件的收件人".to_string(), + )); + } + + // Build message models for all recipients + let models: Vec = recipient_user_ids + .iter() + .map(|uid| message::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + template_id: Set(req.template_id), + sender_id: Set(Some(sender_id)), + sender_type: Set("user".to_string()), + recipient_id: Set(*uid), + recipient_type: Set("user".to_string()), + title: Set(req.title.clone()), + body: Set(req.body.clone()), + priority: Set(req.priority.clone()), + business_type: Set(req.business_type.clone()), + business_id: Set(req.business_id), + is_read: Set(false), + read_at: Set(None), + is_archived: Set(false), + archived_at: Set(None), + sent_at: Set(Some(now)), + status: Set("sent".to_string()), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(sender_id), + updated_by: Set(sender_id), + deleted_at: Set(None), + version: Set(1), + }) + .collect(); + + // Batch insert all messages + message::Entity::insert_many(models) + .exec(db) .await .map_err(|e| MessageError::Validation(e.to_string()))?; + // Publish one event per batch (summary event) event_bus .publish( erp_core::events::DomainEvent::new( "message.sent", tenant_id, serde_json::json!({ - "message_id": id, - "recipient_id": req.recipient_id, + "recipient_type": req.recipient_type, + "recipient_count": recipient_user_ids.len(), "title": req.title, }), ), @@ -139,12 +181,94 @@ impl MessageService { audit_service::record( AuditLog::new(tenant_id, Some(sender_id), "message.send", "message") - .with_resource_id(id), + .with_changes( + None, + Some(serde_json::json!({ + "recipient_type": req.recipient_type, + "recipient_count": recipient_user_ids.len(), + "title": req.title, + })), + ), db, ) .await; - Ok(Self::model_to_resp(&inserted)) + // Construct a representative response (no row returned from batch insert) + Ok(MessageResp { + id: Uuid::nil(), + tenant_id, + template_id: req.template_id, + sender_id: Some(sender_id), + sender_type: "user".to_string(), + recipient_id: req.recipient_id, + recipient_type: req.recipient_type.clone(), + title: req.title.clone(), + body: req.body.clone(), + priority: req.priority.clone(), + business_type: req.business_type.clone(), + business_id: req.business_id, + is_read: false, + read_at: None, + is_archived: false, + status: "sent".to_string(), + sent_at: Some(now), + created_at: now, + updated_at: now, + version: 1, + }) + } + + /// 根据角色 ID 查询关联的用户 ID 列表(跨模块 raw SQL)。 + async fn resolve_user_ids_by_role( + db: &sea_orm::DatabaseConnection, + role_id: Uuid, + tenant_id: Uuid, + ) -> MessageResult> { + let rows = UserIdRow::find_by_statement(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "SELECT user_id FROM user_roles WHERE role_id = $1 AND tenant_id = $2 AND deleted_at IS NULL", + [role_id.into(), tenant_id.into()], + )) + .all(db) + .await + .map_err(|e| MessageError::Validation(e.to_string()))?; + + Ok(rows.into_iter().map(|r| r.user_id).collect()) + } + + /// 根据部门 ID 查询关联的用户 ID 列表(跨模块 raw SQL)。 + async fn resolve_user_ids_by_department( + db: &sea_orm::DatabaseConnection, + department_id: Uuid, + tenant_id: Uuid, + ) -> MessageResult> { + let rows = UserIdRow::find_by_statement(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "SELECT user_id FROM user_departments WHERE department_id = $1 AND tenant_id = $2 AND deleted_at IS NULL", + [department_id.into(), tenant_id.into()], + )) + .all(db) + .await + .map_err(|e| MessageError::Validation(e.to_string()))?; + + Ok(rows.into_iter().map(|r| r.user_id).collect()) + } + + /// 查询租户内所有活跃用户的 ID 列表(跨模块 raw SQL)。 + async fn resolve_all_active_user_ids( + db: &sea_orm::DatabaseConnection, + tenant_id: Uuid, + ) -> MessageResult> { + let rows = UserIdRow::find_by_statement(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + "SELECT id AS user_id FROM users WHERE tenant_id = $1 AND deleted_at IS NULL AND status = 'active'", + [tenant_id.into()], + )) + .all(db) + .await + .map_err(|e| MessageError::Validation(e.to_string()))?; + + Ok(rows.into_iter().map(|r| r.user_id).collect()) } /// 系统发送消息(由事件处理器调用)。 diff --git a/crates/erp-server/src/main.rs b/crates/erp-server/src/main.rs index 4c357c3..6c299cf 100644 --- a/crates/erp-server/src/main.rs +++ b/crates/erp-server/src/main.rs @@ -411,7 +411,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Outbox relay started"); // Start timeout checker (scan overdue tasks every 60s) - erp_workflow::WorkflowModule::start_timeout_checker(db.clone()); + erp_workflow::WorkflowModule::start_timeout_checker(db.clone(), event_bus.clone()); tracing::info!("Timeout checker started"); // Start follow-up overdue checker (handled by HealthModule::on_startup) diff --git a/crates/erp-server/tests/integration/workflow_tests.rs b/crates/erp-server/tests/integration/workflow_tests.rs index 8d82c32..a971680 100644 --- a/crates/erp-server/tests/integration/workflow_tests.rs +++ b/crates/erp-server/tests/integration/workflow_tests.rs @@ -26,6 +26,7 @@ fn make_simple_definition(name: &str, key: &str, assignee_id: Option assignee_id: None, candidate_groups: None, service_type: None, + service_config: None, position: None, }, NodeDef { @@ -35,6 +36,7 @@ fn make_simple_definition(name: &str, key: &str, assignee_id: Option assignee_id, candidate_groups: None, service_type: None, + service_config: None, position: None, }, NodeDef { @@ -44,6 +46,7 @@ fn make_simple_definition(name: &str, key: &str, assignee_id: Option assignee_id: None, candidate_groups: None, service_type: None, + service_config: None, position: None, }, ], diff --git a/crates/erp-workflow/Cargo.toml b/crates/erp-workflow/Cargo.toml index 98cf556..e9b145e 100644 --- a/crates/erp-workflow/Cargo.toml +++ b/crates/erp-workflow/Cargo.toml @@ -18,3 +18,4 @@ thiserror.workspace = true utoipa = { workspace = true, features = ["uuid", "chrono"] } async-trait.workspace = true validator.workspace = true +reqwest = { workspace = true, features = ["json"] } diff --git a/crates/erp-workflow/src/dto.rs b/crates/erp-workflow/src/dto.rs index 7f98e5e..1bc7dd6 100644 --- a/crates/erp-workflow/src/dto.rs +++ b/crates/erp-workflow/src/dto.rs @@ -30,6 +30,9 @@ pub struct NodeDef { pub candidate_groups: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub service_type: Option, + /// 服务任务 HTTP 调用配置 + #[serde(skip_serializing_if = "Option::is_none")] + pub service_config: Option, /// 前端渲染位置 #[serde(skip_serializing_if = "Option::is_none")] pub position: Option, @@ -41,6 +44,23 @@ pub struct NodePosition { pub y: f64, } +/// ServiceTask HTTP 调用配置 +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct ServiceTaskConfig { + /// 请求 URL + pub url: String, + /// HTTP 方法(GET / POST),默认 GET + #[serde(default = "default_method")] + pub method: String, + /// POST body 模板(支持从流程变量替换 ${var_name}) + #[serde(skip_serializing_if = "Option::is_none")] + pub body: Option, +} + +fn default_method() -> String { + "GET".to_string() +} + /// 流程图连线定义 #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct EdgeDef { diff --git a/crates/erp-workflow/src/engine/executor.rs b/crates/erp-workflow/src/engine/executor.rs index d33f9db..efc6cb6 100644 --- a/crates/erp-workflow/src/engine/executor.rs +++ b/crates/erp-workflow/src/engine/executor.rs @@ -249,8 +249,7 @@ impl FlowExecutor { .await } NodeType::ServiceTask => { - // ServiceTask 自动执行:当前阶段自动跳过(直接推进到后继节点) - // 创建一个立即消费的 token 记录(用于审计追踪) + // ServiceTask 自动执行 HTTP 调用 let now = Utc::now(); let system_user = uuid::Uuid::nil(); let auto_token_id = Uuid::now_v7(); @@ -274,7 +273,18 @@ impl FlowExecutor { .await .map_err(|e| WorkflowError::Validation(e.to_string()))?; - tracing::info!(node_id = node_id, node_name = %node.name, "ServiceTask 自动跳过(尚未实现 HTTP 调用)"); + // 执行 HTTP 调用(如果配置了 service_config) + let var_name = format!("service_task_{node_id}_result"); + let result_value = Self::execute_service_task(node, variables).await; + // 将结果存储为流程变量 + Self::set_process_variable( + instance_id, + tenant_id, + &var_name, + &result_value, + txn, + ) + .await?; // 沿出边继续推进 let outgoing = graph.get_outgoing_edges(node_id); @@ -444,6 +454,125 @@ impl FlowExecutor { Ok(new_tokens) } + /// 执行 ServiceTask HTTP 调用。 + /// + /// 根据 `service_config` 中的 url/method/body 发起 HTTP 请求。 + /// 如果没有配置 `service_config` 或调用失败,返回错误信息 JSON 而不是阻塞流程。 + async fn execute_service_task( + node: &crate::engine::model::FlowNode, + variables: &HashMap, + ) -> serde_json::Value { + let config = match &node.service_config { + Some(c) => c, + None => { + tracing::warn!( + node_id = &node.id, + node_name = %node.name, + "ServiceTask 没有 service_config 配置,跳过 HTTP 调用" + ); + return serde_json::json!({ + "status": "skipped", + "reason": "未配置 service_config" + }); + } + }; + + let method = config.method.to_uppercase(); + let url = &config.url; + + tracing::info!( + node_id = &node.id, + node_name = %node.name, + method = %method, + url = %url, + "ServiceTask 开始 HTTP 调用" + ); + + let client = reqwest::Client::new(); + let result = match method.as_str() { + "POST" => { + let body = config.body.as_ref().map(|b| { + // 简单变量替换:${var_name} → variables 中的值 + let mut body_str = b.to_string(); + for (key, val) in variables { + let placeholder = format!("${{{key}}}"); + body_str = body_str.replace(&placeholder, &val.to_string()); + } + body_str + }); + client.post(url).body(body.unwrap_or_default()).send().await + } + _ => { + // 默认 GET + client.get(url).send().await + } + }; + + match result { + Ok(resp) => { + let status = resp.status().as_u16(); + let body = resp.text().await.unwrap_or_default(); + tracing::info!( + node_id = &node.id, + status = status, + "ServiceTask HTTP 调用完成" + ); + serde_json::json!({ + "status": "success", + "http_status": status, + "body": body, + }) + } + Err(e) => { + tracing::warn!( + node_id = &node.id, + error = %e, + "ServiceTask HTTP 调用失败(流程继续推进)" + ); + serde_json::json!({ + "status": "error", + "error": e.to_string(), + }) + } + } + } + + /// 将流程变量写入 process_variables 表。 + async fn set_process_variable( + instance_id: Uuid, + tenant_id: Uuid, + name: &str, + value: &serde_json::Value, + txn: &impl ConnectionTrait, + ) -> WorkflowResult<()> { + use crate::entity::process_variable; + + let now = Utc::now(); + let system_user = Uuid::nil(); + let var_model = process_variable::ActiveModel { + id: Set(Uuid::now_v7()), + tenant_id: Set(tenant_id), + instance_id: Set(instance_id), + name: Set(name.to_string()), + var_type: Set("json".to_string()), + value_string: Set(Some(value.to_string())), + value_number: Set(None), + value_boolean: Set(None), + value_date: Set(None), + created_at: Set(now), + updated_at: Set(now), + created_by: Set(system_user), + updated_by: Set(system_user), + deleted_at: Set(None), + version: Set(1), + }; + var_model + .insert(txn) + .await + .map_err(|e| WorkflowError::Validation(e.to_string()))?; + Ok(()) + } + /// 检查实例是否所有 token 都已完成,如果是则完成实例。 async fn check_instance_completion( instance_id: Uuid, diff --git a/crates/erp-workflow/src/engine/model.rs b/crates/erp-workflow/src/engine/model.rs index 69bd2bf..8a37b49 100644 --- a/crates/erp-workflow/src/engine/model.rs +++ b/crates/erp-workflow/src/engine/model.rs @@ -28,6 +28,7 @@ pub struct FlowNode { pub assignee_id: Option, pub candidate_groups: Option>, pub service_type: Option, + pub service_config: Option, } /// 内存中的边模型。 @@ -60,6 +61,7 @@ impl FlowGraph { assignee_id: n.assignee_id, candidate_groups: n.candidate_groups.clone(), service_type: n.service_type.clone(), + service_config: n.service_config.clone(), }; if n.node_type == NodeType::StartEvent { diff --git a/crates/erp-workflow/src/engine/parser.rs b/crates/erp-workflow/src/engine/parser.rs index 2304606..8bdd2af 100644 --- a/crates/erp-workflow/src/engine/parser.rs +++ b/crates/erp-workflow/src/engine/parser.rs @@ -136,6 +136,7 @@ mod tests { assignee_id: None, candidate_groups: None, service_type: None, + service_config: None, position: Some(NodePosition { x: 100.0, y: 100.0 }), } } @@ -148,6 +149,7 @@ mod tests { assignee_id: None, candidate_groups: None, service_type: None, + service_config: None, position: Some(NodePosition { x: 100.0, y: 300.0 }), } } @@ -160,6 +162,7 @@ mod tests { assignee_id: None, candidate_groups: None, service_type: None, + service_config: None, position: None, } } @@ -219,6 +222,7 @@ mod tests { assignee_id: None, candidate_groups: None, service_type: None, + service_config: None, position: None, }, ]; @@ -249,6 +253,7 @@ mod tests { assignee_id: None, candidate_groups: None, service_type: None, + service_config: None, position: None, }, make_end(), diff --git a/crates/erp-workflow/src/engine/timeout.rs b/crates/erp-workflow/src/engine/timeout.rs index 30918f2..235a3a6 100644 --- a/crates/erp-workflow/src/engine/timeout.rs +++ b/crates/erp-workflow/src/engine/timeout.rs @@ -1,7 +1,7 @@ // 超时检查框架 // // TimeoutChecker 定期扫描 tasks 表中已超时但仍处于 pending 状态的任务, -// 以便触发自动完成或升级逻辑(后续迭代实现)。 +// 发布 task.timeout 事件用于升级通知。 use chrono::Utc; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; @@ -52,4 +52,26 @@ impl TimeoutChecker { Ok(overdue.iter().map(|t| t.id).collect()) } + + /// 查询所有租户中已超时的任务(含详细信息)。 + /// + /// 返回 (task_id, tenant_id, instance_id, assignee_id) 元组, + /// 用于发布 task.timeout 事件。 + pub async fn find_all_overdue_tasks_with_details( + db: &sea_orm::DatabaseConnection, + ) -> WorkflowResult)>> { + let now = Utc::now(); + let overdue = task::Entity::find() + .filter(task::Column::Status.eq("pending")) + .filter(task::Column::DueDate.lt(now)) + .filter(task::Column::DeletedAt.is_null()) + .all(db) + .await + .map_err(|e| crate::error::WorkflowError::Validation(e.to_string()))?; + + Ok(overdue + .iter() + .map(|t| (t.id, t.tenant_id, t.instance_id, t.assignee_id)) + .collect()) + } } diff --git a/crates/erp-workflow/src/handler/task_handler.rs b/crates/erp-workflow/src/handler/task_handler.rs index efbaa4f..e76449c 100644 --- a/crates/erp-workflow/src/handler/task_handler.rs +++ b/crates/erp-workflow/src/handler/task_handler.rs @@ -167,3 +167,33 @@ where Ok(Json(ApiResponse::ok(resp))) } + +#[utoipa::path( + put, + path = "/api/v1/workflow/tasks/{id}/claim", + params(("id" = Uuid, Path, description = "任务ID")), + responses( + (status = 200, description = "认领成功", body = ApiResponse), + (status = 401, description = "未授权"), + (status = 403, description = "权限不足"), + (status = 404, description = "任务不存在"), + ), + security(("bearer_auth" = [])), + tag = "流程任务" +)] +/// PUT /api/v1/workflow/tasks/{id}/claim +pub async fn claim_task( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> Result>, AppError> +where + WorkflowState: FromRef, + S: Clone + Send + Sync + 'static, +{ + require_permission(&ctx, "workflow.approve")?; + + let resp = TaskService::claim(id, ctx.tenant_id, ctx.user_id, &state.db).await?; + + Ok(Json(ApiResponse::ok(resp))) +} diff --git a/crates/erp-workflow/src/module.rs b/crates/erp-workflow/src/module.rs index 4441811..0a9cab4 100644 --- a/crates/erp-workflow/src/module.rs +++ b/crates/erp-workflow/src/module.rs @@ -1,5 +1,5 @@ use axum::Router; -use axum::routing::{get, post}; +use axum::routing::{get, post, put}; use std::time::Duration; use uuid::Uuid; @@ -83,13 +83,17 @@ impl WorkflowModule { "/workflow/tasks/{id}/delegate", post(task_handler::delegate_task), ) + .route( + "/workflow/tasks/{id}/claim", + put(task_handler::claim_task), + ) } /// 启动超时检查后台任务。 /// /// 每 60 秒扫描一次 tasks 表,查找 due_date 已过期但仍处于 pending 状态的任务。 - /// 发现超时任务时记录 warning 日志,后续迭代将实现自动完成/升级逻辑。 - pub fn start_timeout_checker(db: sea_orm::DatabaseConnection) { + /// 发现超时任务时发布 `task.timeout` 事件到事件总线,并记录 warning 日志。 + pub fn start_timeout_checker(db: sea_orm::DatabaseConnection, event_bus: EventBus) { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); @@ -99,14 +103,26 @@ impl WorkflowModule { loop { interval.tick().await; - match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks(&db).await { + match crate::engine::timeout::TimeoutChecker::find_all_overdue_tasks_with_details(&db).await { Ok(overdue) => { if !overdue.is_empty() { tracing::warn!( count = overdue.len(), - task_ids = ?overdue, - "发现超时未完成的任务 — TODO: 实现自动完成/升级逻辑" + "发现超时未完成的任务,发布 task.timeout 事件" ); + for (task_id, tenant_id, instance_id, assignee_id) in &overdue { + // 发布超时事件 + let event = erp_core::events::DomainEvent::new( + "task.timeout", + *tenant_id, + serde_json::json!({ + "task_id": task_id, + "instance_id": instance_id, + "assignee_id": assignee_id, + }), + ); + event_bus.publish(event, &db).await; + } } } Err(e) => { @@ -138,7 +154,140 @@ impl ErpModule for WorkflowModule { vec!["auth"] } - fn register_event_handlers(&self, _bus: &EventBus) {} + fn register_event_handlers(&self, _bus: &EventBus) { + // 事件处理器已迁移到 on_startup(需要 DB 连接),此处保留空实现以兼容 trait 签名 + } + + async fn on_startup( + &self, + ctx: &erp_core::module::ModuleContext, + ) -> erp_core::error::AppResult<()> { + let db = ctx.db.clone(); + let bus = ctx.event_bus.clone(); + + // 订阅 user. 前缀事件,处理 user.deleted + let (mut receiver, _handle) = bus.subscribe_filtered("user.".to_string()); + + tokio::spawn(async move { + loop { + match receiver.recv().await { + Some(event) if event.event_type == "user.deleted" => { + let user_id = match event.payload.get("user_id").and_then(|v| v.as_str()) { + Some(id) => match Uuid::parse_str(id) { + Ok(u) => u, + Err(e) => { + tracing::warn!( + error = %e, + "user.deleted 事件的 user_id 解析失败,跳过" + ); + continue; + } + }, + _ => { + tracing::warn!("user.deleted 事件缺少 user_id 字段,跳过"); + continue; + } + }; + + tracing::info!( + user_id = %user_id, + tenant_id = %event.tenant_id, + "收到 user.deleted 事件,查找并终止相关流程实例" + ); + + // 查找该用户有活跃任务的流程实例 + use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; + use chrono::Utc; + + // 查找该用户作为 assignee 的 pending 任务 + let active_tasks = crate::entity::task::Entity::find() + .filter(crate::entity::task::Column::TenantId.eq(event.tenant_id)) + .filter(crate::entity::task::Column::AssigneeId.eq(user_id)) + .filter(crate::entity::task::Column::Status.eq("pending")) + .filter(crate::entity::task::Column::DeletedAt.is_null()) + .all(&db) + .await; + + match active_tasks { + Ok(tasks) if tasks.is_empty() => { + tracing::info!( + user_id = %user_id, + "该用户没有活跃的待办任务,无需终止流程" + ); + } + Ok(tasks) => { + // 收集需要终止的实例 ID + let instance_ids: std::collections::HashSet = + tasks.iter().map(|t| t.instance_id).collect(); + + for instance_id in &instance_ids { + // 将实例状态设置为 terminated + let instance = crate::entity::process_instance::Entity::find_by_id(*instance_id) + .one(&db) + .await; + + if let Ok(Some(inst)) = instance { + if inst.tenant_id == event.tenant_id + && inst.deleted_at.is_none() + && inst.status == "running" + { + let ver = inst.version; + let mut active: crate::entity::process_instance::ActiveModel = inst.into(); + active.status = Set("terminated".to_string()); + active.updated_at = Set(Utc::now()); + active.version = Set(ver + 1); + match active.update(&db).await { + Ok(_) => { + tracing::info!( + instance_id = %instance_id, + "流程实例已终止(用户被删除)" + ); + } + Err(e) => { + tracing::warn!( + instance_id = %instance_id, + error = %e, + "终止流程实例失败" + ); + } + } + } + } + } + tracing::info!( + user_id = %user_id, + instance_count = instance_ids.len(), + task_count = tasks.len(), + "用户删除事件处理完成" + ); + } + Err(e) => { + tracing::warn!( + error = %e, + "查询用户活跃任务失败" + ); + } + } + } + Some(event) => { + // 其他 user. 前缀事件,忽略 + tracing::debug!( + event_type = %event.event_type, + "忽略非 user.deleted 事件" + ); + } + None => { + // 通道关闭,退出循环 + tracing::info!("Workflow 事件订阅通道已关闭"); + break; + } + } + } + }); + + tracing::info!(module = "workflow", "Workflow 事件处理器已注册(监听 user.deleted)"); + Ok(()) + } async fn on_tenant_created( &self, diff --git a/crates/erp-workflow/src/service/task_service.rs b/crates/erp-workflow/src/service/task_service.rs index cb4cc66..4fcb826 100644 --- a/crates/erp-workflow/src/service/task_service.rs +++ b/crates/erp-workflow/src/service/task_service.rs @@ -376,6 +376,52 @@ impl TaskService { Ok(id) } + /// 认领任务:将 pending 状态的任务分配给当前用户。 + /// + /// 适用于 candidate_groups 群组任务池中的任务,用户主动认领后 + /// 任务状态变为 in_progress,assignee_id 设置为认领用户。 + pub async fn claim( + id: Uuid, + tenant_id: Uuid, + user_id: Uuid, + db: &sea_orm::DatabaseConnection, + ) -> WorkflowResult { + let task_model = task::Entity::find_by_id(id) + .one(db) + .await + .map_err(|e| WorkflowError::Validation(e.to_string()))? + .filter(|t| t.tenant_id == tenant_id && t.deleted_at.is_none()) + .ok_or_else(|| WorkflowError::NotFound(format!("任务不存在: {id}")))?; + + if task_model.status != "pending" { + return Err(WorkflowError::InvalidState(format!( + "任务状态不是 pending(当前状态: {}),无法认领", + task_model.status + ))); + } + + let current_version = task_model.version; + let mut active: task::ActiveModel = task_model.into(); + active.assignee_id = Set(Some(user_id)); + active.status = Set("in_progress".to_string()); + active.version = Set(current_version + 1); + active.updated_at = Set(Utc::now()); + active.updated_by = Set(user_id); + + let updated = active + .update(db) + .await + .map_err(|e| WorkflowError::Validation(e.to_string()))?; + + audit_service::record( + AuditLog::new(tenant_id, Some(user_id), "task.claim", "task").with_resource_id(id), + db, + ) + .await; + + Ok(Self::model_to_resp(&updated)) + } + fn model_to_resp(m: &task::Model) -> TaskResp { TaskResp { id: m.id, diff --git a/wiki/architecture.md b/wiki/architecture.md index c2553f7..365bfc1 100644 --- a/wiki/architecture.md +++ b/wiki/architecture.md @@ -46,7 +46,33 @@ HMS 平台 防止并发创建预约时超额。事务内 `UPDATE current_appointments + 1 WHERE current < max`,CAS 成功后才 INSERT 预约记录。 -## 2. 关键文件 + 数据流 +## 2. 项目结构 + +### 目录布局 + +```text +hms/ +├── crates/ # Rust Workspace +│ ├── erp-core/ # L1: 基础类型、错误、事件、模块 trait +│ ├── erp-auth/ # L2: 身份与权限模块 +│ ├── erp-workflow/ # L2: 工作流引擎模块 +│ ├── erp-message/ # L2: 消息中心模块 +│ ├── erp-config/ # L2: 系统配置模块 +│ ├── erp-health/ # L2: 健康管理模块 ★ HMS 核心 +│ └── erp-server/ # L3: Axum 服务入口,组装所有模块 +│ └── migration/ # SeaORM 数据库迁移 +├── apps/ +│ └── web/ # Vite + React 19 SPA (主力前端) +├── packages/ +│ └── ui-components/ # React 共享组件库 +├── desktop/ # (可选) Tauri 桌面端 +├── docker/ # Docker 开发环境配置 +├── docs/ +│ ├── superpowers/specs/ # 设计规格文档 +│ └── discussions/ # 讨论记录 +├── wiki/ # 项目知识库 +└── Cargo.toml # Workspace root +``` ### 模块依赖图 @@ -98,7 +124,76 @@ HMS 平台 | 扩展 ← | [[wasm-plugin]] | 插件通过 Host Bridge 桥接 | | 业务 ← | [[erp-health]] | 健康模块原生集成 | -## 3. 代码逻辑 +## 3. 模块开发规范 + +### 新建业务模块清单 + +每个新模块**必须**包含: + +1. `Cargo.toml` — 依赖 `erp-core` +2. `src/lib.rs` — 模块入口,实现 `ErpModule` trait +3. `src/error.rs` — 模块错误类型,wrap `AppError` +4. `src/entity/` — SeaORM Entity 定义 +5. `src/service/` — 业务逻辑层 +6. `src/handler/` — Axum 路由处理器 +7. `src/event.rs` — 模块事件定义和处理器 + +### ErpModule trait 实现 + +```rust +pub struct AuthModule; + +impl ErpModule for AuthModule { + fn name(&self) -> &str { "auth" } + fn version(&self) -> &str { env!("CARGO_PKG_VERSION") } + fn dependencies(&self) -> Vec<&str> { vec![] } + + fn register_routes(&self, router: Router) -> Router { + router.nest("/api/v1", auth_routes()) + } + + fn register_event_handlers(&self, bus: &EventBus) { /* 订阅其他模块事件 */ } + + async fn on_tenant_created(&self, tenant_id: Uuid) -> AppResult<()> { Ok(()) } +} +``` + +### 数据库迁移规范 + +- 迁移文件放在 `crates/erp-server/migration/src/` +- 命名格式:`m{YYYYMMDD}_{6位序号}_{描述}.rs` +- 必须可回滚(实现 `down` 方法) +- 新增表必须包含所有标准字段(id, tenant_id, created_at, updated_at, created_by, updated_by, deleted_at, version) +- 必须幂等(使用 `if_not_exists`) + +## 4. 安全注意事项 + +### 认证安全 + +- **密码存储**: Argon2 哈希,禁止明文 +- **JWT**: access token 15min + refresh token 7d +- **Refresh Token 轮换**: 每次使用后签发新的,旧的作废 +- **Token 存储**: 桌面端使用 Tauri secure store +- **密码修改**: 使所有已签发的 JWT 失效 + +### 多租户安全 + +- **中间件注入**: `tenant_id` 从 JWT 中提取,应用层不可伪造 +- **数据隔离**: 所有查询自动过滤 `tenant_id` +- **越权防护**: 禁止跨租户数据访问 +- **租户 provisioning**: `on_tenant_created` 钩子初始化数据 + +### 通用安全 + +- 不硬编码密钥 — 使用环境变量或配置文件 +- 用户输入验证 — 所有 API 端点验证输入 +- SQL 注入防护 — SeaORM 参数化查询 +- 限流 — Redis token bucket +- CORS — 白名单制,默认拒绝 +- 审计日志 — 所有关键操作记录变更前后状态 +- 动态表 SQL — 使用 `sanitize_identifier` 防注入 + +## 5. 代码逻辑 ⚡ **不变量**: 模块间只通过 EventBus 和 trait 通信,无直接依赖 ⚡ **不变量**: 所有数据表必须含 `tenant_id`,查询自动过滤 @@ -108,15 +203,16 @@ HMS 平台 ⚡ **不变量**: 预约创建必须走原子 CAS,不能用 read-then-write ⚡ **不变量**: PII 数据(身份证、手机号)加密存储 + 脱敏展示 -## 4. 活跃问题 + 陷阱 +## 6. 活跃问题 + 陷阱 ⚠️ 当前共享数据库 + tenant_id 过滤,未来可扩展为 Schema 隔离或数据库隔离 ⚠️ EventBus 内存 broadcast 需 outbox 持久化保障(已通过后台任务实现) ⚠️ 微信登录固定到 default_tenant_id — 多租户场景需设计解析策略 -## 5. 变更记录 +## 7. 变更记录 | 日期 | 变更 | |------|------| +| 2026-04-26 | 从 CLAUDE.md 迁移:目录结构、模块开发规范(§5)、安全注意事项(§7) | | 2026-04-25 | 全面更新:6 模块已实现状态表、预约 CAS 决策、PII 加密不变量、健康模块集成 | | 2026-04-23 | 重构为 5 节结构,删除 erp-common 引用,精简技术选型表 | diff --git a/wiki/infrastructure.md b/wiki/infrastructure.md index e245c9f..c631697 100644 --- a/wiki/infrastructure.md +++ b/wiki/infrastructure.md @@ -82,7 +82,41 @@ psql: `D:\postgreSQL\bin\psql.exe -U postgres -h localhost -d erp` | 提供 → | [[testing]] | 测试环境配置 | | 提供 → | [[miniprogram]] | 后端 API + 微信登录 | -## 3. 代码逻辑 +## 3. 常用命令 + +### Rust + +```bash +cargo check # 编译检查 +cargo test --workspace # 运行所有测试 +cargo run -p erp-server # 启动后端服务 +cargo fmt --check # 检查格式 +cargo clippy -- -D warnings # Lint 检查 +``` + +### 前端 + +```bash +cd apps/web && pnpm install # 安装依赖 +cd apps/web && pnpm dev # 开发模式(端口 5174) +cd apps/web && pnpm build # 构建生产版本 +``` + +### 数据库 + +```bash +PGPASSWORD=123123 "D:\postgreSQL\bin\psql.exe" -U postgres -h localhost -d erp # 连接数据库 +``` + +### WASM 插件 + +```bash +cargo build -p erp-plugin-test-sample --target wasm32-unknown-unknown --release +wasm-tools component new target/wasm32-unknown-unknown/release/erp_plugin_test_sample.wasm -o target/erp_plugin_test_sample.component.wasm +cargo test -p erp-plugin-prototype # 运行插件集成测试 +``` + +## 4. 代码逻辑 ### 一键启动(推荐) @@ -110,16 +144,17 @@ cd apps/web && pnpm install && pnpm dev ⚡ **不变量**: 后端必须从 `crates/erp-server/` 目录启动或通过环境变量覆盖所有配置 ⚡ **不变量**: Vite 固定端口 5174(`--strictPort`),前端代理 `/api` → 后端 3000 -## 4. 活跃问题 + 陷阱 +## 5. 活跃问题 + 陷阱 ⚠️ Redis 不可达时限流自动降级为 fail-open(放行所有请求) ⚠️ Docker Compose 配置保留在 `docker/` 下但日常开发不依赖 ⚠️ 首次 `cargo run` 编译整个 workspace 较慢(含 wasmtime),后续增量快 -## 5. 变更记录 +## 6. 变更记录 | 日期 | 变更 | |------|------| +| 2026-04-26 | 从 CLAUDE.md 迁移:常用命令(§9) | | 2026-04-25 | 外部化微信凭据和健康加密密钥为环境变量;添加 4 个新的必设环境变量 | | 2026-04-24 | 添加微信小程序配置信息和集成契约 | | 2026-04-23 | 重构为 5 节结构,确立为连接信息的单一真相源 |