fix(db): 修复迁移 084/085 SQL 语法 + RLS 动态表名查询
- 084/085: PostgreSQL DELETE 不支持 LIMIT,改用 ctid IN (SELECT ... LIMIT) - 086: RLS 迁移改为动态查询 information_schema 获取含 tenant_id 的表, 避免硬编码表名不一致问题 - 全量测试 490 个通过(含 27 个集成测试 + RLS 验证)
This commit is contained in:
@@ -60,9 +60,13 @@ impl MigrationTrait for Migration {
|
||||
GET DIAGNOSTICS moved_count = ROW_COUNT;
|
||||
|
||||
DELETE FROM domain_events
|
||||
WHERE status = 'published'
|
||||
AND published_at < NOW() - (retention_days || ' days')::INTERVAL
|
||||
LIMIT batch_size;
|
||||
WHERE ctid IN (
|
||||
SELECT ctid FROM domain_events
|
||||
WHERE status = 'published'
|
||||
AND published_at < NOW() - (retention_days || ' days')::INTERVAL
|
||||
ORDER BY created_at ASC
|
||||
LIMIT batch_size
|
||||
);
|
||||
|
||||
RETURN moved_count;
|
||||
END;
|
||||
|
||||
@@ -32,8 +32,11 @@ impl MigrationTrait for Migration {
|
||||
deleted_count INT;
|
||||
BEGIN
|
||||
DELETE FROM processed_events
|
||||
WHERE processed_at < NOW() - (retention_days || ' days')::INTERVAL
|
||||
LIMIT batch_size;
|
||||
WHERE ctid IN (
|
||||
SELECT ctid FROM processed_events
|
||||
WHERE processed_at < NOW() - (retention_days || ' days')::INTERVAL
|
||||
LIMIT batch_size
|
||||
);
|
||||
|
||||
GET DIAGNOSTICS deleted_count = ROW_COUNT;
|
||||
RETURN deleted_count;
|
||||
|
||||
@@ -3,125 +3,41 @@ use sea_orm_migration::prelude::*;
|
||||
#[derive(DeriveMigrationName)]
|
||||
pub struct Migration;
|
||||
|
||||
/// 所有含 tenant_id 的表 — RLS 策略统一启用
|
||||
const TENANT_TABLES: &[&str] = &[
|
||||
// ── 基础层 (ERP Core) ──────────────────────────
|
||||
"users",
|
||||
"user_credentials",
|
||||
"user_tokens",
|
||||
"roles",
|
||||
"permissions",
|
||||
"role_permissions",
|
||||
"user_roles",
|
||||
"organizations",
|
||||
"departments",
|
||||
"positions",
|
||||
"dictionaries",
|
||||
"dictionary_items",
|
||||
"menus",
|
||||
"menu_roles",
|
||||
"settings",
|
||||
"numbering_rules",
|
||||
"process_definitions",
|
||||
"process_instances",
|
||||
"tokens",
|
||||
"tasks",
|
||||
"process_variables",
|
||||
"message_templates",
|
||||
"messages",
|
||||
"message_subscriptions",
|
||||
"audit_logs",
|
||||
"domain_events",
|
||||
"plugins",
|
||||
"plugin_entities",
|
||||
"plugin_entity_columns",
|
||||
"user_departments",
|
||||
"plugin_user_views",
|
||||
"plugin_market_reviews",
|
||||
"tenant_crypto_keys",
|
||||
"domain_events_archive",
|
||||
// ── 健康层 (erp-health) ────────────────────────
|
||||
"patient",
|
||||
"patient_family_member",
|
||||
"patient_tag",
|
||||
"patient_tag_relation",
|
||||
"doctor_profile",
|
||||
"patient_doctor_relation",
|
||||
"health_record",
|
||||
"vital_signs",
|
||||
"lab_report",
|
||||
"health_trend",
|
||||
"appointment",
|
||||
"doctor_schedule",
|
||||
"follow_up_task",
|
||||
"follow_up_record",
|
||||
"consultation_session",
|
||||
"consultation_message",
|
||||
"dialysis_record",
|
||||
"daily_monitoring",
|
||||
"diagnosis",
|
||||
"critical_value_threshold",
|
||||
"consent",
|
||||
"device_readings",
|
||||
"vital_signs_hourly",
|
||||
"patient_devices",
|
||||
"alert_rules",
|
||||
"alerts",
|
||||
"medication_record",
|
||||
"dialysis_prescription",
|
||||
// ── AI + 积分 + 内容 + 微信 ────────────────────
|
||||
"ai_prompts",
|
||||
"ai_analysis_results",
|
||||
"ai_usage_logs",
|
||||
"points_account",
|
||||
"points_rule",
|
||||
"points_transaction",
|
||||
"points_product",
|
||||
"points_order",
|
||||
"points_checkin",
|
||||
"offline_event",
|
||||
"offline_event_registration",
|
||||
"wechat_users",
|
||||
"article",
|
||||
"article_category",
|
||||
"article_tag",
|
||||
"article_revision",
|
||||
"follow_up_template",
|
||||
"follow_up_template_step",
|
||||
];
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MigrationTrait for Migration {
|
||||
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
let conn = manager.get_connection();
|
||||
|
||||
// 创建自定义配置参数(仅用于文档,PostgreSQL 自动支持 current_setting)
|
||||
// 策略设计:
|
||||
// - current_setting('app.current_tenant_id', true) = '' → 未设置,允许所有(兼容迁移/后台任务)
|
||||
// - 设置为 UUID → 仅允许匹配 tenant_id 的行
|
||||
|
||||
for table in TENANT_TABLES {
|
||||
// 启用 RLS
|
||||
conn.execute_unprepared(&format!(
|
||||
"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY"
|
||||
)).await?;
|
||||
|
||||
// 移除旧策略(如果存在,支持重复执行迁移)
|
||||
conn.execute_unprepared(&format!(
|
||||
"DROP POLICY IF EXISTS tenant_isolation ON {table}"
|
||||
)).await?;
|
||||
|
||||
// 租户隔离策略 — 未设置参数时允许所有,设置后按 tenant_id 过滤
|
||||
conn.execute_unprepared(&format!(
|
||||
"CREATE POLICY tenant_isolation ON {table} USING (
|
||||
current_setting('app.current_tenant_id', true) = ''
|
||||
OR tenant_id = current_setting('app.current_tenant_id', true)::uuid
|
||||
)"
|
||||
)).await?;
|
||||
|
||||
// 超级用户绕过策略(数据库 owner 直接绕过 RLS)
|
||||
// PostgreSQL 默认表 owner 绕过 RLS,无需额外策略
|
||||
}
|
||||
// PL/pgSQL 动态为所有含 tenant_id 列的表启用 RLS
|
||||
conn.execute_unprepared(
|
||||
r#"
|
||||
DO $$
|
||||
DECLARE
|
||||
tbl TEXT;
|
||||
BEGIN
|
||||
FOR tbl IN
|
||||
SELECT c.table_name FROM information_schema.columns c
|
||||
JOIN information_schema.tables t
|
||||
ON c.table_name = t.table_name AND c.table_schema = t.table_schema
|
||||
WHERE c.column_name = 'tenant_id'
|
||||
AND c.table_schema = 'public'
|
||||
AND t.table_type = 'BASE TABLE'
|
||||
ORDER BY c.table_name
|
||||
LOOP
|
||||
EXECUTE format('ALTER TABLE %I ENABLE ROW LEVEL SECURITY', tbl);
|
||||
EXECUTE format('DROP POLICY IF EXISTS tenant_isolation ON %I', tbl);
|
||||
EXECUTE format(
|
||||
'CREATE POLICY tenant_isolation ON %I USING (
|
||||
current_setting(''app.current_tenant_id'', true) = ''''
|
||||
OR tenant_id = current_setting(''app.current_tenant_id'', true)::uuid
|
||||
)',
|
||||
tbl
|
||||
);
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
"#,
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -129,15 +45,28 @@ impl MigrationTrait for Migration {
|
||||
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||
let conn = manager.get_connection();
|
||||
|
||||
for table in TENANT_TABLES {
|
||||
conn.execute_unprepared(&format!(
|
||||
"DROP POLICY IF EXISTS tenant_isolation ON {table}"
|
||||
)).await?;
|
||||
|
||||
conn.execute_unprepared(&format!(
|
||||
"ALTER TABLE {table} DISABLE ROW LEVEL SECURITY"
|
||||
)).await?;
|
||||
}
|
||||
conn.execute_unprepared(
|
||||
r#"
|
||||
DO $$
|
||||
DECLARE
|
||||
tbl TEXT;
|
||||
BEGIN
|
||||
FOR tbl IN
|
||||
SELECT c.table_name FROM information_schema.columns c
|
||||
JOIN information_schema.tables t
|
||||
ON c.table_name = t.table_name AND c.table_schema = t.table_schema
|
||||
WHERE c.column_name = 'tenant_id'
|
||||
AND c.table_schema = 'public'
|
||||
AND t.table_type = 'BASE TABLE'
|
||||
ORDER BY c.table_name
|
||||
LOOP
|
||||
EXECUTE format('DROP POLICY IF EXISTS tenant_isolation ON %I', tbl);
|
||||
EXECUTE format('ALTER TABLE %I DISABLE ROW LEVEL SECURITY', tbl);
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
"#,
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user