Files
zclaw_openfang/crates/zclaw-saas/src/account/service.rs
iven a2f8112d69 feat(saas): Phase 1 — 基础框架与账号管理模块
- 新增 zclaw-saas crate 作为 workspace 成员
- 配置系统 (TOML + 环境变量覆盖)
- 错误类型体系 (SaasError 16 变体, IntoResponse)
- SQLite 数据库 (12 表 schema, 内存/文件双模式, 3 系统角色种子数据)
- JWT 认证 (签发/验证/刷新)
- Argon2id 密码哈希
- 认证中间件 (公开/受保护路由分层)
- 账号管理 CRUD + API Token 管理 + 操作日志
- 7 单元测试 + 5 集成测试全部通过
2026-03-27 12:58:01 +08:00

223 lines
7.8 KiB
Rust

//! 账号管理业务逻辑
use sqlx::SqlitePool;
use crate::error::{SaasError, SaasResult};
use super::types::*;
/// Escapes the SQL `LIKE` metacharacters (`%`, `_`) and the escape character
/// itself (`\`) so the text is matched literally under `ESCAPE '\'`.
fn escape_like_pattern(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}

/// Returns one page of accounts, optionally filtered by role, status and a
/// free-text search over `username`, `email` and `display_name`.
///
/// Paging input is normalised before use: `page` defaults to 1 and is at
/// least 1; `page_size` defaults to 20 and is clamped to `1..=100`.
/// Rows are ordered by `created_at` descending.
///
/// The search text is matched as a literal substring: `%`, `_` and `\` in the
/// input are escaped so they do not act as `LIKE` wildcards.
///
/// # Errors
///
/// Returns an error if the count query or the data query fails.
pub async fn list_accounts(
    db: &SqlitePool,
    query: &ListAccountsQuery,
) -> SaasResult<PaginatedResponse<serde_json::Value>> {
    type AccountRow = (
        String,
        String,
        String,
        String,
        String,
        String,
        bool,
        Option<String>,
        String,
    );

    let page = query.page.unwrap_or(1).max(1);
    // A page size of zero (or below) would produce an empty or unbounded page,
    // so enforce a lower bound as well as the upper one.
    let page_size = query.page_size.unwrap_or(20).clamp(1, 100);
    // Saturate instead of overflowing when an absurdly large page is requested.
    let offset = (page - 1).saturating_mul(page_size);

    // The WHERE clause is assembled from fixed fragments only; every
    // user-supplied value travels through a bound parameter.
    let mut where_clauses: Vec<&str> = Vec::new();
    let mut params: Vec<String> = Vec::new();

    if let Some(role) = &query.role {
        where_clauses.push("role = ?");
        params.push(role.clone());
    }
    if let Some(status) = &query.status {
        where_clauses.push("status = ?");
        params.push(status.clone());
    }
    if let Some(search) = &query.search {
        where_clauses.push(
            "(username LIKE ? ESCAPE '\\' OR email LIKE ? ESCAPE '\\' OR display_name LIKE ? ESCAPE '\\')",
        );
        let pattern = format!("%{}%", escape_like_pattern(search));
        // One bound value per placeholder in the fragment above.
        params.extend([pattern.clone(), pattern.clone(), pattern]);
    }

    let where_sql = if where_clauses.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", where_clauses.join(" AND "))
    };

    // Total number of matching rows, independent of paging.
    let count_sql = format!("SELECT COUNT(*) as count FROM accounts {}", where_sql);
    let mut count_query = sqlx::query_scalar::<_, i64>(&count_sql);
    for p in &params {
        count_query = count_query.bind(p);
    }
    let total: i64 = count_query.fetch_one(db).await?;

    let data_sql = format!(
        "SELECT id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at
         FROM accounts {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
        where_sql
    );
    let mut data_query = sqlx::query_as::<_, AccountRow>(&data_sql);
    for p in &params {
        data_query = data_query.bind(p);
    }
    // LIMIT and OFFSET are bound last, after the filter parameters.
    let rows = data_query
        .bind(page_size as i64)
        .bind(offset as i64)
        .fetch_all(db)
        .await?;

    let items: Vec<serde_json::Value> = rows
        .into_iter()
        .map(
            |(id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at)| {
                serde_json::json!({
                    "id": id, "username": username, "email": email, "display_name": display_name,
                    "role": role, "status": status, "totp_enabled": totp_enabled,
                    "last_login_at": last_login_at, "created_at": created_at,
                })
            },
        )
        .collect();

    Ok(PaginatedResponse { items, total, page, page_size })
}
/// Loads a single account by id and renders it as a JSON object.
///
/// # Errors
///
/// Returns `SaasError::NotFound` when no row has the given id, and
/// propagates any error raised by the query itself.
pub async fn get_account(db: &SqlitePool, account_id: &str) -> SaasResult<serde_json::Value> {
    type AccountRow = (
        String,
        String,
        String,
        String,
        String,
        String,
        bool,
        Option<String>,
        String,
    );

    let fetched: Option<AccountRow> = sqlx::query_as(
        "SELECT id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at\n\
         FROM accounts WHERE id = ?1",
    )
    .bind(account_id)
    .fetch_optional(db)
    .await?;

    match fetched {
        None => Err(SaasError::NotFound(format!("账号 {} 不存在", account_id))),
        Some((id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at)) => {
            Ok(serde_json::json!({
                "id": id, "username": username, "email": email, "display_name": display_name,
                "role": role, "status": status, "totp_enabled": totp_enabled,
                "last_login_at": last_login_at, "created_at": created_at,
            }))
        }
    }
}
pub async fn update_account(
db: &SqlitePool,
account_id: &str,
req: &UpdateAccountRequest,
) -> SaasResult<serde_json::Value> {
let now = chrono::Utc::now().to_rfc3339();
let mut updates = Vec::new();
let mut params: Vec<String> = Vec::new();
if let Some(ref v) = req.display_name { updates.push("display_name = ?"); params.push(v.clone()); }
if let Some(ref v) = req.email { updates.push("email = ?"); params.push(v.clone()); }
if let Some(ref v) = req.role { updates.push("role = ?"); params.push(v.clone()); }
if let Some(ref v) = req.avatar_url { updates.push("avatar_url = ?"); params.push(v.clone()); }
if updates.is_empty() {
return get_account(db, account_id).await;
}
updates.push("updated_at = ?");
params.push(now.clone());
params.push(account_id.to_string());
let sql = format!("UPDATE accounts SET {} WHERE id = ?", updates.join(", "));
let mut query = sqlx::query(&sql);
for p in &params {
query = query.bind(p);
}
query.execute(db).await?;
get_account(db, account_id).await
}
/// Sets the status of an account and refreshes its `updated_at` timestamp.
///
/// Accepted values for `status` are `"active"`, `"disabled"` and
/// `"suspended"`.
///
/// # Errors
///
/// Returns `SaasError::InvalidInput` for any other status value,
/// `SaasError::NotFound` when no row matches `account_id`, and propagates
/// errors from the `UPDATE` statement.
pub async fn update_account_status(
    db: &SqlitePool,
    account_id: &str,
    status: &str,
) -> SaasResult<()> {
    let allowed = ["active", "disabled", "suspended"];
    let is_allowed = allowed.iter().any(|candidate| *candidate == status);
    if !is_allowed {
        return Err(SaasError::InvalidInput(format!(
            "无效状态: {},有效值: {:?}",
            status, allowed
        )));
    }

    let timestamp = chrono::Utc::now().to_rfc3339();
    let outcome = sqlx::query("UPDATE accounts SET status = ?1, updated_at = ?2 WHERE id = ?3")
        .bind(status)
        .bind(&timestamp)
        .bind(account_id)
        .execute(db)
        .await?;

    // Zero affected rows means the id matched nothing.
    match outcome.rows_affected() {
        0 => Err(SaasError::NotFound(format!("账号 {} 不存在", account_id))),
        _ => Ok(()),
    }
}
pub async fn create_api_token(
db: &SqlitePool,
account_id: &str,
req: &CreateTokenRequest,
) -> SaasResult<TokenInfo> {
use sha2::{Sha256, Digest};
let mut bytes = [0u8; 48];
use rand::RngCore;
rand::thread_rng().fill_bytes(&mut bytes);
let raw_token = format!("zclaw_{}", hex::encode(bytes));
let token_hash = hex::encode(Sha256::digest(raw_token.as_bytes()));
let token_prefix = raw_token[..8].to_string();
let now = chrono::Utc::now().to_rfc3339();
let expires_at = req.expires_days.map(|d| {
(chrono::Utc::now() + chrono::Duration::days(d)).to_rfc3339()
});
let permissions = serde_json::to_string(&req.permissions)?;
let token_id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO api_tokens (id, account_id, name, token_hash, token_prefix, permissions, created_at, expires_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"
)
.bind(&token_id)
.bind(account_id)
.bind(&req.name)
.bind(&token_hash)
.bind(&token_prefix)
.bind(&permissions)
.bind(&now)
.bind(&expires_at)
.execute(db)
.await?;
Ok(TokenInfo {
id: token_id,
name: req.name.clone(),
token_prefix,
permissions: req.permissions.clone(),
last_used_at: None,
expires_at,
created_at: now,
token: Some(raw_token),
})
}
/// Lists the non-revoked API tokens of an account, newest first.
///
/// The raw token value is never included (`token` is always `None`). A stored
/// permissions value that fails to parse as a JSON string array yields an
/// empty permission list rather than an error.
///
/// # Errors
///
/// Propagates errors from the query.
pub async fn list_api_tokens(
    db: &SqlitePool,
    account_id: &str,
) -> SaasResult<Vec<TokenInfo>> {
    type TokenRow = (
        String,
        String,
        String,
        String,
        Option<String>,
        Option<String>,
        String,
    );

    let records: Vec<TokenRow> = sqlx::query_as(
        "SELECT id, name, token_prefix, permissions, last_used_at, expires_at, created_at\n\
         FROM api_tokens WHERE account_id = ?1 AND revoked_at IS NULL ORDER BY created_at DESC",
    )
    .bind(account_id)
    .fetch_all(db)
    .await?;

    let mut tokens = Vec::with_capacity(records.len());
    for (id, name, token_prefix, raw_permissions, last_used_at, expires_at, created_at) in records {
        let permissions: Vec<String> = serde_json::from_str(&raw_permissions).unwrap_or_default();
        tokens.push(TokenInfo {
            id,
            name,
            token_prefix,
            permissions,
            last_used_at,
            expires_at,
            created_at,
            token: None,
        });
    }
    Ok(tokens)
}
/// Marks an API token as revoked by stamping `revoked_at` with the current
/// time.
///
/// The update only applies to a token that belongs to `account_id` and has
/// not been revoked yet.
///
/// # Errors
///
/// Returns `SaasError::NotFound` when no such token was updated, and
/// propagates errors from the `UPDATE` statement.
pub async fn revoke_api_token(db: &SqlitePool, token_id: &str, account_id: &str) -> SaasResult<()> {
    let revoked_at = chrono::Utc::now().to_rfc3339();

    let outcome = sqlx::query(
        "UPDATE api_tokens SET revoked_at = ?1 WHERE id = ?2 AND account_id = ?3 AND revoked_at IS NULL",
    )
    .bind(&revoked_at)
    .bind(token_id)
    .bind(account_id)
    .execute(db)
    .await?;

    if outcome.rows_affected() > 0 {
        Ok(())
    } else {
        Err(SaasError::NotFound("Token 不存在或已撤销".into()))
    }
}