use sea_orm_migration::prelude::*; #[derive(DeriveMigrationName)] pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .create_table( Table::create() .table(Alias::new("processed_events")) .if_not_exists() .col(ColumnDef::new(Alias::new("event_id")).uuid().not_null()) .col( ColumnDef::new(Alias::new("consumer_id")) .string_len(200) .not_null(), ) .col( ColumnDef::new(Alias::new("processed_at")) .timestamp_with_time_zone() .not_null() .default(Expr::current_timestamp()), ) .primary_key( Index::create() .col(Alias::new("event_id")) .col(Alias::new("consumer_id")), ) .to_owned(), ) .await?; // 7 天 TTL 清理函数 manager .get_connection() .execute_unprepared( r#" CREATE OR REPLACE FUNCTION cleanup_old_processed_events( retention_days INT DEFAULT 7, batch_size INT DEFAULT 1000 ) RETURNS INT AS $$ DECLARE deleted_count INT; BEGIN DELETE FROM processed_events WHERE ctid IN ( SELECT ctid FROM processed_events WHERE processed_at < NOW() - (retention_days || ' days')::INTERVAL LIMIT batch_size ); GET DIAGNOSTICS deleted_count = ROW_COUNT; RETURN deleted_count; END; $$ LANGUAGE plpgsql; "#, ) .await?; Ok(()) } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { manager .get_connection() .execute_unprepared("DROP FUNCTION IF EXISTS cleanup_old_processed_events(INT, INT);") .await?; manager .drop_table( Table::drop() .table(Alias::new("processed_events")) .to_owned(), ) .await?; Ok(()) } }