diff --git a/crates/erp-server/migration/src/m20260427_000084_domain_events_cleanup.rs b/crates/erp-server/migration/src/m20260427_000084_domain_events_cleanup.rs index faa0ea1..35d097c 100644 --- a/crates/erp-server/migration/src/m20260427_000084_domain_events_cleanup.rs +++ b/crates/erp-server/migration/src/m20260427_000084_domain_events_cleanup.rs @@ -60,9 +60,13 @@ impl MigrationTrait for Migration { GET DIAGNOSTICS moved_count = ROW_COUNT; DELETE FROM domain_events - WHERE status = 'published' - AND published_at < NOW() - (retention_days || ' days')::INTERVAL - LIMIT batch_size; + WHERE ctid IN ( + SELECT ctid FROM domain_events + WHERE status = 'published' + AND published_at < NOW() - (retention_days || ' days')::INTERVAL + ORDER BY created_at ASC + LIMIT batch_size + ); RETURN moved_count; END; diff --git a/crates/erp-server/migration/src/m20260427_000085_processed_events.rs b/crates/erp-server/migration/src/m20260427_000085_processed_events.rs index b619713..ef26026 100644 --- a/crates/erp-server/migration/src/m20260427_000085_processed_events.rs +++ b/crates/erp-server/migration/src/m20260427_000085_processed_events.rs @@ -32,8 +32,11 @@ impl MigrationTrait for Migration { deleted_count INT; BEGIN DELETE FROM processed_events - WHERE processed_at < NOW() - (retention_days || ' days')::INTERVAL - LIMIT batch_size; + WHERE ctid IN ( + SELECT ctid FROM processed_events + WHERE processed_at < NOW() - (retention_days || ' days')::INTERVAL + LIMIT batch_size + ); GET DIAGNOSTICS deleted_count = ROW_COUNT; RETURN deleted_count; diff --git a/crates/erp-server/migration/src/m20260427_000086_enable_rls_all_tables.rs b/crates/erp-server/migration/src/m20260427_000086_enable_rls_all_tables.rs index 26433a6..4ac6c4c 100644 --- a/crates/erp-server/migration/src/m20260427_000086_enable_rls_all_tables.rs +++ b/crates/erp-server/migration/src/m20260427_000086_enable_rls_all_tables.rs @@ -3,125 +3,41 @@ use sea_orm_migration::prelude::*; #[derive(DeriveMigrationName)] pub struct Migration; -/// 所有含 tenant_id 的表 — RLS 策略统一启用 -const TENANT_TABLES: &[&str] = &[ - // ── 基础层 (ERP Core) ────────────────────────── - "users", - "user_credentials", - "user_tokens", - "roles", - "permissions", - "role_permissions", - "user_roles", - "organizations", - "departments", - "positions", - "dictionaries", - "dictionary_items", - "menus", - "menu_roles", - "settings", - "numbering_rules", - "process_definitions", - "process_instances", - "tokens", - "tasks", - "process_variables", - "message_templates", - "messages", - "message_subscriptions", - "audit_logs", - "domain_events", - "plugins", - "plugin_entities", - "plugin_entity_columns", - "user_departments", - "plugin_user_views", - "plugin_market_reviews", - "tenant_crypto_keys", - "domain_events_archive", - // ── 健康层 (erp-health) ──────────────────────── - "patient", - "patient_family_member", - "patient_tag", - "patient_tag_relation", - "doctor_profile", - "patient_doctor_relation", - "health_record", - "vital_signs", - "lab_report", - "health_trend", - "appointment", - "doctor_schedule", - "follow_up_task", - "follow_up_record", - "consultation_session", - "consultation_message", - "dialysis_record", - "daily_monitoring", - "diagnosis", - "critical_value_threshold", - "consent", - "device_readings", - "vital_signs_hourly", - "patient_devices", - "alert_rules", - "alerts", - "medication_record", - "dialysis_prescription", - // ── AI + 积分 + 内容 + 微信 ──────────────────── - "ai_prompts", - "ai_analysis_results", - "ai_usage_logs", - "points_account", - "points_rule", - "points_transaction", - "points_product", - "points_order", - "points_checkin", - "offline_event", - "offline_event_registration", - "wechat_users", - "article", - "article_category", - "article_tag", - "article_revision", - "follow_up_template", - "follow_up_template_step", -]; - #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { let conn = manager.get_connection(); - // 创建自定义配置参数(仅用于文档,PostgreSQL 自动支持 current_setting) - // 策略设计: - // - current_setting('app.current_tenant_id', true) = '' → 未设置,允许所有(兼容迁移/后台任务) - // - 设置为 UUID → 仅允许匹配 tenant_id 的行 - - for table in TENANT_TABLES { - // 启用 RLS - conn.execute_unprepared(&format!( - "ALTER TABLE {table} ENABLE ROW LEVEL SECURITY" - )).await?; - - // 移除旧策略(如果存在,支持重复执行迁移) - conn.execute_unprepared(&format!( - "DROP POLICY IF EXISTS tenant_isolation ON {table}" - )).await?; - - // 租户隔离策略 — 未设置参数时允许所有,设置后按 tenant_id 过滤 - conn.execute_unprepared(&format!( - "CREATE POLICY tenant_isolation ON {table} USING ( - current_setting('app.current_tenant_id', true) = '' - OR tenant_id = current_setting('app.current_tenant_id', true)::uuid - )" - )).await?; - - // 超级用户绕过策略(数据库 owner 直接绕过 RLS) - // PostgreSQL 默认表 owner 绕过 RLS,无需额外策略 - } + // PL/pgSQL 动态为所有含 tenant_id 列的表启用 RLS + conn.execute_unprepared( + r#" + DO $$ + DECLARE + tbl TEXT; + BEGIN + FOR tbl IN + SELECT c.table_name FROM information_schema.columns c + JOIN information_schema.tables t + ON c.table_name = t.table_name AND c.table_schema = t.table_schema + WHERE c.column_name = 'tenant_id' + AND c.table_schema = 'public' + AND t.table_type = 'BASE TABLE' + ORDER BY c.table_name + LOOP + EXECUTE format('ALTER TABLE %I ENABLE ROW LEVEL SECURITY', tbl); + EXECUTE format('DROP POLICY IF EXISTS tenant_isolation ON %I', tbl); + EXECUTE format( + 'CREATE POLICY tenant_isolation ON %I USING ( + current_setting(''app.current_tenant_id'', true) = '''' + OR tenant_id = current_setting(''app.current_tenant_id'', true)::uuid + )', + tbl + ); + END LOOP; + END; + $$; + "#, + ).await?; Ok(()) } @@ -129,15 +45,28 @@ impl MigrationTrait for Migration { async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { let conn = manager.get_connection(); - for table in TENANT_TABLES { - conn.execute_unprepared(&format!( - "DROP POLICY IF EXISTS tenant_isolation ON {table}" - )).await?; - - conn.execute_unprepared(&format!( - "ALTER TABLE {table} DISABLE ROW LEVEL SECURITY" - )).await?; - } + conn.execute_unprepared( + r#" + DO $$ + DECLARE + tbl TEXT; + BEGIN + FOR tbl IN + SELECT c.table_name FROM information_schema.columns c + JOIN information_schema.tables t + ON c.table_name = t.table_name AND c.table_schema = t.table_schema + WHERE c.column_name = 'tenant_id' + AND c.table_schema = 'public' + AND t.table_type = 'BASE TABLE' + ORDER BY c.table_name + LOOP + EXECUTE format('DROP POLICY IF EXISTS tenant_isolation ON %I', tbl); + EXECUTE format('ALTER TABLE %I DISABLE ROW LEVEL SECURITY', tbl); + END LOOP; + END; + $$; + "#, + ).await?; Ok(()) }