use chrono::Utc; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, Set}; use uuid::Uuid; use crate::dto::{CreateUserReq, RoleResp, UpdateUserReq, UserResp}; use crate::entity::{role, user, user_credential, user_role}; use crate::error::AuthError; use erp_core::audit::AuditLog; use erp_core::audit_service; use erp_core::error::check_version; use erp_core::events::EventBus; use erp_core::types::Pagination; use crate::error::AuthResult; use super::password; /// User CRUD service — create, read, update, soft-delete users within a tenant. pub struct UserService; impl UserService { /// Create a new user with a password credential. /// /// Validates username uniqueness within the tenant, hashes the password, /// and publishes a `user.created` event. pub async fn create( tenant_id: Uuid, operator_id: Uuid, req: &CreateUserReq, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> AuthResult { // Check username uniqueness within tenant let existing = user::Entity::find() .filter(user::Column::TenantId.eq(tenant_id)) .filter(user::Column::Username.eq(&req.username)) .filter(user::Column::DeletedAt.is_null()) .one(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; if existing.is_some() { return Err(AuthError::Validation("用户名已存在".to_string())); } let now = Utc::now(); let user_id = Uuid::now_v7(); // Insert user record let user_model = user::ActiveModel { id: Set(user_id), tenant_id: Set(tenant_id), username: Set(req.username.clone()), email: Set(req.email.clone()), phone: Set(req.phone.clone()), display_name: Set(req.display_name.clone()), avatar_url: Set(None), status: Set("active".to_string()), last_login_at: Set(None), created_at: Set(now), updated_at: Set(now), created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), version: Set(1), }; user_model .insert(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; // Insert password credential let hash = password::hash_password(&req.password)?; let cred = user_credential::ActiveModel { id: Set(Uuid::now_v7()), tenant_id: Set(tenant_id), user_id: Set(user_id), credential_type: Set("password".to_string()), credential_data: Set(Some(serde_json::json!({ "hash": hash }))), verified: Set(true), created_at: Set(now), updated_at: Set(now), created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), version: Set(1), }; cred.insert(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; // Publish domain event event_bus .publish( erp_core::events::DomainEvent::new( "user.created", tenant_id, serde_json::json!({ "user_id": user_id, "username": req.username }), ), db, ) .await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "user.create", "user") .with_resource_id(user_id), db, ) .await; Ok(UserResp { id: user_id, username: req.username.clone(), email: req.email.clone(), phone: req.phone.clone(), display_name: req.display_name.clone(), avatar_url: None, status: "active".to_string(), roles: vec![], version: 1, }) } /// Fetch a single user by ID, scoped to the given tenant. pub async fn get_by_id( id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AuthResult { let user_model = user::Entity::find() .filter(user::Column::Id.eq(id)) .filter(user::Column::TenantId.eq(tenant_id)) .filter(user::Column::DeletedAt.is_null()) .one(db) .await .map_err(|e| AuthError::Validation(e.to_string()))? .ok_or_else(|| AuthError::Validation("用户不存在".to_string()))?; let roles = Self::fetch_user_role_resps(id, tenant_id, db).await?; Ok(model_to_resp(&user_model, roles)) } /// List users within a tenant with pagination and optional search. /// /// Returns `(users, total_count)`. When `search` is provided, filters /// by username using case-insensitive substring match. pub async fn list( tenant_id: Uuid, pagination: &Pagination, search: Option<&str>, db: &sea_orm::DatabaseConnection, ) -> AuthResult<(Vec, u64)> { let mut query = user::Entity::find() .filter(user::Column::TenantId.eq(tenant_id)) .filter(user::Column::DeletedAt.is_null()); if let Some(term) = search && !term.is_empty() { use sea_orm::sea_query::Expr; query = query.filter(Expr::col(user::Column::Username).like(format!("%{}%", term))); } let paginator = query.paginate(db, pagination.limit()); let total = paginator .num_items() .await .map_err(|e| AuthError::Validation(e.to_string()))?; let page_index = pagination.page.unwrap_or(1).saturating_sub(1); let models = paginator .fetch_page(page_index) .await .map_err(|e| AuthError::Validation(e.to_string()))?; let mut resps = Vec::with_capacity(models.len()); for m in models { let roles: Vec = Self::fetch_user_role_resps(m.id, tenant_id, db) .await .unwrap_or_default(); resps.push(model_to_resp(&m, roles)); } Ok((resps, total)) } /// Update editable user fields. /// /// Supports updating email, phone, display_name, and status. /// Status must be one of: "active", "disabled", "locked". pub async fn update( id: Uuid, tenant_id: Uuid, operator_id: Uuid, req: &UpdateUserReq, db: &sea_orm::DatabaseConnection, ) -> AuthResult { let user_model = user::Entity::find() .filter(user::Column::Id.eq(id)) .filter(user::Column::TenantId.eq(tenant_id)) .filter(user::Column::DeletedAt.is_null()) .one(db) .await .map_err(|e| AuthError::Validation(e.to_string()))? .ok_or_else(|| AuthError::Validation("用户不存在".to_string()))?; let old_json = serde_json::to_value(&user_model).unwrap_or(serde_json::Value::Null); let next_ver = check_version(req.version, user_model.version) .map_err(|e| AuthError::Validation(e.to_string()))?; let mut active: user::ActiveModel = user_model.into(); if let Some(email) = &req.email { active.email = Set(Some(email.clone())); } if let Some(phone) = &req.phone { active.phone = Set(Some(phone.clone())); } if let Some(display_name) = &req.display_name { active.display_name = Set(Some(display_name.clone())); } if let Some(status) = &req.status { if !["active", "disabled", "locked"].contains(&status.as_str()) { return Err(AuthError::Validation("无效的状态值".to_string())); } active.status = Set(status.clone()); } active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); active.version = Set(next_ver); let updated = active .update(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; let new_json = serde_json::to_value(&updated).unwrap_or(serde_json::Value::Null); audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "user.update", "user") .with_resource_id(id) .with_changes(Some(old_json), Some(new_json)), db, ) .await; let roles = Self::fetch_user_role_resps(id, tenant_id, db).await?; Ok(model_to_resp(&updated, roles)) } /// Soft-delete a user by setting the `deleted_at` timestamp. pub async fn delete( id: Uuid, tenant_id: Uuid, operator_id: Uuid, db: &sea_orm::DatabaseConnection, event_bus: &EventBus, ) -> AuthResult<()> { let user_model = user::Entity::find() .filter(user::Column::Id.eq(id)) .filter(user::Column::TenantId.eq(tenant_id)) .filter(user::Column::DeletedAt.is_null()) .one(db) .await .map_err(|e| AuthError::Validation(e.to_string()))? .ok_or_else(|| AuthError::Validation("用户不存在".to_string()))?; let current_version = user_model.version; let mut active: user::ActiveModel = user_model.into(); active.deleted_at = Set(Some(Utc::now())); active.updated_at = Set(Utc::now()); active.updated_by = Set(operator_id); active.version = Set(current_version + 1); active .update(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; event_bus .publish( erp_core::events::DomainEvent::new( "user.deleted", tenant_id, serde_json::json!({ "user_id": id }), ), db, ) .await; audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "user.delete", "user").with_resource_id(id), db, ) .await; Ok(()) } /// Replace all role assignments for a user within a tenant. pub async fn assign_roles( user_id: Uuid, tenant_id: Uuid, operator_id: Uuid, role_ids: &[Uuid], db: &sea_orm::DatabaseConnection, ) -> AuthResult> { // 验证用户存在 let _user = user::Entity::find() .filter(user::Column::Id.eq(user_id)) .filter(user::Column::TenantId.eq(tenant_id)) .filter(user::Column::DeletedAt.is_null()) .one(db) .await .map_err(|e| AuthError::Validation(e.to_string()))? .ok_or_else(|| AuthError::Validation("用户不存在".to_string()))?; // 验证所有角色存在且属于当前租户 if !role_ids.is_empty() { let found = role::Entity::find() .filter(role::Column::Id.is_in(role_ids.iter().copied())) .filter(role::Column::TenantId.eq(tenant_id)) .all(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; if found.len() != role_ids.len() { return Err(AuthError::Validation( "部分角色不存在或不属于当前租户".to_string(), )); } } // 删除旧的角色分配 user_role::Entity::delete_many() .filter(user_role::Column::UserId.eq(user_id)) .filter(user_role::Column::TenantId.eq(tenant_id)) .exec(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; // 创建新的角色分配 let now = chrono::Utc::now(); for &role_id in role_ids { let assignment = user_role::ActiveModel { user_id: Set(user_id), role_id: Set(role_id), tenant_id: Set(tenant_id), created_at: Set(now), updated_at: Set(now), created_by: Set(operator_id), updated_by: Set(operator_id), deleted_at: Set(None), version: Set(1), }; assignment .insert(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; } audit_service::record( AuditLog::new(tenant_id, Some(operator_id), "user.assign_roles", "user") .with_resource_id(user_id), db, ) .await; Self::fetch_user_role_resps(user_id, tenant_id, db).await } /// Fetch RoleResp DTOs for a given user within a tenant. async fn fetch_user_role_resps( user_id: Uuid, tenant_id: Uuid, db: &sea_orm::DatabaseConnection, ) -> AuthResult> { let user_roles = user_role::Entity::find() .filter(user_role::Column::UserId.eq(user_id)) .filter(user_role::Column::TenantId.eq(tenant_id)) .all(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; let role_ids: Vec = user_roles.iter().map(|ur| ur.role_id).collect(); if role_ids.is_empty() { return Ok(vec![]); } let roles = role::Entity::find() .filter(role::Column::Id.is_in(role_ids)) .filter(role::Column::TenantId.eq(tenant_id)) .all(db) .await .map_err(|e| AuthError::Validation(e.to_string()))?; Ok(roles .iter() .map(|r| RoleResp { id: r.id, name: r.name.clone(), code: r.code.clone(), description: r.description.clone(), is_system: r.is_system, version: r.version, }) .collect()) } } /// Convert a SeaORM user Model and its role DTOs into a UserResp. pub(crate) fn model_to_resp(m: &user::Model, roles: Vec) -> UserResp { UserResp { id: m.id, username: m.username.clone(), email: m.email.clone(), phone: m.phone.clone(), display_name: m.display_name.clone(), avatar_url: m.avatar_url.clone(), status: m.status.clone(), roles, version: m.version, } } #[cfg(test)] mod tests { use chrono::Utc; use uuid::Uuid; use crate::dto::RoleResp; use crate::entity::user; use super::*; fn make_user_model( id: Uuid, tenant_id: Uuid, username: &str, status: &str, version: i32, ) -> user::Model { user::Model { id, tenant_id, username: username.to_string(), email: None, phone: None, display_name: None, avatar_url: None, status: status.to_string(), last_login_at: None, created_at: Utc::now(), updated_at: Utc::now(), created_by: Uuid::now_v7(), updated_by: Uuid::now_v7(), deleted_at: None, version, } } #[test] fn model_to_resp_maps_basic_fields() { let id = Uuid::now_v7(); let tid = Uuid::now_v7(); let m = make_user_model(id, tid, "alice", "active", 1); let resp = model_to_resp(&m, vec![]); assert_eq!(resp.id, id); assert_eq!(resp.username, "alice"); assert_eq!(resp.status, "active"); assert_eq!(resp.version, 1); assert!(resp.roles.is_empty()); } #[test] fn model_to_resp_includes_roles() { let id = Uuid::now_v7(); let tid = Uuid::now_v7(); let m = make_user_model(id, tid, "bob", "active", 2); let roles = vec![ RoleResp { id: Uuid::now_v7(), name: "管理员".to_string(), code: "admin".to_string(), description: None, is_system: true, version: 1, }, RoleResp { id: Uuid::now_v7(), name: "用户".to_string(), code: "user".to_string(), description: None, is_system: false, version: 1, }, ]; let resp = model_to_resp(&m, roles); assert_eq!(resp.roles.len(), 2); assert_eq!(resp.roles[0].code, "admin"); assert_eq!(resp.version, 2); } }