Compare commits

...

4 Commits

Author SHA1 Message Date
iven
00a08c9f9b feat(saas): Phase 4 — 配置迁移模块
- 配置项 CRUD (列表/详情/创建/更新/删除)
- 配置分析端点 (按类别汇总, SaaS 托管统计)
- 13 个默认配置项种子数据 (server/agent/memory/llm)
- 配置同步协议 (客户端→SaaS, SaaS 优先策略)
- 同步日志记录和查询
- 3 个新集成测试覆盖配置迁移端点
2026-03-27 12:58:02 +08:00
iven
a99a3df9dd feat(saas): Phase 3 — 模型请求中转服务
- OpenAI 兼容 API 代理 (/api/v1/relay/chat/completions)
- 中转任务管理 (创建/查询/状态跟踪)
- 可用模型列表端点 (仅 enabled providers+models)
- 任务生命周期 (queued → processing → completed/failed)
- 用量自动记录 (token 统计 + 错误追踪)
- 3 个新集成测试覆盖中转端点
2026-03-27 12:58:02 +08:00
iven
fec64af565 feat(saas): Phase 2 — 模型配置模块
- Provider CRUD (列表/详情/创建/更新/删除)
- Model CRUD (列表/详情/创建/更新/删除)
- Account API Key 管理 (创建/轮换/撤销/掩码显示)
- Usage 统计 (总量/按模型/按天, 支持时间/供应商/模型过滤)
- 权限控制 (provider:manage, model:manage)
- 3 个新集成测试覆盖 providers/models/keys
2026-03-27 12:58:02 +08:00
iven
a2f8112d69 feat(saas): Phase 1 — 基础框架与账号管理模块
- 新增 zclaw-saas crate 作为 workspace 成员
- 配置系统 (TOML + 环境变量覆盖)
- 错误类型体系 (SaasError 16 变体, IntoResponse)
- SQLite 数据库 (12 表 schema, 内存/文件双模式, 3 系统角色种子数据)
- JWT 认证 (签发/验证/刷新)
- Argon2id 密码哈希
- 认证中间件 (公开/受保护路由分层)
- 账号管理 CRUD + API Token 管理 + 操作日志
- 7 单元测试 + 5 集成测试全部通过
2026-03-27 12:58:01 +08:00
32 changed files with 4074 additions and 4 deletions

339
Cargo.lock generated
View File

@@ -110,6 +110,18 @@ dependencies = [
"derive_arbitrary",
]
[[package]]
name = "argon2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c3610892ee6e0cbce8ae2700349fcf8f98adb0dbfbee85aec3c9179d29cc072"
dependencies = [
"base64ct",
"blake2",
"cpufeatures",
"password-hash",
]
[[package]]
name = "async-broadcast"
version = "0.7.2"
@@ -315,6 +327,7 @@ checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core",
"axum-macros",
"bytes",
"futures-util",
"http 1.4.0",
@@ -335,7 +348,7 @@ dependencies = [
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower 0.5.3",
"tower-layer",
"tower-service",
"tracing",
@@ -362,6 +375,47 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-extra"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c794b30c904f0a1c2fb7740f7df7f7972dfaa14ef6f57cb6178dc63e5dca2f04"
dependencies = [
"axum",
"axum-core",
"bytes",
"fastrand",
"futures-util",
"headers",
"http 1.4.0",
"http-body",
"http-body-util",
"mime",
"multer",
"pin-project-lite",
"serde",
"tower 0.5.3",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-macros"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "base32"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "022dfe9eb35f19ebbcb51e0b40a5ab759f46ad60cadf7297e0bd085afb50e076"
[[package]]
name = "base64"
version = "0.21.7"
@@ -410,6 +464,15 @@ dependencies = [
"serde_core",
]
[[package]]
name = "blake2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest",
]
[[package]]
name = "block-buffer"
version = "0.10.4"
@@ -654,6 +717,12 @@ version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "constant_time_eq"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
[[package]]
name = "convert_case"
version = "0.4.0"
@@ -1168,6 +1237,15 @@ version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ef6b89e5b37196644d8796de5268852ff179b44e96276cf4290264843743bb7"
[[package]]
name = "encoding_rs"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3"
dependencies = [
"cfg-if",
]
[[package]]
name = "endi"
version = "1.1.1"
@@ -1894,6 +1972,30 @@ dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "headers"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb"
dependencies = [
"base64 0.22.1",
"bytes",
"headers-core",
"http 1.4.0",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4"
dependencies = [
"http 1.4.0",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -2433,6 +2535,21 @@ dependencies = [
"serde_json",
]
[[package]]
name = "jsonwebtoken"
version = "9.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde"
dependencies = [
"base64 0.22.1",
"js-sys",
"pem",
"ring",
"serde",
"serde_json",
"simple_asn1",
]
[[package]]
name = "keyboard-types"
version = "0.7.0"
@@ -2625,6 +2742,15 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.10"
@@ -2716,6 +2842,23 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "multer"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 1.4.0",
"httparse",
"memchr",
"mime",
"spin",
"version_check",
]
[[package]]
name = "native-tls"
version = "0.2.18"
@@ -2785,6 +2928,25 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-bigint"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"num-integer",
"num-traits",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.6"
@@ -3120,6 +3282,17 @@ dependencies = [
"windows-link 0.2.1",
]
[[package]]
name = "password-hash"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166"
dependencies = [
"base64ct",
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "paste"
version = "1.0.15"
@@ -3132,6 +3305,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
[[package]]
name = "pem"
version = "3.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be"
dependencies = [
"base64 0.22.1",
"serde_core",
]
[[package]]
name = "pem-rfc7468"
version = "0.7.0"
@@ -3334,6 +3517,26 @@ dependencies = [
"siphasher 1.0.2",
]
[[package]]
name = "pin-project"
version = "1.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "pin-project-lite"
version = "0.2.17"
@@ -3860,7 +4063,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tokio-util",
"tower",
"tower 0.5.3",
"tower-http 0.6.8",
"tower-service",
"url",
@@ -3895,7 +4098,7 @@ dependencies = [
"sync_wrapper",
"tokio",
"tokio-util",
"tower",
"tower 0.5.3",
"tower-http 0.6.8",
"tower-service",
"url",
@@ -4399,6 +4602,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
@@ -4431,6 +4643,18 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2"
[[package]]
name = "simple_asn1"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d"
dependencies = [
"num-bigint",
"num-traits",
"thiserror 2.0.18",
"time",
]
[[package]]
name = "siphasher"
version = "0.3.11"
@@ -5261,6 +5485,15 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "time"
version = "0.3.47"
@@ -5505,6 +5738,34 @@ version = "1.1.0+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d282ade6016312faf3e41e57ebbba0c073e4056dab1232ab1cb624199648f8ed"
[[package]]
name = "totp-rs"
version = "5.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2b36a9dd327e9f401320a2cb4572cc76ff43742bcfc3291f871691050f140ba"
dependencies = [
"base32",
"constant_time_eq",
"hmac",
"sha1",
"sha2",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.5.3"
@@ -5535,6 +5796,7 @@ dependencies = [
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -5550,7 +5812,7 @@ dependencies = [
"http-body",
"iri-string",
"pin-project-lite",
"tower",
"tower 0.5.3",
"tower-layer",
"tower-service",
]
@@ -5597,6 +5859,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@@ -5814,6 +6106,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"
@@ -7124,6 +7422,39 @@ dependencies = [
"zclaw-types",
]
[[package]]
name = "zclaw-saas"
version = "0.1.0"
dependencies = [
"anyhow",
"argon2",
"axum",
"axum-extra",
"chrono",
"dashmap",
"hex",
"jsonwebtoken",
"libsqlite3-sys",
"rand 0.8.5",
"reqwest 0.12.28",
"secrecy",
"serde",
"serde_json",
"sha2",
"sqlx",
"tempfile",
"thiserror 2.0.18",
"tokio",
"toml 0.8.2",
"totp-rs",
"tower 0.4.13",
"tower-http 0.5.2",
"tracing",
"tracing-subscriber",
"uuid",
"zclaw-types",
]
[[package]]
name = "zclaw-skills"
version = "0.1.0"

View File

@@ -15,6 +15,8 @@ members = [
"crates/zclaw-growth",
# Desktop Application
"desktop/src-tauri",
# SaaS Backend
"crates/zclaw-saas",
]
[workspace.package]
@@ -95,6 +97,16 @@ shlex = "1"
# Testing
tempfile = "3"
# SaaS dependencies
axum = { version = "0.7", features = ["macros"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.5", features = ["cors", "trace", "limit"] }
jsonwebtoken = "9"
argon2 = "0.5"
totp-rs = "5"
hex = "0.4"
# Internal crates
zclaw-types = { path = "crates/zclaw-types" }
zclaw-memory = { path = "crates/zclaw-memory" }
@@ -106,6 +118,7 @@ zclaw-channels = { path = "crates/zclaw-channels" }
zclaw-protocols = { path = "crates/zclaw-protocols" }
zclaw-pipeline = { path = "crates/zclaw-pipeline" }
zclaw-growth = { path = "crates/zclaw-growth" }
zclaw-saas = { path = "crates/zclaw-saas" }
[profile.release]
lto = true

View File

@@ -0,0 +1,42 @@
[package]
name = "zclaw-saas"
version.workspace = true
edition.workspace = true
description = "ZCLAW SaaS backend - account, API config, relay, migration"
[[bin]]
name = "zclaw-saas"
path = "src/main.rs"
[dependencies]
zclaw-types = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
toml = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
sqlx = { workspace = true }
libsqlite3-sys = { workspace = true }
reqwest = { workspace = true }
secrecy = { workspace = true }
sha2 = { workspace = true }
rand = { workspace = true }
dashmap = { workspace = true }
hex = { workspace = true }
axum = { workspace = true }
axum-extra = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
jsonwebtoken = { workspace = true }
argon2 = { workspace = true }
totp-rs = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View File

@@ -0,0 +1,117 @@
//! 账号管理 HTTP 处理器
use axum::{
extract::{Extension, Path, Query, State},
Json,
};
use crate::state::AppState;
use crate::error::SaasResult;
use crate::auth::types::AuthContext;
use crate::auth::handlers::log_operation;
use super::{types::*, service};
/// GET /api/v1/accounts
pub async fn list_accounts(
State(state): State<AppState>,
Query(query): Query<ListAccountsQuery>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<PaginatedResponse<serde_json::Value>>> {
service::list_accounts(&state.db, &query).await.map(Json)
}
/// GET /api/v1/accounts/:id
pub async fn get_account(
State(state): State<AppState>,
Path(id): Path<String>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
service::get_account(&state.db, &id).await.map(Json)
}
/// PUT /api/v1/accounts/:id
pub async fn update_account(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<UpdateAccountRequest>,
) -> SaasResult<Json<serde_json::Value>> {
let result = service::update_account(&state.db, &id, &req).await?;
log_operation(&state.db, &ctx.account_id, "account.update", "account", &id, None, None).await?;
Ok(Json(result))
}
/// PATCH /api/v1/accounts/:id/status
pub async fn update_status(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<UpdateStatusRequest>,
) -> SaasResult<Json<serde_json::Value>> {
service::update_account_status(&state.db, &id, &req.status).await?;
log_operation(&state.db, &ctx.account_id, "account.update_status", "account", &id,
Some(serde_json::json!({"status": &req.status})), None).await?;
Ok(Json(serde_json::json!({"ok": true})))
}
/// GET /api/v1/tokens
pub async fn list_tokens(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<Vec<TokenInfo>>> {
service::list_api_tokens(&state.db, &ctx.account_id).await.map(Json)
}
/// POST /api/v1/tokens
pub async fn create_token(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<CreateTokenRequest>,
) -> SaasResult<Json<TokenInfo>> {
let token = service::create_api_token(&state.db, &ctx.account_id, &req).await?;
log_operation(&state.db, &ctx.account_id, "token.create", "api_token", &token.id,
Some(serde_json::json!({"name": &req.name})), None).await?;
Ok(Json(token))
}
/// DELETE /api/v1/tokens/:id
pub async fn revoke_token(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
service::revoke_api_token(&state.db, &id, &ctx.account_id).await?;
log_operation(&state.db, &ctx.account_id, "token.revoke", "api_token", &id, None, None).await?;
Ok(Json(serde_json::json!({"ok": true})))
}
/// GET /api/v1/logs/operations
pub async fn list_operation_logs(
State(state): State<AppState>,
Query(params): Query<std::collections::HashMap<String, String>>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<Vec<serde_json::Value>>> {
let page: i64 = params.get("page").and_then(|v| v.parse().ok()).unwrap_or(1);
let page_size: i64 = params.get("page_size").and_then(|v| v.parse().ok()).unwrap_or(50);
let offset = (page - 1) * page_size;
let rows: Vec<(i64, Option<String>, String, Option<String>, Option<String>, Option<String>, Option<String>, String)> =
sqlx::query_as(
"SELECT id, account_id, action, target_type, target_id, details, ip_address, created_at
FROM operation_logs ORDER BY created_at DESC LIMIT ?1 OFFSET ?2"
)
.bind(page_size)
.bind(offset)
.fetch_all(&state.db)
.await?;
let items: Vec<serde_json::Value> = rows.into_iter().map(|(id, account_id, action, target_type, target_id, details, ip_address, created_at)| {
serde_json::json!({
"id": id, "account_id": account_id, "action": action,
"target_type": target_type, "target_id": target_id,
"details": details.and_then(|d| serde_json::from_str::<serde_json::Value>(&d).ok()),
"ip_address": ip_address, "created_at": created_at,
})
}).collect();
Ok(Json(items))
}

View File

@@ -0,0 +1,19 @@
//! 账号管理模块
pub mod types;
pub mod service;
pub mod handlers;
use axum::routing::{delete, get, patch, post, put};
pub fn routes() -> axum::Router<crate::state::AppState> {
axum::Router::new()
.route("/api/v1/accounts", get(handlers::list_accounts))
.route("/api/v1/accounts/{id}", get(handlers::get_account))
.route("/api/v1/accounts/{id}", put(handlers::update_account))
.route("/api/v1/accounts/{id}/status", patch(handlers::update_status))
.route("/api/v1/tokens", get(handlers::list_tokens))
.route("/api/v1/tokens", post(handlers::create_token))
.route("/api/v1/tokens/{id}", delete(handlers::revoke_token))
.route("/api/v1/logs/operations", get(handlers::list_operation_logs))
}

View File

@@ -0,0 +1,222 @@
//! 账号管理业务逻辑
use sqlx::SqlitePool;
use crate::error::{SaasError, SaasResult};
use super::types::*;
pub async fn list_accounts(
db: &SqlitePool,
query: &ListAccountsQuery,
) -> SaasResult<PaginatedResponse<serde_json::Value>> {
let page = query.page.unwrap_or(1).max(1);
let page_size = query.page_size.unwrap_or(20).min(100);
let offset = (page - 1) * page_size;
let mut where_clauses = Vec::new();
let mut params: Vec<String> = Vec::new();
if let Some(role) = &query.role {
where_clauses.push("role = ?".to_string());
params.push(role.clone());
}
if let Some(status) = &query.status {
where_clauses.push("status = ?".to_string());
params.push(status.clone());
}
if let Some(search) = &query.search {
where_clauses.push("(username LIKE ? OR email LIKE ? OR display_name LIKE ?)".to_string());
let pattern = format!("%{}%", search);
params.push(pattern.clone());
params.push(pattern.clone());
params.push(pattern);
}
let where_sql = if where_clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", where_clauses.join(" AND "))
};
let count_sql = format!("SELECT COUNT(*) as count FROM accounts {}", where_sql);
let mut count_query = sqlx::query_scalar::<_, i64>(&count_sql);
for p in &params {
count_query = count_query.bind(p);
}
let total: i64 = count_query.fetch_one(db).await?;
let data_sql = format!(
"SELECT id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at
FROM accounts {} ORDER BY created_at DESC LIMIT ? OFFSET ?",
where_sql
);
let mut data_query = sqlx::query_as::<_, (String, String, String, String, String, String, bool, Option<String>, String)>(&data_sql);
for p in &params {
data_query = data_query.bind(p);
}
let rows = data_query.bind(page_size as i64).bind(offset as i64).fetch_all(db).await?;
let items: Vec<serde_json::Value> = rows
.into_iter()
.map(|(id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at)| {
serde_json::json!({
"id": id, "username": username, "email": email, "display_name": display_name,
"role": role, "status": status, "totp_enabled": totp_enabled,
"last_login_at": last_login_at, "created_at": created_at,
})
})
.collect();
Ok(PaginatedResponse { items, total, page, page_size })
}
pub async fn get_account(db: &SqlitePool, account_id: &str) -> SaasResult<serde_json::Value> {
let row: Option<(String, String, String, String, String, String, bool, Option<String>, String)> =
sqlx::query_as(
"SELECT id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at
FROM accounts WHERE id = ?1"
)
.bind(account_id)
.fetch_optional(db)
.await?;
let (id, username, email, display_name, role, status, totp_enabled, last_login_at, created_at) =
row.ok_or_else(|| SaasError::NotFound(format!("账号 {} 不存在", account_id)))?;
Ok(serde_json::json!({
"id": id, "username": username, "email": email, "display_name": display_name,
"role": role, "status": status, "totp_enabled": totp_enabled,
"last_login_at": last_login_at, "created_at": created_at,
}))
}
pub async fn update_account(
db: &SqlitePool,
account_id: &str,
req: &UpdateAccountRequest,
) -> SaasResult<serde_json::Value> {
let now = chrono::Utc::now().to_rfc3339();
let mut updates = Vec::new();
let mut params: Vec<String> = Vec::new();
if let Some(ref v) = req.display_name { updates.push("display_name = ?"); params.push(v.clone()); }
if let Some(ref v) = req.email { updates.push("email = ?"); params.push(v.clone()); }
if let Some(ref v) = req.role { updates.push("role = ?"); params.push(v.clone()); }
if let Some(ref v) = req.avatar_url { updates.push("avatar_url = ?"); params.push(v.clone()); }
if updates.is_empty() {
return get_account(db, account_id).await;
}
updates.push("updated_at = ?");
params.push(now.clone());
params.push(account_id.to_string());
let sql = format!("UPDATE accounts SET {} WHERE id = ?", updates.join(", "));
let mut query = sqlx::query(&sql);
for p in &params {
query = query.bind(p);
}
query.execute(db).await?;
get_account(db, account_id).await
}
pub async fn update_account_status(
db: &SqlitePool,
account_id: &str,
status: &str,
) -> SaasResult<()> {
let valid = ["active", "disabled", "suspended"];
if !valid.contains(&status) {
return Err(SaasError::InvalidInput(format!("无效状态: {},有效值: {:?}", status, valid)));
}
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query("UPDATE accounts SET status = ?1, updated_at = ?2 WHERE id = ?3")
.bind(status).bind(&now).bind(account_id)
.execute(db).await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound(format!("账号 {} 不存在", account_id)));
}
Ok(())
}
pub async fn create_api_token(
db: &SqlitePool,
account_id: &str,
req: &CreateTokenRequest,
) -> SaasResult<TokenInfo> {
use sha2::{Sha256, Digest};
let mut bytes = [0u8; 48];
use rand::RngCore;
rand::thread_rng().fill_bytes(&mut bytes);
let raw_token = format!("zclaw_{}", hex::encode(bytes));
let token_hash = hex::encode(Sha256::digest(raw_token.as_bytes()));
let token_prefix = raw_token[..8].to_string();
let now = chrono::Utc::now().to_rfc3339();
let expires_at = req.expires_days.map(|d| {
(chrono::Utc::now() + chrono::Duration::days(d)).to_rfc3339()
});
let permissions = serde_json::to_string(&req.permissions)?;
let token_id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO api_tokens (id, account_id, name, token_hash, token_prefix, permissions, created_at, expires_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"
)
.bind(&token_id)
.bind(account_id)
.bind(&req.name)
.bind(&token_hash)
.bind(&token_prefix)
.bind(&permissions)
.bind(&now)
.bind(&expires_at)
.execute(db)
.await?;
Ok(TokenInfo {
id: token_id,
name: req.name.clone(),
token_prefix,
permissions: req.permissions.clone(),
last_used_at: None,
expires_at,
created_at: now,
token: Some(raw_token),
})
}
pub async fn list_api_tokens(
db: &SqlitePool,
account_id: &str,
) -> SaasResult<Vec<TokenInfo>> {
let rows: Vec<(String, String, String, String, Option<String>, Option<String>, String)> =
sqlx::query_as(
"SELECT id, name, token_prefix, permissions, last_used_at, expires_at, created_at
FROM api_tokens WHERE account_id = ?1 AND revoked_at IS NULL ORDER BY created_at DESC"
)
.bind(account_id)
.fetch_all(db)
.await?;
Ok(rows.into_iter().map(|(id, name, token_prefix, perms, last_used, expires, created)| {
let permissions: Vec<String> = serde_json::from_str(&perms).unwrap_or_default();
TokenInfo { id, name, token_prefix, permissions, last_used_at: last_used, expires_at: expires, created_at: created, token: None, }
}).collect())
}
pub async fn revoke_api_token(db: &SqlitePool, token_id: &str, account_id: &str) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"UPDATE api_tokens SET revoked_at = ?1 WHERE id = ?2 AND account_id = ?3 AND revoked_at IS NULL"
)
.bind(&now).bind(token_id).bind(account_id)
.execute(db).await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound("Token 不存在或已撤销".into()));
}
Ok(())
}

View File

@@ -0,0 +1,53 @@
//! 账号管理类型
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize)]
pub struct UpdateAccountRequest {
pub display_name: Option<String>,
pub email: Option<String>,
pub role: Option<String>,
pub avatar_url: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct UpdateStatusRequest {
pub status: String,
}
#[derive(Debug, Deserialize)]
pub struct ListAccountsQuery {
pub page: Option<u32>,
pub page_size: Option<u32>,
pub role: Option<String>,
pub status: Option<String>,
pub search: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct PaginatedResponse<T: Serialize> {
pub items: Vec<T>,
pub total: i64,
pub page: u32,
pub page_size: u32,
}
#[derive(Debug, Deserialize)]
pub struct CreateTokenRequest {
pub name: String,
pub permissions: Vec<String>,
pub expires_days: Option<i64>,
}
#[derive(Debug, Serialize)]
pub struct TokenInfo {
pub id: String,
pub name: String,
pub token_prefix: String,
pub permissions: Vec<String>,
pub last_used_at: Option<String>,
pub expires_at: Option<String>,
pub created_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub token: Option<String>,
}

View File

@@ -0,0 +1,180 @@
//! 认证 HTTP 处理器
use axum::{extract::State, http::StatusCode, Json};
use secrecy::ExposeSecret;
use crate::state::AppState;
use crate::error::{SaasError, SaasResult};
use super::{
jwt::create_token,
password::{hash_password, verify_password},
types::{AuthContext, LoginRequest, LoginResponse, RegisterRequest, AccountPublic},
};
/// POST /api/v1/auth/register
pub async fn register(
State(state): State<AppState>,
Json(req): Json<RegisterRequest>,
) -> SaasResult<(StatusCode, Json<AccountPublic>)> {
if req.username.len() < 3 {
return Err(SaasError::InvalidInput("用户名至少 3 个字符".into()));
}
if req.password.len() < 8 {
return Err(SaasError::InvalidInput("密码至少 8 个字符".into()));
}
let existing: Vec<(String,)> = sqlx::query_as(
"SELECT id FROM accounts WHERE username = ?1 OR email = ?2"
)
.bind(&req.username)
.bind(&req.email)
.fetch_all(&state.db)
.await?;
if !existing.is_empty() {
return Err(SaasError::AlreadyExists("用户名或邮箱已存在".into()));
}
let password_hash = hash_password(&req.password)?;
let account_id = uuid::Uuid::new_v4().to_string();
let role = req.role.unwrap_or_else(|| "user".into());
let display_name = req.display_name.unwrap_or_default();
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO accounts (id, username, email, password_hash, display_name, role, status, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'active', ?7, ?7)"
)
.bind(&account_id)
.bind(&req.username)
.bind(&req.email)
.bind(&password_hash)
.bind(&display_name)
.bind(&role)
.bind(&now)
.execute(&state.db)
.await?;
log_operation(&state.db, &account_id, "account.create", "account", &account_id, None, None).await?;
Ok((StatusCode::CREATED, Json(AccountPublic {
id: account_id,
username: req.username,
email: req.email,
display_name,
role,
status: "active".into(),
totp_enabled: false,
created_at: now,
})))
}
/// POST /api/v1/auth/login
pub async fn login(
State(state): State<AppState>,
Json(req): Json<LoginRequest>,
) -> SaasResult<Json<LoginResponse>> {
let row: Option<(String, String, String, String, String, String, bool, String)> =
sqlx::query_as(
"SELECT id, username, email, display_name, role, status, totp_enabled, created_at
FROM accounts WHERE username = ?1 OR email = ?1"
)
.bind(&req.username)
.fetch_optional(&state.db)
.await?;
let (id, username, email, display_name, role, status, totp_enabled, created_at) =
row.ok_or_else(|| SaasError::AuthError("用户名或密码错误".into()))?;
if status != "active" {
return Err(SaasError::Forbidden(format!("账号已{},请联系管理员", status)));
}
let (password_hash,): (String,) = sqlx::query_as(
"SELECT password_hash FROM accounts WHERE id = ?1"
)
.bind(&id)
.fetch_one(&state.db)
.await?;
if !verify_password(&req.password, &password_hash)? {
return Err(SaasError::AuthError("用户名或密码错误".into()));
}
let permissions = get_role_permissions(&state.db, &role).await?;
let config = state.config.read().await;
let token = create_token(
&id, &role, permissions.clone(),
state.jwt_secret.expose_secret(),
config.auth.jwt_expiration_hours,
)?;
let now = chrono::Utc::now().to_rfc3339();
sqlx::query("UPDATE accounts SET last_login_at = ?1 WHERE id = ?2")
.bind(&now).bind(&id)
.execute(&state.db).await?;
log_operation(&state.db, &id, "account.login", "account", &id, None, None).await?;
Ok(Json(LoginResponse {
token,
account: AccountPublic {
id, username, email, display_name, role, status, totp_enabled, created_at,
},
}))
}
/// POST /api/v1/auth/refresh
pub async fn refresh(
State(state): State<AppState>,
axum::extract::Extension(ctx): axum::extract::Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
let config = state.config.read().await;
let token = create_token(
&ctx.account_id, &ctx.role, ctx.permissions.clone(),
state.jwt_secret.expose_secret(),
config.auth.jwt_expiration_hours,
)?;
Ok(Json(serde_json::json!({ "token": token })))
}
async fn get_role_permissions(db: &sqlx::SqlitePool, role: &str) -> SaasResult<Vec<String>> {
let row: Option<(String,)> = sqlx::query_as(
"SELECT permissions FROM roles WHERE id = ?1"
)
.bind(role)
.fetch_optional(db)
.await?;
let permissions_str = row
.ok_or_else(|| SaasError::Internal(format!("角色 {} 不存在", role)))?
.0;
let permissions: Vec<String> = serde_json::from_str(&permissions_str)?;
Ok(permissions)
}
/// 记录操作日志
pub async fn log_operation(
db: &sqlx::SqlitePool,
account_id: &str,
action: &str,
target_type: &str,
target_id: &str,
details: Option<serde_json::Value>,
ip_address: Option<&str>,
) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO operation_logs (account_id, action, target_type, target_id, details, ip_address, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"
)
.bind(account_id)
.bind(action)
.bind(target_type)
.bind(target_id)
.bind(details.map(|d| d.to_string()))
.bind(ip_address)
.bind(&now)
.execute(db)
.await?;
Ok(())
}

View File

@@ -0,0 +1,91 @@
//! JWT Token 创建与验证
use chrono::{Duration, Utc};
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
use crate::error::SaasResult;
/// JWT Claims
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
pub sub: String,
pub role: String,
pub permissions: Vec<String>,
pub iat: i64,
pub exp: i64,
}
impl Claims {
pub fn new(account_id: &str, role: &str, permissions: Vec<String>, expiration_hours: i64) -> Self {
let now = Utc::now();
Self {
sub: account_id.to_string(),
role: role.to_string(),
permissions,
iat: now.timestamp(),
exp: (now + Duration::hours(expiration_hours)).timestamp(),
}
}
}
/// 创建 JWT Token
pub fn create_token(
account_id: &str,
role: &str,
permissions: Vec<String>,
secret: &str,
expiration_hours: i64,
) -> SaasResult<String> {
let claims = Claims::new(account_id, role, permissions, expiration_hours);
let token = encode(
&Header::default(),
&claims,
&EncodingKey::from_secret(secret.as_bytes()),
)?;
Ok(token)
}
/// 验证 JWT Token
pub fn verify_token(token: &str, secret: &str) -> SaasResult<Claims> {
let token_data = decode::<Claims>(
token,
&DecodingKey::from_secret(secret.as_bytes()),
&Validation::default(),
)?;
Ok(token_data.claims)
}
#[cfg(test)]
mod tests {
use super::*;
const TEST_SECRET: &str = "test-secret-key";
#[test]
fn test_create_and_verify_token() {
let token = create_token(
"account-123", "admin",
vec!["model:read".to_string()],
TEST_SECRET, 24,
).unwrap();
let claims = verify_token(&token, TEST_SECRET).unwrap();
assert_eq!(claims.sub, "account-123");
assert_eq!(claims.role, "admin");
assert_eq!(claims.permissions, vec!["model:read"]);
}
#[test]
fn test_invalid_token() {
let result = verify_token("invalid.token.here", TEST_SECRET);
assert!(result.is_err());
}
#[test]
fn test_wrong_secret() {
let token = create_token("account-123", "admin", vec![], TEST_SECRET, 24).unwrap();
let result = verify_token(&token, "wrong-secret");
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,69 @@
//! 认证模块
pub mod jwt;
pub mod password;
pub mod types;
pub mod handlers;
use axum::{
extract::{Request, State},
http::header,
middleware::Next,
response::{IntoResponse, Response},
};
use secrecy::ExposeSecret;
use crate::error::SaasError;
use crate::state::AppState;
use types::AuthContext;
/// 认证中间件: 从 JWT 或 API Token 提取身份
pub async fn auth_middleware(
State(state): State<AppState>,
mut req: Request,
next: Next,
) -> Response {
let auth_header = req.headers()
.get(header::AUTHORIZATION)
.and_then(|v| v.to_str().ok());
let result = if let Some(auth) = auth_header {
if let Some(token) = auth.strip_prefix("Bearer ") {
jwt::verify_token(token, state.jwt_secret.expose_secret())
.map(|claims| AuthContext {
account_id: claims.sub,
role: claims.role,
permissions: claims.permissions,
})
.map_err(|_| SaasError::Unauthorized)
} else {
Err(SaasError::Unauthorized)
}
} else {
Err(SaasError::Unauthorized)
};
match result {
Ok(ctx) => {
req.extensions_mut().insert(ctx);
next.run(req).await
}
Err(e) => e.into_response(),
}
}
/// 路由 (无需认证的端点)
pub fn routes() -> axum::Router<AppState> {
use axum::routing::post;
axum::Router::new()
.route("/api/v1/auth/register", post(handlers::register))
.route("/api/v1/auth/login", post(handlers::login))
}
/// 需要认证的路由
pub fn protected_routes() -> axum::Router<AppState> {
use axum::routing::post;
axum::Router::new()
.route("/api/v1/auth/refresh", post(handlers::refresh))
}

View File

@@ -0,0 +1,48 @@
//! 密码哈希 (Argon2id)
use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
Argon2,
};
use crate::error::{SaasError, SaasResult};
/// 哈希密码
pub fn hash_password(password: &str) -> SaasResult<String> {
let salt = SaltString::generate(&mut OsRng);
let argon2 = Argon2::default();
let hash = argon2
.hash_password(password.as_bytes(), &salt)
.map_err(|e| SaasError::PasswordHash(e.to_string()))?;
Ok(hash.to_string())
}
/// 验证密码
pub fn verify_password(password: &str, hash: &str) -> SaasResult<bool> {
let parsed_hash = PasswordHash::new(hash)
.map_err(|e| SaasError::PasswordHash(e.to_string()))?;
Ok(Argon2::default()
.verify_password(password.as_bytes(), &parsed_hash)
.is_ok())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_hash_and_verify() {
let hash = hash_password("correct_password").unwrap();
assert!(verify_password("correct_password", &hash).unwrap());
assert!(!verify_password("wrong_password", &hash).unwrap());
}
#[test]
fn test_different_hashes_for_same_password() {
let hash1 = hash_password("same_password").unwrap();
let hash2 = hash_password("same_password").unwrap();
assert_ne!(hash1, hash2);
assert!(verify_password("same_password", &hash1).unwrap());
assert!(verify_password("same_password", &hash2).unwrap());
}
}

View File

@@ -0,0 +1,49 @@
//! 认证相关类型
use serde::{Deserialize, Serialize};
/// 登录请求
#[derive(Debug, Deserialize)]
pub struct LoginRequest {
pub username: String,
pub password: String,
pub totp_code: Option<String>,
}
/// 登录响应
#[derive(Debug, Serialize)]
pub struct LoginResponse {
pub token: String,
pub account: AccountPublic,
}
/// 注册请求
#[derive(Debug, Deserialize)]
pub struct RegisterRequest {
pub username: String,
pub email: String,
pub password: String,
pub display_name: Option<String>,
pub role: Option<String>,
}
/// 公开账号信息 (无敏感数据)
#[derive(Debug, Clone, Serialize)]
pub struct AccountPublic {
pub id: String,
pub username: String,
pub email: String,
pub display_name: String,
pub role: String,
pub status: String,
pub totp_enabled: bool,
pub created_at: String,
}
/// 认证上下文 (注入到 request extensions)
#[derive(Debug, Clone)]
pub struct AuthContext {
pub account_id: String,
pub role: String,
pub permissions: Vec<String>,
}

View File

@@ -0,0 +1,144 @@
//! SaaS 服务器配置
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use secrecy::SecretString;
/// SaaS 服务器完整配置
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SaaSConfig {
pub server: ServerConfig,
pub database: DatabaseConfig,
pub auth: AuthConfig,
pub relay: RelayConfig,
}
/// 服务器配置
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
#[serde(default = "default_host")]
pub host: String,
#[serde(default = "default_port")]
pub port: u16,
#[serde(default)]
pub cors_origins: Vec<String>,
}
/// 数据库配置
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
#[serde(default = "default_db_url")]
pub url: String,
}
/// 认证配置
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
#[serde(default = "default_jwt_hours")]
pub jwt_expiration_hours: i64,
#[serde(default = "default_totp_issuer")]
pub totp_issuer: String,
}
/// 中转服务配置
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayConfig {
#[serde(default = "default_max_queue")]
pub max_queue_size: usize,
#[serde(default = "default_max_concurrent")]
pub max_concurrent_per_provider: usize,
#[serde(default = "default_batch_window")]
pub batch_window_ms: u64,
#[serde(default = "default_retry_delay")]
pub retry_delay_ms: u64,
#[serde(default = "default_max_attempts")]
pub max_attempts: u32,
}
fn default_host() -> String { "0.0.0.0".into() }
fn default_port() -> u16 { 8080 }
fn default_db_url() -> String { "sqlite:./saas-data.db".into() }
fn default_jwt_hours() -> i64 { 24 }
fn default_totp_issuer() -> String { "ZCLAW SaaS".into() }
fn default_max_queue() -> usize { 1000 }
fn default_max_concurrent() -> usize { 5 }
fn default_batch_window() -> u64 { 50 }
fn default_retry_delay() -> u64 { 1000 }
fn default_max_attempts() -> u32 { 3 }
impl Default for SaaSConfig {
fn default() -> Self {
Self {
server: ServerConfig::default(),
database: DatabaseConfig::default(),
auth: AuthConfig::default(),
relay: RelayConfig::default(),
}
}
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
host: default_host(),
port: default_port(),
cors_origins: Vec::new(),
}
}
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self { url: default_db_url() }
}
}
impl Default for AuthConfig {
fn default() -> Self {
Self {
jwt_expiration_hours: default_jwt_hours(),
totp_issuer: default_totp_issuer(),
}
}
}
impl Default for RelayConfig {
fn default() -> Self {
Self {
max_queue_size: default_max_queue(),
max_concurrent_per_provider: default_max_concurrent(),
batch_window_ms: default_batch_window(),
retry_delay_ms: default_retry_delay(),
max_attempts: default_max_attempts(),
}
}
}
impl SaaSConfig {
/// 加载配置文件,优先级: 环境变量 > ZCLAW_SAAS_CONFIG > ./saas-config.toml
pub fn load() -> anyhow::Result<Self> {
let config_path = std::env::var("ZCLAW_SAAS_CONFIG")
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("saas-config.toml"));
let config = if config_path.exists() {
let content = std::fs::read_to_string(&config_path)?;
toml::from_str(&content)?
} else {
tracing::warn!("Config file {:?} not found, using defaults", config_path);
SaaSConfig::default()
};
Ok(config)
}
/// 获取 JWT 密钥 (从环境变量或生成默认值)
pub fn jwt_secret(&self) -> SecretString {
std::env::var("ZCLAW_SAAS_JWT_SECRET")
.map(SecretString::from)
.unwrap_or_else(|_| {
tracing::warn!("ZCLAW_SAAS_JWT_SECRET not set, using default (insecure!)");
SecretString::from("zclaw-saas-default-secret-change-in-production".to_string())
})
}
}

281
crates/zclaw-saas/src/db.rs Normal file
View File

@@ -0,0 +1,281 @@
//! 数据库初始化与 Schema
use sqlx::SqlitePool;
use crate::error::SaasResult;
const SCHEMA_VERSION: i32 = 1;
const SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS saas_schema_version (
version INTEGER PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS accounts (
id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
display_name TEXT NOT NULL DEFAULT '',
avatar_url TEXT,
role TEXT NOT NULL DEFAULT 'user',
status TEXT NOT NULL DEFAULT 'active',
totp_secret TEXT,
totp_enabled INTEGER NOT NULL DEFAULT 0,
last_login_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_accounts_email ON accounts(email);
CREATE INDEX IF NOT EXISTS idx_accounts_role ON accounts(role);
CREATE TABLE IF NOT EXISTS api_tokens (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
name TEXT NOT NULL,
token_hash TEXT NOT NULL,
token_prefix TEXT NOT NULL,
permissions TEXT NOT NULL DEFAULT '[]',
last_used_at TEXT,
expires_at TEXT,
created_at TEXT NOT NULL,
revoked_at TEXT,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_api_tokens_account ON api_tokens(account_id);
CREATE INDEX IF NOT EXISTS idx_api_tokens_hash ON api_tokens(token_hash);
CREATE TABLE IF NOT EXISTS roles (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
is_system INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS permission_templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS operation_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id TEXT,
action TEXT NOT NULL,
target_type TEXT,
target_id TEXT,
details TEXT,
ip_address TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_op_logs_account ON operation_logs(account_id);
CREATE INDEX IF NOT EXISTS idx_op_logs_action ON operation_logs(action);
CREATE INDEX IF NOT EXISTS idx_op_logs_time ON operation_logs(created_at);
CREATE TABLE IF NOT EXISTS providers (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
api_key TEXT,
base_url TEXT NOT NULL,
api_protocol TEXT NOT NULL DEFAULT 'openai',
enabled INTEGER NOT NULL DEFAULT 1,
rate_limit_rpm INTEGER,
rate_limit_tpm INTEGER,
config_json TEXT DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS models (
id TEXT PRIMARY KEY,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
alias TEXT NOT NULL,
context_window INTEGER NOT NULL DEFAULT 8192,
max_output_tokens INTEGER NOT NULL DEFAULT 4096,
supports_streaming INTEGER NOT NULL DEFAULT 1,
supports_vision INTEGER NOT NULL DEFAULT 0,
enabled INTEGER NOT NULL DEFAULT 1,
pricing_input REAL DEFAULT 0,
pricing_output REAL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(provider_id, model_id),
FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_models_provider ON models(provider_id);
CREATE TABLE IF NOT EXISTS account_api_keys (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
key_value TEXT NOT NULL,
key_label TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
enabled INTEGER NOT NULL DEFAULT 1,
last_used_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
revoked_at TEXT,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_account_api_keys_account ON account_api_keys(account_id);
CREATE TABLE IF NOT EXISTS usage_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
latency_ms INTEGER,
status TEXT NOT NULL DEFAULT 'success',
error_message TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_usage_account ON usage_records(account_id);
CREATE INDEX IF NOT EXISTS idx_usage_time ON usage_records(created_at);
CREATE TABLE IF NOT EXISTS relay_tasks (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
request_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
priority INTEGER NOT NULL DEFAULT 0,
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
request_body TEXT NOT NULL,
response_body TEXT,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
error_message TEXT,
queued_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_relay_status ON relay_tasks(status);
CREATE INDEX IF NOT EXISTS idx_relay_account ON relay_tasks(account_id);
CREATE INDEX IF NOT EXISTS idx_relay_provider ON relay_tasks(provider_id);
CREATE TABLE IF NOT EXISTS config_items (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key_path TEXT NOT NULL,
value_type TEXT NOT NULL,
current_value TEXT,
default_value TEXT,
source TEXT NOT NULL DEFAULT 'local',
description TEXT,
requires_restart INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(category, key_path)
);
CREATE INDEX IF NOT EXISTS idx_config_category ON config_items(category);
CREATE TABLE IF NOT EXISTS config_sync_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id TEXT NOT NULL,
client_fingerprint TEXT NOT NULL,
action TEXT NOT NULL,
config_keys TEXT NOT NULL,
client_values TEXT,
saas_values TEXT,
resolution TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sync_account ON config_sync_log(account_id);
"#;
const SEED_ROLES: &str = r#"
INSERT OR IGNORE INTO roles (id, name, description, permissions, is_system, created_at, updated_at)
VALUES
('super_admin', '超级管理员', '拥有所有权限', '["admin:full"]', 1, datetime('now'), datetime('now')),
('admin', '管理员', '管理账号和配置', '["account:read","account:write","model:read","model:write","relay:use","relay:admin","config:read","config:write"]', 1, datetime('now'), datetime('now')),
('user', '普通用户', '基础使用权限', '["model:read","relay:use","config:read"]', 1, datetime('now'), datetime('now'));
"#;
/// 初始化数据库
pub async fn init_db(database_url: &str) -> SaasResult<SqlitePool> {
if database_url.starts_with("sqlite:") {
let path_part = database_url.strip_prefix("sqlite:").unwrap_or("");
if path_part != ":memory:" {
if let Some(parent) = std::path::Path::new(path_part).parent() {
if !parent.as_os_str().is_empty() && !parent.exists() {
std::fs::create_dir_all(parent)?;
}
}
}
}
let pool = SqlitePool::connect(database_url).await?;
sqlx::query("PRAGMA journal_mode=WAL;")
.execute(&pool)
.await?;
sqlx::query(SCHEMA_SQL).execute(&pool).await?;
sqlx::query("INSERT OR IGNORE INTO saas_schema_version (version) VALUES (?1)")
.bind(SCHEMA_VERSION)
.execute(&pool)
.await?;
sqlx::query(SEED_ROLES).execute(&pool).await?;
tracing::info!("Database initialized (schema v{})", SCHEMA_VERSION);
Ok(pool)
}
/// 创建内存数据库 (测试用)
pub async fn init_memory_db() -> SaasResult<SqlitePool> {
let pool = SqlitePool::connect("sqlite::memory:").await?;
sqlx::query(SCHEMA_SQL).execute(&pool).await?;
sqlx::query("INSERT OR IGNORE INTO saas_schema_version (version) VALUES (?1)")
.bind(SCHEMA_VERSION)
.execute(&pool)
.await?;
sqlx::query(SEED_ROLES).execute(&pool).await?;
Ok(pool)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_init_memory_db() {
let pool = init_memory_db().await.unwrap();
let roles: Vec<(String,)> = sqlx::query_as(
"SELECT id FROM roles WHERE is_system = 1"
)
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(roles.len(), 3);
}
#[tokio::test]
async fn test_schema_tables_exist() {
let pool = init_memory_db().await.unwrap();
let tables = [
"accounts", "api_tokens", "roles", "permission_templates",
"operation_logs", "providers", "models", "account_api_keys",
"usage_records", "relay_tasks", "config_items", "config_sync_log",
];
for table in tables {
let count: (i64,) = sqlx::query_as(&format!(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='{}'", table
))
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count.0, 1, "Table {} should exist", table);
}
}
}

View File

@@ -0,0 +1,119 @@
//! SaaS 错误类型
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde_json::json;
/// SaaS 服务错误类型
#[derive(Debug, thiserror::Error)]
pub enum SaasError {
#[error("未找到: {0}")]
NotFound(String),
#[error("权限不足: {0}")]
Forbidden(String),
#[error("未认证")]
Unauthorized,
#[error("无效输入: {0}")]
InvalidInput(String),
#[error("认证失败: {0}")]
AuthError(String),
#[error("用户已存在: {0}")]
AlreadyExists(String),
#[error("序列化错误: {0}")]
Serialization(#[from] serde_json::Error),
#[error("IO 错误: {0}")]
Io(#[from] std::io::Error),
#[error("数据库错误: {0}")]
Database(#[from] sqlx::Error),
#[error("配置错误: {0}")]
Config(#[from] toml::de::Error),
#[error("JWT 错误: {0}")]
Jwt(#[from] jsonwebtoken::errors::Error),
#[error("密码哈希错误: {0}")]
PasswordHash(String),
#[error("TOTP 错误: {0}")]
Totp(String),
#[error("加密错误: {0}")]
Encryption(String),
#[error("中转错误: {0}")]
Relay(String),
#[error("速率限制: {0}")]
RateLimited(String),
#[error("内部错误: {0}")]
Internal(String),
}
impl SaasError {
/// 获取 HTTP 状态码
pub fn status_code(&self) -> StatusCode {
match self {
Self::NotFound(_) => StatusCode::NOT_FOUND,
Self::Forbidden(_) => StatusCode::FORBIDDEN,
Self::Unauthorized => StatusCode::UNAUTHORIZED,
Self::InvalidInput(_) => StatusCode::BAD_REQUEST,
Self::AlreadyExists(_) => StatusCode::CONFLICT,
Self::RateLimited(_) => StatusCode::TOO_MANY_REQUESTS,
Self::Database(_) | Self::Internal(_) | Self::Io(_) | Self::Serialization(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::AuthError(_) => StatusCode::UNAUTHORIZED,
Self::Jwt(_) | Self::PasswordHash(_) | Self::Totp(_) | Self::Encryption(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
Self::Config(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::Relay(_) => StatusCode::BAD_GATEWAY,
}
}
/// 获取错误代码
pub fn error_code(&self) -> &str {
match self {
Self::NotFound(_) => "NOT_FOUND",
Self::Forbidden(_) => "FORBIDDEN",
Self::Unauthorized => "UNAUTHORIZED",
Self::InvalidInput(_) => "INVALID_INPUT",
Self::AlreadyExists(_) => "ALREADY_EXISTS",
Self::RateLimited(_) => "RATE_LIMITED",
Self::Database(_) => "DATABASE_ERROR",
Self::Io(_) => "IO_ERROR",
Self::Serialization(_) => "SERIALIZATION_ERROR",
Self::Internal(_) => "INTERNAL_ERROR",
Self::AuthError(_) => "AUTH_ERROR",
Self::Jwt(_) => "JWT_ERROR",
Self::PasswordHash(_) => "PASSWORD_HASH_ERROR",
Self::Totp(_) => "TOTP_ERROR",
Self::Encryption(_) => "ENCRYPTION_ERROR",
Self::Config(_) => "CONFIG_ERROR",
Self::Relay(_) => "RELAY_ERROR",
}
}
}
/// 实现 Axum 响应
impl IntoResponse for SaasError {
fn into_response(self) -> Response {
let status = self.status_code();
let body = json!({
"error": self.error_code(),
"message": self.to_string(),
});
(status, axum::Json(body)).into_response()
}
}
/// Result 类型别名
pub type SaasResult<T> = std::result::Result<T, SaasError>;

View File

@@ -0,0 +1,14 @@
//! ZCLAW SaaS Backend
//!
//! 独立的 SaaS 后端服务,提供账号权限管理、模型配置、请求中转和配置迁移。
pub mod config;
pub mod db;
pub mod error;
pub mod state;
pub mod auth;
pub mod account;
pub mod model_config;
pub mod relay;
pub mod migration;

View File

@@ -0,0 +1,60 @@
//! ZCLAW SaaS 服务入口
use tracing::info;
use zclaw_saas::{config::SaaSConfig, db::init_db, state::AppState};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "zclaw_saas=debug,tower_http=debug".into()),
)
.init();
let config = SaaSConfig::load()?;
info!("SaaS config loaded: {}:{}", config.server.host, config.server.port);
let db = init_db(&config.database.url).await?;
info!("Database initialized");
let state = AppState::new(db, config.clone());
let app = build_router(state);
let listener = tokio::net::TcpListener::bind(format!("{}:{}", config.server.host, config.server.port))
.await?;
info!("SaaS server listening on {}:{}", config.server.host, config.server.port);
axum::serve(listener, app).await?;
Ok(())
}
fn build_router(state: AppState) -> axum::Router {
use axum::middleware;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any);
let public_routes = zclaw_saas::auth::routes();
let protected_routes = zclaw_saas::auth::protected_routes()
.merge(zclaw_saas::account::routes())
.merge(zclaw_saas::model_config::routes())
.merge(zclaw_saas::relay::routes())
.merge(zclaw_saas::migration::routes())
.layer(middleware::from_fn_with_state(
state.clone(),
zclaw_saas::auth::auth_middleware,
));
axum::Router::new()
.merge(public_routes)
.merge(protected_routes)
.layer(TraceLayer::new_for_http())
.layer(cors)
.with_state(state)
}

View File

@@ -0,0 +1,104 @@
//! 配置迁移 HTTP 处理器
use axum::{
extract::{Extension, Path, Query, State},
http::StatusCode, Json,
};
use crate::state::AppState;
use crate::error::SaasResult;
use crate::auth::types::AuthContext;
use super::{types::*, service};
/// GET /api/v1/config/items?category=xxx&source=xxx
pub async fn list_config_items(
State(state): State<AppState>,
Query(query): Query<ConfigQuery>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<Vec<ConfigItemInfo>>> {
service::list_config_items(&state.db, &query).await.map(Json)
}
/// GET /api/v1/config/items/:id
pub async fn get_config_item(
State(state): State<AppState>,
Path(id): Path<String>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<ConfigItemInfo>> {
service::get_config_item(&state.db, &id).await.map(Json)
}
/// POST /api/v1/config/items (admin only)
pub async fn create_config_item(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<CreateConfigItemRequest>,
) -> SaasResult<(StatusCode, Json<ConfigItemInfo>)> {
if !ctx.permissions.contains(&"config:manage".to_string()) {
return Err(crate::error::SaasError::Forbidden("需要 config:manage 权限".into()));
}
let item = service::create_config_item(&state.db, &req).await?;
Ok((StatusCode::CREATED, Json(item)))
}
/// PUT /api/v1/config/items/:id (admin only)
pub async fn update_config_item(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<UpdateConfigItemRequest>,
) -> SaasResult<Json<ConfigItemInfo>> {
if !ctx.permissions.contains(&"config:manage".to_string()) {
return Err(crate::error::SaasError::Forbidden("需要 config:manage 权限".into()));
}
service::update_config_item(&state.db, &id, &req).await.map(Json)
}
/// DELETE /api/v1/config/items/:id (admin only)
pub async fn delete_config_item(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
if !ctx.permissions.contains(&"config:manage".to_string()) {
return Err(crate::error::SaasError::Forbidden("需要 config:manage 权限".into()));
}
service::delete_config_item(&state.db, &id).await?;
Ok(Json(serde_json::json!({"ok": true})))
}
/// GET /api/v1/config/analysis
pub async fn analyze_config(
State(state): State<AppState>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<ConfigAnalysis>> {
service::analyze_config(&state.db).await.map(Json)
}
/// POST /api/v1/config/seed (admin only)
pub async fn seed_config(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
if !ctx.permissions.contains(&"config:manage".to_string()) {
return Err(crate::error::SaasError::Forbidden("需要 config:manage 权限".into()));
}
let count = service::seed_default_config_items(&state.db).await?;
Ok(Json(serde_json::json!({"created": count})))
}
/// POST /api/v1/config/sync
pub async fn sync_config(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<SyncConfigRequest>,
) -> SaasResult<Json<Vec<ConfigSyncLogInfo>>> {
service::sync_config(&state.db, &ctx.account_id, &req).await.map(Json)
}
/// GET /api/v1/config/sync-logs
pub async fn list_sync_logs(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<Vec<ConfigSyncLogInfo>>> {
service::list_sync_logs(&state.db, &ctx.account_id).await.map(Json)
}

View File

@@ -0,0 +1,19 @@
//! 配置迁移模块
pub mod types;
pub mod service;
pub mod handlers;
use axum::routing::{get, post};
use crate::state::AppState;
/// 配置迁移路由 (需要认证)
pub fn routes() -> axum::Router<AppState> {
axum::Router::new()
.route("/api/v1/config/items", get(handlers::list_config_items).post(handlers::create_config_item))
.route("/api/v1/config/items/{id}", get(handlers::get_config_item).put(handlers::update_config_item).delete(handlers::delete_config_item))
.route("/api/v1/config/analysis", get(handlers::analyze_config))
.route("/api/v1/config/seed", post(handlers::seed_config))
.route("/api/v1/config/sync", post(handlers::sync_config))
.route("/api/v1/config/sync-logs", get(handlers::list_sync_logs))
}

View File

@@ -0,0 +1,272 @@
//! 配置迁移业务逻辑
use sqlx::SqlitePool;
use crate::error::{SaasError, SaasResult};
use super::types::*;
// ============ Config Items ============
pub async fn list_config_items(
db: &SqlitePool, query: &ConfigQuery,
) -> SaasResult<Vec<ConfigItemInfo>> {
let sql = match (&query.category, &query.source) {
(Some(_), Some(_)) => {
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
FROM config_items WHERE category = ?1 AND source = ?2 ORDER BY category, key_path"
}
(Some(_), None) => {
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
FROM config_items WHERE category = ?1 ORDER BY key_path"
}
(None, Some(_)) => {
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
FROM config_items WHERE source = ?1 ORDER BY category, key_path"
}
(None, None) => {
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
FROM config_items ORDER BY category, key_path"
}
};
let mut query_builder = sqlx::query_as::<_, (String, String, String, String, Option<String>, Option<String>, String, Option<String>, bool, String, String)>(sql);
if let Some(cat) = &query.category {
query_builder = query_builder.bind(cat);
}
if let Some(src) = &query.source {
query_builder = query_builder.bind(src);
}
let rows = query_builder.fetch_all(db).await?;
Ok(rows.into_iter().map(|(id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at)| {
ConfigItemInfo { id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at }
}).collect())
}
pub async fn get_config_item(db: &SqlitePool, item_id: &str) -> SaasResult<ConfigItemInfo> {
let row: Option<(String, String, String, String, Option<String>, Option<String>, String, Option<String>, bool, String, String)> =
sqlx::query_as(
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
FROM config_items WHERE id = ?1"
)
.bind(item_id)
.fetch_optional(db)
.await?;
let (id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at) =
row.ok_or_else(|| SaasError::NotFound(format!("配置项 {} 不存在", item_id)))?;
Ok(ConfigItemInfo { id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at })
}
pub async fn create_config_item(
db: &SqlitePool, req: &CreateConfigItemRequest,
) -> SaasResult<ConfigItemInfo> {
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
let source = req.source.as_deref().unwrap_or("local");
let requires_restart = req.requires_restart.unwrap_or(false);
// 检查唯一性
let existing: Option<(String,)> = sqlx::query_as(
"SELECT id FROM config_items WHERE category = ?1 AND key_path = ?2"
)
.bind(&req.category).bind(&req.key_path)
.fetch_optional(db).await?;
if existing.is_some() {
return Err(SaasError::AlreadyExists(format!(
"配置项 {}:{} 已存在", req.category, req.key_path
)));
}
sqlx::query(
"INSERT INTO config_items (id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)"
)
.bind(&id).bind(&req.category).bind(&req.key_path).bind(&req.value_type)
.bind(&req.current_value).bind(&req.default_value).bind(source)
.bind(&req.description).bind(requires_restart).bind(&now)
.execute(db).await?;
get_config_item(db, &id).await
}
pub async fn update_config_item(
db: &SqlitePool, item_id: &str, req: &UpdateConfigItemRequest,
) -> SaasResult<ConfigItemInfo> {
let now = chrono::Utc::now().to_rfc3339();
let mut updates = Vec::new();
let mut params: Vec<String> = Vec::new();
if let Some(ref v) = req.current_value { updates.push("current_value = ?"); params.push(v.clone()); }
if let Some(ref v) = req.source { updates.push("source = ?"); params.push(v.clone()); }
if let Some(ref v) = req.description { updates.push("description = ?"); params.push(v.clone()); }
if updates.is_empty() {
return get_config_item(db, item_id).await;
}
updates.push("updated_at = ?");
params.push(now);
params.push(item_id.to_string());
let sql = format!("UPDATE config_items SET {} WHERE id = ?", updates.join(", "));
let mut query = sqlx::query(&sql);
for p in &params {
query = query.bind(p);
}
query.execute(db).await?;
get_config_item(db, item_id).await
}
pub async fn delete_config_item(db: &SqlitePool, item_id: &str) -> SaasResult<()> {
let result = sqlx::query("DELETE FROM config_items WHERE id = ?1")
.bind(item_id).execute(db).await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound(format!("配置项 {} 不存在", item_id)));
}
Ok(())
}
// ============ Config Analysis ============
pub async fn analyze_config(db: &SqlitePool) -> SaasResult<ConfigAnalysis> {
let items = list_config_items(db, &ConfigQuery { category: None, source: None }).await?;
let mut categories: std::collections::HashMap<String, (i64, i64)> = std::collections::HashMap::new();
for item in &items {
let entry = categories.entry(item.category.clone()).or_insert((0, 0));
entry.0 += 1;
if item.source == "saas" {
entry.1 += 1;
}
}
let category_summaries: Vec<CategorySummary> = categories.into_iter()
.map(|(category, (count, saas_managed))| CategorySummary { category, count, saas_managed })
.collect();
Ok(ConfigAnalysis {
total_items: items.len() as i64,
categories: category_summaries,
items,
})
}
/// 种子默认配置项
pub async fn seed_default_config_items(db: &SqlitePool) -> SaasResult<usize> {
let defaults = [
("server", "server.host", "string", Some("127.0.0.1"), Some("127.0.0.1"), "服务器监听地址"),
("server", "server.port", "integer", Some("4200"), Some("4200"), "服务器端口"),
("server", "server.cors_origins", "array", None, None, "CORS 允许的源"),
("agent", "agent.defaults.default_model", "string", Some("zhipu/glm-4-plus"), Some("zhipu/glm-4-plus"), "默认模型"),
("agent", "agent.defaults.fallback_models", "array", None, None, "回退模型列表"),
("agent", "agent.defaults.max_sessions", "integer", Some("10"), Some("10"), "最大并发会话数"),
("agent", "agent.defaults.heartbeat_interval", "duration", Some("1h"), Some("1h"), "心跳间隔"),
("agent", "agent.defaults.session_timeout", "duration", Some("24h"), Some("24h"), "会话超时"),
("memory", "agent.defaults.memory.max_history_length", "integer", Some("100"), Some("100"), "最大历史长度"),
("memory", "agent.defaults.memory.summarize_threshold", "integer", Some("50"), Some("50"), "摘要阈值"),
("llm", "llm.default_provider", "string", Some("zhipu"), Some("zhipu"), "默认 LLM Provider"),
("llm", "llm.temperature", "float", Some("0.7"), Some("0.7"), "默认温度"),
("llm", "llm.max_tokens", "integer", Some("4096"), Some("4096"), "默认最大 token 数"),
];
let mut created = 0;
let now = chrono::Utc::now().to_rfc3339();
for (category, key_path, value_type, default_value, current_value, description) in defaults {
let existing: Option<(String,)> = sqlx::query_as(
"SELECT id FROM config_items WHERE category = ?1 AND key_path = ?2"
)
.bind(category).bind(key_path)
.fetch_optional(db)
.await?;
if existing.is_none() {
let id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO config_items (id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'local', ?7, 0, ?8, ?8)"
)
.bind(&id).bind(category).bind(key_path).bind(value_type)
.bind(current_value).bind(default_value).bind(description).bind(&now)
.execute(db)
.await?;
created += 1;
}
}
Ok(created)
}
// ============ Config Sync ============
pub async fn sync_config(
db: &SqlitePool, account_id: &str, req: &SyncConfigRequest,
) -> SaasResult<Vec<ConfigSyncLogInfo>> {
let now = chrono::Utc::now().to_rfc3339();
let config_keys_str = serde_json::to_string(&req.config_keys)?;
let client_values_str = Some(serde_json::to_string(&req.client_values)?);
// 获取 SaaS 端的配置值
let saas_items = list_config_items(db, &ConfigQuery { category: None, source: None }).await?;
let saas_values: serde_json::Value = saas_items.iter()
.filter(|item| req.config_keys.contains(&item.key_path))
.map(|item| {
let key = format!("{}.{}", item.category, item.key_path);
(key, serde_json::json!({
"value": item.current_value,
"source": item.source,
}))
})
.collect();
let saas_values_str = Some(serde_json::to_string(&saas_values)?);
let resolution = "saas_wins".to_string(); // SaaS 配置优先
let id = sqlx::query(
"INSERT INTO config_sync_log (account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at)
VALUES (?1, ?2, 'sync', ?3, ?4, ?5, ?6, ?7)"
)
.bind(account_id).bind(&req.client_fingerprint)
.bind(&config_keys_str).bind(&client_values_str)
.bind(&saas_values_str).bind(&resolution).bind(&now)
.execute(db)
.await?;
let log_id = id.last_insert_rowid();
// 返回同步结果
let row: Option<(i64, String, String, String, String, Option<String>, Option<String>, Option<String>, String)> =
sqlx::query_as(
"SELECT id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at
FROM config_sync_log WHERE id = ?1"
)
.bind(log_id)
.fetch_optional(db)
.await?;
Ok(row.into_iter().map(|(id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at)| {
ConfigSyncLogInfo { id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at }
}).collect())
}
pub async fn list_sync_logs(
db: &SqlitePool, account_id: &str,
) -> SaasResult<Vec<ConfigSyncLogInfo>> {
let rows: Vec<(i64, String, String, String, String, Option<String>, Option<String>, Option<String>, String)> =
sqlx::query_as(
"SELECT id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at
FROM config_sync_log WHERE account_id = ?1 ORDER BY created_at DESC LIMIT 50"
)
.bind(account_id)
.fetch_all(db)
.await?;
Ok(rows.into_iter().map(|(id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at)| {
ConfigSyncLogInfo { id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at }
}).collect())
}

View File

@@ -0,0 +1,84 @@
//! 配置迁移类型定义
use serde::{Deserialize, Serialize};
/// 配置项信息
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigItemInfo {
pub id: String,
pub category: String,
pub key_path: String,
pub value_type: String,
pub current_value: Option<String>,
pub default_value: Option<String>,
pub source: String,
pub description: Option<String>,
pub requires_restart: bool,
pub created_at: String,
pub updated_at: String,
}
/// 创建配置项请求
#[derive(Debug, Deserialize)]
pub struct CreateConfigItemRequest {
pub category: String,
pub key_path: String,
pub value_type: String,
pub current_value: Option<String>,
pub default_value: Option<String>,
pub source: Option<String>,
pub description: Option<String>,
pub requires_restart: Option<bool>,
}
/// 更新配置项请求
#[derive(Debug, Deserialize)]
pub struct UpdateConfigItemRequest {
pub current_value: Option<String>,
pub source: Option<String>,
pub description: Option<String>,
}
/// 配置同步日志
#[derive(Debug, Clone, Serialize)]
pub struct ConfigSyncLogInfo {
pub id: i64,
pub account_id: String,
pub client_fingerprint: String,
pub action: String,
pub config_keys: String,
pub client_values: Option<String>,
pub saas_values: Option<String>,
pub resolution: Option<String>,
pub created_at: String,
}
/// 配置分析结果
#[derive(Debug, Serialize)]
pub struct ConfigAnalysis {
pub total_items: i64,
pub categories: Vec<CategorySummary>,
pub items: Vec<ConfigItemInfo>,
}
#[derive(Debug, Serialize)]
pub struct CategorySummary {
pub category: String,
pub count: i64,
pub saas_managed: i64,
}
/// 配置同步请求
#[derive(Debug, Deserialize)]
pub struct SyncConfigRequest {
pub client_fingerprint: String,
pub config_keys: Vec<String>,
pub client_values: serde_json::Value,
}
/// 配置查询参数
#[derive(Debug, Deserialize)]
pub struct ConfigQuery {
pub category: Option<String>,
pub source: Option<String>,
}

View File

@@ -0,0 +1,206 @@
//! 模型配置 HTTP 处理器
use axum::{
extract::{Extension, Path, Query, State},
http::StatusCode, Json,
};
use crate::state::AppState;
use crate::error::{SaasError, SaasResult};
use crate::auth::types::AuthContext;
use crate::auth::handlers::log_operation;
use super::{types::*, service};
// ============ Providers ============
/// GET /api/v1/providers
pub async fn list_providers(
State(state): State<AppState>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<Vec<ProviderInfo>>> {
service::list_providers(&state.db).await.map(Json)
}
/// GET /api/v1/providers/:id
pub async fn get_provider(
State(state): State<AppState>,
Path(id): Path<String>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<ProviderInfo>> {
service::get_provider(&state.db, &id).await.map(Json)
}
/// POST /api/v1/providers (admin only)
pub async fn create_provider(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<CreateProviderRequest>,
) -> SaasResult<(StatusCode, Json<ProviderInfo>)> {
if !ctx.permissions.contains(&"provider:manage".to_string()) {
return Err(SaasError::Forbidden("需要 provider:manage 权限".into()));
}
let provider = service::create_provider(&state.db, &req).await?;
log_operation(&state.db, &ctx.account_id, "provider.create", "provider", &provider.id,
Some(serde_json::json!({"name": &req.name})), None).await?;
Ok((StatusCode::CREATED, Json(provider)))
}
/// PUT /api/v1/providers/:id (admin only)
pub async fn update_provider(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<UpdateProviderRequest>,
) -> SaasResult<Json<ProviderInfo>> {
if !ctx.permissions.contains(&"provider:manage".to_string()) {
return Err(SaasError::Forbidden("需要 provider:manage 权限".into()));
}
let provider = service::update_provider(&state.db, &id, &req).await?;
log_operation(&state.db, &ctx.account_id, "provider.update", "provider", &id, None, None).await?;
Ok(Json(provider))
}
/// DELETE /api/v1/providers/:id (admin only)
pub async fn delete_provider(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
if !ctx.permissions.contains(&"provider:manage".to_string()) {
return Err(SaasError::Forbidden("需要 provider:manage 权限".into()));
}
service::delete_provider(&state.db, &id).await?;
log_operation(&state.db, &ctx.account_id, "provider.delete", "provider", &id, None, None).await?;
Ok(Json(serde_json::json!({"ok": true})))
}
// ============ Models ============
/// GET /api/v1/models?provider_id=xxx
pub async fn list_models(
State(state): State<AppState>,
Query(params): Query<std::collections::HashMap<String, String>>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<Vec<ModelInfo>>> {
let provider_id = params.get("provider_id").map(|s| s.as_str());
service::list_models(&state.db, provider_id).await.map(Json)
}
/// GET /api/v1/models/:id
pub async fn get_model(
State(state): State<AppState>,
Path(id): Path<String>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<ModelInfo>> {
service::get_model(&state.db, &id).await.map(Json)
}
/// POST /api/v1/models (admin only)
pub async fn create_model(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<CreateModelRequest>,
) -> SaasResult<(StatusCode, Json<ModelInfo>)> {
if !ctx.permissions.contains(&"model:manage".to_string()) {
return Err(SaasError::Forbidden("需要 model:manage 权限".into()));
}
let model = service::create_model(&state.db, &req).await?;
log_operation(&state.db, &ctx.account_id, "model.create", "model", &model.id,
Some(serde_json::json!({"model_id": &req.model_id, "provider_id": &req.provider_id})), None).await?;
Ok((StatusCode::CREATED, Json(model)))
}
/// PUT /api/v1/models/:id (admin only)
pub async fn update_model(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<UpdateModelRequest>,
) -> SaasResult<Json<ModelInfo>> {
if !ctx.permissions.contains(&"model:manage".to_string()) {
return Err(SaasError::Forbidden("需要 model:manage 权限".into()));
}
let model = service::update_model(&state.db, &id, &req).await?;
log_operation(&state.db, &ctx.account_id, "model.update", "model", &id, None, None).await?;
Ok(Json(model))
}
/// DELETE /api/v1/models/:id (admin only)
pub async fn delete_model(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
if !ctx.permissions.contains(&"model:manage".to_string()) {
return Err(SaasError::Forbidden("需要 model:manage 权限".into()));
}
service::delete_model(&state.db, &id).await?;
log_operation(&state.db, &ctx.account_id, "model.delete", "model", &id, None, None).await?;
Ok(Json(serde_json::json!({"ok": true})))
}
// ============ Account API Keys ============
/// GET /api/v1/keys?provider_id=xxx
pub async fn list_api_keys(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> SaasResult<Json<Vec<AccountApiKeyInfo>>> {
let provider_id = params.get("provider_id").map(|s| s.as_str());
service::list_account_api_keys(&state.db, &ctx.account_id, provider_id).await.map(Json)
}
/// POST /api/v1/keys
pub async fn create_api_key(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<CreateAccountApiKeyRequest>,
) -> SaasResult<(StatusCode, Json<AccountApiKeyInfo>)> {
let key = service::create_account_api_key(&state.db, &ctx.account_id, &req).await?;
log_operation(&state.db, &ctx.account_id, "api_key.create", "api_key", &key.id,
Some(serde_json::json!({"provider_id": &req.provider_id})), None).await?;
Ok((StatusCode::CREATED, Json(key)))
}
/// POST /api/v1/keys/:id/rotate
pub async fn rotate_api_key(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
Json(req): Json<RotateApiKeyRequest>,
) -> SaasResult<Json<serde_json::Value>> {
service::rotate_account_api_key(&state.db, &id, &ctx.account_id, &req.new_key_value).await?;
log_operation(&state.db, &ctx.account_id, "api_key.rotate", "api_key", &id, None, None).await?;
Ok(Json(serde_json::json!({"ok": true})))
}
/// DELETE /api/v1/keys/:id
pub async fn revoke_api_key(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
service::revoke_account_api_key(&state.db, &id, &ctx.account_id).await?;
log_operation(&state.db, &ctx.account_id, "api_key.revoke", "api_key", &id, None, None).await?;
Ok(Json(serde_json::json!({"ok": true})))
}
// ============ Usage ============
/// GET /api/v1/usage?from=...&to=...&provider_id=...&model_id=...
pub async fn get_usage(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Query(params): Query<UsageQuery>,
) -> SaasResult<Json<UsageStats>> {
service::get_usage_stats(&state.db, &ctx.account_id, &params).await.map(Json)
}
/// GET /api/v1/providers/:id/models (便捷路由)
pub async fn list_provider_models(
State(state): State<AppState>,
Path(provider_id): Path<String>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<Vec<ModelInfo>>> {
service::list_models(&state.db, Some(&provider_id)).await.map(Json)
}

View File

@@ -0,0 +1,26 @@
//! 模型配置模块
pub mod types;
pub mod service;
pub mod handlers;
use axum::routing::{delete, get, post};
use crate::state::AppState;
/// 模型配置路由 (需要认证)
pub fn routes() -> axum::Router<AppState> {
axum::Router::new()
// Providers
.route("/api/v1/providers", get(handlers::list_providers).post(handlers::create_provider))
.route("/api/v1/providers/{id}", get(handlers::get_provider).put(handlers::update_provider).delete(handlers::delete_provider))
.route("/api/v1/providers/{id}/models", get(handlers::list_provider_models))
// Models
.route("/api/v1/models", get(handlers::list_models).post(handlers::create_model))
.route("/api/v1/models/{id}", get(handlers::get_model).put(handlers::update_model).delete(handlers::delete_model))
// Account API Keys
.route("/api/v1/keys", get(handlers::list_api_keys).post(handlers::create_api_key))
.route("/api/v1/keys/{id}", delete(handlers::revoke_api_key))
.route("/api/v1/keys/{id}/rotate", post(handlers::rotate_api_key))
// Usage
.route("/api/v1/usage", get(handlers::get_usage))
}

View File

@@ -0,0 +1,411 @@
//! 模型配置业务逻辑
use sqlx::SqlitePool;
use crate::error::{SaasError, SaasResult};
use super::types::*;
// ============ Providers ============
pub async fn list_providers(db: &SqlitePool) -> SaasResult<Vec<ProviderInfo>> {
let rows: Vec<(String, String, String, String, String, bool, Option<i64>, Option<i64>, String, String)> =
sqlx::query_as(
"SELECT id, name, display_name, base_url, api_protocol, enabled, rate_limit_rpm, rate_limit_tpm, created_at, updated_at
FROM providers ORDER BY name"
)
.fetch_all(db)
.await?;
Ok(rows.into_iter().map(|(id, name, display_name, base_url, api_protocol, enabled, rpm, tpm, created_at, updated_at)| {
ProviderInfo { id, name, display_name, base_url, api_protocol, enabled, rate_limit_rpm: rpm, rate_limit_tpm: tpm, created_at, updated_at }
}).collect())
}
pub async fn get_provider(db: &SqlitePool, provider_id: &str) -> SaasResult<ProviderInfo> {
let row: Option<(String, String, String, String, String, bool, Option<i64>, Option<i64>, String, String)> =
sqlx::query_as(
"SELECT id, name, display_name, base_url, api_protocol, enabled, rate_limit_rpm, rate_limit_tpm, created_at, updated_at
FROM providers WHERE id = ?1"
)
.bind(provider_id)
.fetch_optional(db)
.await?;
let (id, name, display_name, base_url, api_protocol, enabled, rpm, tpm, created_at, updated_at) =
row.ok_or_else(|| SaasError::NotFound(format!("Provider {} 不存在", provider_id)))?;
Ok(ProviderInfo { id, name, display_name, base_url, api_protocol, enabled, rate_limit_rpm: rpm, rate_limit_tpm: tpm, created_at, updated_at })
}
pub async fn create_provider(db: &SqlitePool, req: &CreateProviderRequest) -> SaasResult<ProviderInfo> {
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
// 检查名称唯一性
let existing: Option<(String,)> = sqlx::query_as("SELECT id FROM providers WHERE name = ?1")
.bind(&req.name).fetch_optional(db).await?;
if existing.is_some() {
return Err(SaasError::AlreadyExists(format!("Provider '{}' 已存在", req.name)));
}
sqlx::query(
"INSERT INTO providers (id, name, display_name, api_key, base_url, api_protocol, enabled, rate_limit_rpm, rate_limit_tpm, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7, ?8, ?9, ?9)"
)
.bind(&id).bind(&req.name).bind(&req.display_name).bind(&req.api_key)
.bind(&req.base_url).bind(&req.api_protocol).bind(&req.rate_limit_rpm).bind(&req.rate_limit_tpm).bind(&now)
.execute(db).await?;
get_provider(db, &id).await
}
pub async fn update_provider(
db: &SqlitePool, provider_id: &str, req: &UpdateProviderRequest,
) -> SaasResult<ProviderInfo> {
let now = chrono::Utc::now().to_rfc3339();
let mut updates = Vec::new();
let mut params: Vec<Box<dyn std::fmt::Display + Send + Sync>> = Vec::new();
if let Some(ref v) = req.display_name { updates.push("display_name = ?"); params.push(Box::new(v.clone())); }
if let Some(ref v) = req.base_url { updates.push("base_url = ?"); params.push(Box::new(v.clone())); }
if let Some(ref v) = req.api_protocol { updates.push("api_protocol = ?"); params.push(Box::new(v.clone())); }
if let Some(ref v) = req.api_key { updates.push("api_key = ?"); params.push(Box::new(v.clone())); }
if let Some(v) = req.enabled { updates.push("enabled = ?"); params.push(Box::new(v)); }
if let Some(v) = req.rate_limit_rpm { updates.push("rate_limit_rpm = ?"); params.push(Box::new(v)); }
if let Some(v) = req.rate_limit_tpm { updates.push("rate_limit_tpm = ?"); params.push(Box::new(v)); }
if updates.is_empty() {
return get_provider(db, provider_id).await;
}
updates.push("updated_at = ?");
params.push(Box::new(now.clone()));
params.push(Box::new(provider_id.to_string()));
let sql = format!("UPDATE providers SET {} WHERE id = ?", updates.join(", "));
let mut query = sqlx::query(&sql);
for p in &params {
query = query.bind(format!("{}", p));
}
query.execute(db).await?;
get_provider(db, provider_id).await
}
pub async fn delete_provider(db: &SqlitePool, provider_id: &str) -> SaasResult<()> {
let result = sqlx::query("DELETE FROM providers WHERE id = ?1")
.bind(provider_id).execute(db).await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound(format!("Provider {} 不存在", provider_id)));
}
Ok(())
}
// ============ Models ============
pub async fn list_models(db: &SqlitePool, provider_id: Option<&str>) -> SaasResult<Vec<ModelInfo>> {
let sql = if provider_id.is_some() {
"SELECT id, provider_id, model_id, alias, context_window, max_output_tokens, supports_streaming, supports_vision, enabled, pricing_input, pricing_output, created_at, updated_at
FROM models WHERE provider_id = ?1 ORDER BY alias"
} else {
"SELECT id, provider_id, model_id, alias, context_window, max_output_tokens, supports_streaming, supports_vision, enabled, pricing_input, pricing_output, created_at, updated_at
FROM models ORDER BY provider_id, alias"
};
let mut query = sqlx::query_as::<_, (String, String, String, String, i64, i64, bool, bool, bool, f64, f64, String, String)>(sql);
if let Some(pid) = provider_id {
query = query.bind(pid);
}
let rows = query.fetch_all(db).await?;
Ok(rows.into_iter().map(|(id, provider_id, model_id, alias, ctx, max_out, streaming, vision, enabled, pi, po, created_at, updated_at)| {
ModelInfo { id, provider_id, model_id, alias, context_window: ctx, max_output_tokens: max_out, supports_streaming: streaming, supports_vision: vision, enabled, pricing_input: pi, pricing_output: po, created_at, updated_at }
}).collect())
}
pub async fn create_model(db: &SqlitePool, req: &CreateModelRequest) -> SaasResult<ModelInfo> {
// 验证 provider 存在
let provider = get_provider(db, &req.provider_id).await?;
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
// 检查 model 唯一性
let existing: Option<(String,)> = sqlx::query_as(
"SELECT id FROM models WHERE provider_id = ?1 AND model_id = ?2"
)
.bind(&req.provider_id).bind(&req.model_id)
.fetch_optional(db).await?;
if existing.is_some() {
return Err(SaasError::AlreadyExists(format!(
"模型 '{}' 已存在于 provider '{}'", req.model_id, provider.name
)));
}
let ctx = req.context_window.unwrap_or(8192);
let max_out = req.max_output_tokens.unwrap_or(4096);
let streaming = req.supports_streaming.unwrap_or(true);
let vision = req.supports_vision.unwrap_or(false);
let pi = req.pricing_input.unwrap_or(0.0);
let po = req.pricing_output.unwrap_or(0.0);
sqlx::query(
"INSERT INTO models (id, provider_id, model_id, alias, context_window, max_output_tokens, supports_streaming, supports_vision, enabled, pricing_input, pricing_output, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 1, ?9, ?10, ?11, ?11)"
)
.bind(&id).bind(&req.provider_id).bind(&req.model_id).bind(&req.alias)
.bind(ctx).bind(max_out).bind(streaming).bind(vision).bind(pi).bind(po).bind(&now)
.execute(db).await?;
get_model(db, &id).await
}
pub async fn get_model(db: &SqlitePool, model_id: &str) -> SaasResult<ModelInfo> {
let row: Option<(String, String, String, String, i64, i64, bool, bool, bool, f64, f64, String, String)> =
sqlx::query_as(
"SELECT id, provider_id, model_id, alias, context_window, max_output_tokens, supports_streaming, supports_vision, enabled, pricing_input, pricing_output, created_at, updated_at
FROM models WHERE id = ?1"
)
.bind(model_id)
.fetch_optional(db)
.await?;
let (id, provider_id, model_id, alias, ctx, max_out, streaming, vision, enabled, pi, po, created_at, updated_at) =
row.ok_or_else(|| SaasError::NotFound(format!("模型 {} 不存在", model_id)))?;
Ok(ModelInfo { id, provider_id, model_id, alias, context_window: ctx, max_output_tokens: max_out, supports_streaming: streaming, supports_vision: vision, enabled, pricing_input: pi, pricing_output: po, created_at, updated_at })
}
pub async fn update_model(
db: &SqlitePool, model_id: &str, req: &UpdateModelRequest,
) -> SaasResult<ModelInfo> {
let now = chrono::Utc::now().to_rfc3339();
let mut updates = Vec::new();
let mut params: Vec<Box<dyn std::fmt::Display + Send + Sync>> = Vec::new();
if let Some(ref v) = req.alias { updates.push("alias = ?"); params.push(Box::new(v.clone())); }
if let Some(v) = req.context_window { updates.push("context_window = ?"); params.push(Box::new(v)); }
if let Some(v) = req.max_output_tokens { updates.push("max_output_tokens = ?"); params.push(Box::new(v)); }
if let Some(v) = req.supports_streaming { updates.push("supports_streaming = ?"); params.push(Box::new(v)); }
if let Some(v) = req.supports_vision { updates.push("supports_vision = ?"); params.push(Box::new(v)); }
if let Some(v) = req.enabled { updates.push("enabled = ?"); params.push(Box::new(v)); }
if let Some(v) = req.pricing_input { updates.push("pricing_input = ?"); params.push(Box::new(v)); }
if let Some(v) = req.pricing_output { updates.push("pricing_output = ?"); params.push(Box::new(v)); }
if updates.is_empty() {
return get_model(db, model_id).await;
}
updates.push("updated_at = ?");
params.push(Box::new(now.clone()));
params.push(Box::new(model_id.to_string()));
let sql = format!("UPDATE models SET {} WHERE id = ?", updates.join(", "));
let mut query = sqlx::query(&sql);
for p in &params {
query = query.bind(format!("{}", p));
}
query.execute(db).await?;
get_model(db, model_id).await
}
pub async fn delete_model(db: &SqlitePool, model_id: &str) -> SaasResult<()> {
let result = sqlx::query("DELETE FROM models WHERE id = ?1")
.bind(model_id).execute(db).await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound(format!("模型 {} 不存在", model_id)));
}
Ok(())
}
// ============ Account API Keys ============
pub async fn list_account_api_keys(
db: &SqlitePool, account_id: &str, provider_id: Option<&str>,
) -> SaasResult<Vec<AccountApiKeyInfo>> {
let sql = if provider_id.is_some() {
"SELECT id, provider_id, key_label, permissions, enabled, last_used_at, created_at, key_value
FROM account_api_keys WHERE account_id = ?1 AND provider_id = ?2 AND revoked_at IS NULL ORDER BY created_at DESC"
} else {
"SELECT id, provider_id, key_label, permissions, enabled, last_used_at, created_at, key_value
FROM account_api_keys WHERE account_id = ?1 AND revoked_at IS NULL ORDER BY created_at DESC"
};
let mut query = sqlx::query_as::<_, (String, String, Option<String>, String, bool, Option<String>, String, String)>(sql)
.bind(account_id);
if let Some(pid) = provider_id {
query = query.bind(pid);
}
let rows = query.fetch_all(db).await?;
Ok(rows.into_iter().map(|(id, provider_id, key_label, perms, enabled, last_used, created_at, key_value)| {
let permissions: Vec<String> = serde_json::from_str(&perms).unwrap_or_default();
let masked = mask_api_key(&key_value);
AccountApiKeyInfo { id, provider_id, key_label, permissions, enabled, last_used_at: last_used, created_at, masked_key: masked }
}).collect())
}
pub async fn create_account_api_key(
db: &SqlitePool, account_id: &str, req: &CreateAccountApiKeyRequest,
) -> SaasResult<AccountApiKeyInfo> {
// 验证 provider 存在
get_provider(db, &req.provider_id).await?;
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
let permissions = serde_json::to_string(&req.permissions)?;
sqlx::query(
"INSERT INTO account_api_keys (id, account_id, provider_id, key_value, key_label, permissions, enabled, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7, ?7)"
)
.bind(&id).bind(account_id).bind(&req.provider_id).bind(&req.key_value)
.bind(&req.key_label).bind(&permissions).bind(&now)
.execute(db).await?;
let masked = mask_api_key(&req.key_value);
Ok(AccountApiKeyInfo {
id, provider_id: req.provider_id.clone(), key_label: req.key_label.clone(),
permissions: req.permissions.clone(), enabled: true, last_used_at: None,
created_at: now, masked_key: masked,
})
}
pub async fn rotate_account_api_key(
db: &SqlitePool, key_id: &str, account_id: &str, new_key_value: &str,
) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"UPDATE account_api_keys SET key_value = ?1, updated_at = ?2 WHERE id = ?3 AND account_id = ?4 AND revoked_at IS NULL"
)
.bind(new_key_value).bind(&now).bind(key_id).bind(account_id)
.execute(db).await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound("API Key 不存在或已撤销".into()));
}
Ok(())
}
pub async fn revoke_account_api_key(
db: &SqlitePool, key_id: &str, account_id: &str,
) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"UPDATE account_api_keys SET revoked_at = ?1 WHERE id = ?2 AND account_id = ?3 AND revoked_at IS NULL"
)
.bind(&now).bind(key_id).bind(account_id)
.execute(db).await?;
if result.rows_affected() == 0 {
return Err(SaasError::NotFound("API Key 不存在或已撤销".into()));
}
Ok(())
}
// ============ Usage Statistics ============
pub async fn get_usage_stats(
db: &SqlitePool, account_id: &str, query: &UsageQuery,
) -> SaasResult<UsageStats> {
let mut where_clauses = vec!["account_id = ?".to_string()];
let mut params: Vec<String> = vec![account_id.to_string()];
if let Some(ref from) = query.from {
where_clauses.push("created_at >= ?".to_string());
params.push(from.clone());
}
if let Some(ref to) = query.to {
where_clauses.push("created_at <= ?".to_string());
params.push(to.clone());
}
if let Some(ref pid) = query.provider_id {
where_clauses.push("provider_id = ?".to_string());
params.push(pid.clone());
}
if let Some(ref mid) = query.model_id {
where_clauses.push("model_id = ?".to_string());
params.push(mid.clone());
}
let where_sql = where_clauses.join(" AND ");
// 总量统计
let total_sql = format!(
"SELECT COUNT(*), COALESCE(SUM(input_tokens), 0), COALESCE(SUM(output_tokens), 0)
FROM usage_records WHERE {}", where_sql
);
let mut total_query = sqlx::query_as::<_, (i64, i64, i64)>(&total_sql);
for p in &params {
total_query = total_query.bind(p);
}
let (total_requests, total_input, total_output) = total_query.fetch_one(db).await?;
// 按模型统计
let by_model_sql = format!(
"SELECT provider_id, model_id, COUNT(*), COALESCE(SUM(input_tokens), 0), COALESCE(SUM(output_tokens), 0)
FROM usage_records WHERE {} GROUP BY provider_id, model_id ORDER BY COUNT(*) DESC LIMIT 20",
where_sql
);
let mut by_model_query = sqlx::query_as::<_, (String, String, i64, i64, i64)>(&by_model_sql);
for p in &params {
by_model_query = by_model_query.bind(p);
}
let by_model_rows = by_model_query.fetch_all(db).await?;
let by_model: Vec<ModelUsage> = by_model_rows.into_iter()
.map(|(provider_id, model_id, count, input, output)| {
ModelUsage { provider_id, model_id, request_count: count, input_tokens: input, output_tokens: output }
}).collect();
// 按天统计 (最近 30 天)
let from_30d = (chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339();
let daily_sql = format!(
"SELECT DATE(created_at) as day, COUNT(*), COALESCE(SUM(input_tokens), 0), COALESCE(SUM(output_tokens), 0)
FROM usage_records WHERE account_id = ?1 AND created_at >= ?2
GROUP BY DATE(created_at) ORDER BY day DESC LIMIT 30"
);
let daily_rows: Vec<(String, i64, i64, i64)> = sqlx::query_as(&daily_sql)
.bind(account_id).bind(&from_30d)
.fetch_all(db).await?;
let by_day: Vec<DailyUsage> = daily_rows.into_iter()
.map(|(date, count, input, output)| {
DailyUsage { date, request_count: count, input_tokens: input, output_tokens: output }
}).collect();
Ok(UsageStats {
total_requests,
total_input_tokens: total_input,
total_output_tokens: total_output,
by_model,
by_day,
})
}
pub async fn record_usage(
db: &SqlitePool, account_id: &str, provider_id: &str, model_id: &str,
input_tokens: i64, output_tokens: i64, latency_ms: Option<i64>,
status: &str, error_message: Option<&str>,
) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO usage_records (account_id, provider_id, model_id, input_tokens, output_tokens, latency_ms, status, error_message, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"
)
.bind(account_id).bind(provider_id).bind(model_id)
.bind(input_tokens).bind(output_tokens).bind(latency_ms)
.bind(status).bind(error_message).bind(&now)
.execute(db).await?;
Ok(())
}
// ============ Helpers ============
fn mask_api_key(key: &str) -> String {
if key.len() <= 8 {
return "*".repeat(key.len());
}
format!("{}...{}", &key[..4], &key[key.len()-4..])
}

View File

@@ -0,0 +1,172 @@
//! 模型配置类型定义
use serde::{Deserialize, Serialize};
// --- Provider ---
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderInfo {
pub id: String,
pub name: String,
pub display_name: String,
pub base_url: String,
pub api_protocol: String,
pub enabled: bool,
pub rate_limit_rpm: Option<i64>,
pub rate_limit_tpm: Option<i64>,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Deserialize)]
pub struct CreateProviderRequest {
pub name: String,
pub display_name: String,
pub base_url: String,
#[serde(default = "default_protocol")]
pub api_protocol: String,
pub api_key: Option<String>,
pub rate_limit_rpm: Option<i64>,
pub rate_limit_tpm: Option<i64>,
}
fn default_protocol() -> String { "openai".into() }
#[derive(Debug, Deserialize)]
pub struct UpdateProviderRequest {
pub display_name: Option<String>,
pub base_url: Option<String>,
pub api_protocol: Option<String>,
pub api_key: Option<String>,
pub enabled: Option<bool>,
pub rate_limit_rpm: Option<i64>,
pub rate_limit_tpm: Option<i64>,
}
// --- Model ---
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModelInfo {
pub id: String,
pub provider_id: String,
pub model_id: String,
pub alias: String,
pub context_window: i64,
pub max_output_tokens: i64,
pub supports_streaming: bool,
pub supports_vision: bool,
pub enabled: bool,
pub pricing_input: f64,
pub pricing_output: f64,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Deserialize)]
pub struct CreateModelRequest {
pub provider_id: String,
pub model_id: String,
pub alias: String,
pub context_window: Option<i64>,
pub max_output_tokens: Option<i64>,
pub supports_streaming: Option<bool>,
pub supports_vision: Option<bool>,
pub pricing_input: Option<f64>,
pub pricing_output: Option<f64>,
}
#[derive(Debug, Deserialize)]
pub struct UpdateModelRequest {
pub alias: Option<String>,
pub context_window: Option<i64>,
pub max_output_tokens: Option<i64>,
pub supports_streaming: Option<bool>,
pub supports_vision: Option<bool>,
pub enabled: Option<bool>,
pub pricing_input: Option<f64>,
pub pricing_output: Option<f64>,
}
// --- Account API Key ---
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccountApiKeyInfo {
pub id: String,
pub provider_id: String,
pub key_label: Option<String>,
pub permissions: Vec<String>,
pub enabled: bool,
pub last_used_at: Option<String>,
pub created_at: String,
pub masked_key: String,
}
#[derive(Debug, Deserialize)]
pub struct CreateAccountApiKeyRequest {
pub provider_id: String,
pub key_value: String,
pub key_label: Option<String>,
#[serde(default)]
pub permissions: Vec<String>,
}
#[derive(Debug, Deserialize)]
pub struct RotateApiKeyRequest {
pub new_key_value: String,
}
// --- Usage ---
#[derive(Debug, Serialize)]
pub struct UsageStats {
pub total_requests: i64,
pub total_input_tokens: i64,
pub total_output_tokens: i64,
pub by_model: Vec<ModelUsage>,
pub by_day: Vec<DailyUsage>,
}
#[derive(Debug, Serialize)]
pub struct ModelUsage {
pub provider_id: String,
pub model_id: String,
pub request_count: i64,
pub input_tokens: i64,
pub output_tokens: i64,
}
#[derive(Debug, Serialize)]
pub struct DailyUsage {
pub date: String,
pub request_count: i64,
pub input_tokens: i64,
pub output_tokens: i64,
}
#[derive(Debug, Deserialize)]
pub struct UsageQuery {
pub from: Option<String>,
pub to: Option<String>,
pub provider_id: Option<String>,
pub model_id: Option<String>,
}
// --- Seed Data ---
#[derive(Debug, Deserialize)]
pub struct SeedProvider {
pub name: String,
pub display_name: String,
pub base_url: String,
pub models: Vec<SeedModel>,
}
#[derive(Debug, Deserialize)]
pub struct SeedModel {
pub id: String,
pub alias: String,
pub context_window: Option<i64>,
pub max_output_tokens: Option<i64>,
pub supports_streaming: Option<bool>,
pub supports_vision: Option<bool>,
}

View File

@@ -0,0 +1,165 @@
//! 中转服务 HTTP 处理器
use axum::{
extract::{Extension, Path, Query, State},
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
Json,
};
use crate::state::AppState;
use crate::error::{SaasError, SaasResult};
use crate::auth::types::AuthContext;
use crate::auth::handlers::log_operation;
use crate::model_config::service as model_service;
use super::{types::*, service};
/// POST /api/v1/relay/chat/completions
/// OpenAI 兼容的聊天补全端点
pub async fn chat_completions(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
_headers: HeaderMap,
Json(req): Json<serde_json::Value>,
) -> SaasResult<Response> {
// 检查 relay:use 权限
if !ctx.permissions.contains(&"relay:use".to_string()) {
return Err(SaasError::Forbidden("需要 relay:use 权限".into()));
}
let model_name = req.get("model")
.and_then(|v| v.as_str())
.ok_or_else(|| SaasError::InvalidInput("缺少 model 字段".into()))?;
let stream = req.get("stream")
.and_then(|v| v.as_bool())
.unwrap_or(false);
// 查找 model 对应的 provider
let models = model_service::list_models(&state.db, None).await?;
let target_model = models.iter().find(|m| m.model_id == model_name && m.enabled)
.ok_or_else(|| SaasError::NotFound(format!("模型 {} 不存在或未启用", model_name)))?;
// 获取 provider 信息
let provider = model_service::get_provider(&state.db, &target_model.provider_id).await?;
if !provider.enabled {
return Err(SaasError::Forbidden(format!("Provider {} 已禁用", provider.name)));
}
// 获取 provider 的 API key (从数据库直接查询)
let provider_api_key: Option<String> = sqlx::query_scalar(
"SELECT api_key FROM providers WHERE id = ?1"
)
.bind(&target_model.provider_id)
.fetch_optional(&state.db)
.await?
.flatten();
let request_body = serde_json::to_string(&req)?;
// 创建中转任务
let task = service::create_relay_task(
&state.db, &ctx.account_id, &target_model.provider_id,
&target_model.model_id, &request_body, 0,
).await?;
log_operation(&state.db, &ctx.account_id, "relay.request", "relay_task", &task.id,
Some(serde_json::json!({"model": model_name, "stream": stream})), None).await?;
// 执行中转
let response = service::execute_relay(
&state.db, &task.id, &provider.base_url,
provider_api_key.as_deref(), &request_body, stream,
).await;
match response {
Ok(service::RelayResponse::Json(body)) => {
// 记录用量
let parsed: serde_json::Value = serde_json::from_str(&body).unwrap_or_default();
let input_tokens = parsed.get("usage")
.and_then(|u| u.get("prompt_tokens"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
let output_tokens = parsed.get("usage")
.and_then(|u| u.get("completion_tokens"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
model_service::record_usage(
&state.db, &ctx.account_id, &target_model.provider_id,
&target_model.model_id, input_tokens, output_tokens,
None, "success", None,
).await?;
Ok((StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "application/json")], body).into_response())
}
Ok(service::RelayResponse::Sse(body)) => {
model_service::record_usage(
&state.db, &ctx.account_id, &target_model.provider_id,
&target_model.model_id, 0, 0,
None, "success", None,
).await?;
Ok((StatusCode::OK, [(axum::http::header::CONTENT_TYPE, "text/event-stream")], body).into_response())
}
Err(e) => {
model_service::record_usage(
&state.db, &ctx.account_id, &target_model.provider_id,
&target_model.model_id, 0, 0,
None, "failed", Some(&e.to_string()),
).await?;
Err(e)
}
}
}
/// GET /api/v1/relay/tasks
pub async fn list_tasks(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Query(query): Query<RelayTaskQuery>,
) -> SaasResult<Json<Vec<RelayTaskInfo>>> {
service::list_relay_tasks(&state.db, &ctx.account_id, &query).await.map(Json)
}
/// GET /api/v1/relay/tasks/:id
pub async fn get_task(
State(state): State<AppState>,
Path(id): Path<String>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<RelayTaskInfo>> {
let task = service::get_relay_task(&state.db, &id).await?;
// 只允许查看自己的任务 (admin 可查看全部)
if task.account_id != ctx.account_id && !ctx.permissions.contains(&"relay:admin".to_string()) {
return Err(SaasError::Forbidden("无权查看此任务".into()));
}
Ok(Json(task))
}
/// GET /api/v1/relay/models
/// 列出可用的中转模型 (enabled providers + enabled models)
pub async fn list_available_models(
State(state): State<AppState>,
_ctx: Extension<AuthContext>,
) -> SaasResult<Json<Vec<serde_json::Value>>> {
let providers = model_service::list_providers(&state.db).await?;
let enabled_provider_ids: std::collections::HashSet<String> =
providers.iter().filter(|p| p.enabled).map(|p| p.id.clone()).collect();
let models = model_service::list_models(&state.db, None).await?;
let available: Vec<serde_json::Value> = models.into_iter()
.filter(|m| m.enabled && enabled_provider_ids.contains(&m.provider_id))
.map(|m| {
serde_json::json!({
"id": m.model_id,
"provider_id": m.provider_id,
"alias": m.alias,
"context_window": m.context_window,
"max_output_tokens": m.max_output_tokens,
"supports_streaming": m.supports_streaming,
"supports_vision": m.supports_vision,
})
})
.collect();
Ok(Json(available))
}

View File

@@ -0,0 +1,17 @@
//! 中转服务模块
pub mod types;
pub mod service;
pub mod handlers;
use axum::routing::{get, post};
use crate::state::AppState;
/// 中转服务路由 (需要认证)
pub fn routes() -> axum::Router<AppState> {
axum::Router::new()
.route("/api/v1/relay/chat/completions", post(handlers::chat_completions))
.route("/api/v1/relay/tasks", get(handlers::list_tasks))
.route("/api/v1/relay/tasks/{id}", get(handlers::get_task))
.route("/api/v1/relay/models", get(handlers::list_available_models))
}

View File

@@ -0,0 +1,197 @@
//! 中转服务核心逻辑
use sqlx::SqlitePool;
use crate::error::{SaasError, SaasResult};
use super::types::*;
// ============ Relay Task Management ============
pub async fn create_relay_task(
db: &SqlitePool,
account_id: &str,
provider_id: &str,
model_id: &str,
request_body: &str,
priority: i64,
) -> SaasResult<RelayTaskInfo> {
let id = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
let request_hash = hash_request(request_body);
sqlx::query(
"INSERT INTO relay_tasks (id, account_id, provider_id, model_id, request_hash, request_body, status, priority, attempt_count, max_attempts, queued_at, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'queued', ?7, 0, 3, ?8, ?8)"
)
.bind(&id).bind(account_id).bind(provider_id).bind(model_id)
.bind(&request_hash).bind(request_body).bind(priority).bind(&now)
.execute(db).await?;
get_relay_task(db, &id).await
}
pub async fn get_relay_task(db: &SqlitePool, task_id: &str) -> SaasResult<RelayTaskInfo> {
let row: Option<(String, String, String, String, String, i64, i64, i64, i64, i64, Option<String>, String, Option<String>, Option<String>, String)> =
sqlx::query_as(
"SELECT id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at
FROM relay_tasks WHERE id = ?1"
)
.bind(task_id)
.fetch_optional(db)
.await?;
let (id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at) =
row.ok_or_else(|| SaasError::NotFound(format!("中转任务 {} 不存在", task_id)))?;
Ok(RelayTaskInfo {
id, account_id, provider_id, model_id, status, priority,
attempt_count, max_attempts, input_tokens, output_tokens,
error_message, queued_at, started_at, completed_at, created_at,
})
}
pub async fn list_relay_tasks(
db: &SqlitePool, account_id: &str, query: &RelayTaskQuery,
) -> SaasResult<Vec<RelayTaskInfo>> {
let page = query.page.unwrap_or(1).max(1);
let page_size = query.page_size.unwrap_or(20).min(100);
let offset = (page - 1) * page_size;
let sql = if query.status.is_some() {
"SELECT id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at
FROM relay_tasks WHERE account_id = ?1 AND status = ?2 ORDER BY created_at DESC LIMIT ?3 OFFSET ?4"
} else {
"SELECT id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at
FROM relay_tasks WHERE account_id = ?1 ORDER BY created_at DESC LIMIT ?2 OFFSET ?3"
};
let mut query_builder = sqlx::query_as::<_, (String, String, String, String, String, i64, i64, i64, i64, i64, Option<String>, String, Option<String>, Option<String>, String)>(sql)
.bind(account_id);
if let Some(ref status) = query.status {
query_builder = query_builder.bind(status);
}
query_builder = query_builder.bind(page_size).bind(offset);
let rows = query_builder.fetch_all(db).await?;
Ok(rows.into_iter().map(|(id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at)| {
RelayTaskInfo { id, account_id, provider_id, model_id, status, priority, attempt_count, max_attempts, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at }
}).collect())
}
pub async fn update_task_status(
db: &SqlitePool, task_id: &str, status: &str,
input_tokens: Option<i64>, output_tokens: Option<i64>,
error_message: Option<&str>,
) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
let update_sql = match status {
"processing" => "started_at = ?1, status = 'processing', attempt_count = attempt_count + 1",
"completed" => "completed_at = ?1, status = 'completed', input_tokens = COALESCE(?2, input_tokens), output_tokens = COALESCE(?3, output_tokens)",
"failed" => "completed_at = ?1, status = 'failed', error_message = ?2",
_ => return Err(SaasError::InvalidInput(format!("无效任务状态: {}", status))),
};
let sql = format!("UPDATE relay_tasks SET {} WHERE id = ?4", update_sql);
let mut query = sqlx::query(&sql).bind(&now);
if status == "completed" {
query = query.bind(input_tokens).bind(output_tokens);
}
if status == "failed" {
query = query.bind(error_message);
}
query = query.bind(task_id);
query.execute(db).await?;
Ok(())
}
// ============ Relay Execution ============
pub async fn execute_relay(
db: &SqlitePool,
task_id: &str,
provider_base_url: &str,
provider_api_key: Option<&str>,
request_body: &str,
stream: bool,
) -> SaasResult<RelayResponse> {
update_task_status(db, task_id, "processing", None, None, None).await?;
let url = format!("{}/chat/completions", provider_base_url.trim_end_matches('/'));
let _start = std::time::Instant::now();
let client = reqwest::Client::new();
let mut req_builder = client.post(&url)
.header("Content-Type", "application/json")
.body(request_body.to_string());
if let Some(key) = provider_api_key {
req_builder = req_builder.header("Authorization", format!("Bearer {}", key));
}
let result = req_builder.send().await;
match result {
Ok(resp) if resp.status().is_success() => {
if stream {
let body = resp.text().await.unwrap_or_default();
update_task_status(db, task_id, "completed", None, None, None).await?;
Ok(RelayResponse::Sse(body))
} else {
let body = resp.text().await.unwrap_or_default();
let (input_tokens, output_tokens) = extract_token_usage(&body);
update_task_status(db, task_id, "completed",
Some(input_tokens), Some(output_tokens), None).await?;
Ok(RelayResponse::Json(body))
}
}
Ok(resp) => {
let status = resp.status().as_u16();
let body = resp.text().await.unwrap_or_default();
let err_msg = format!("上游返回 HTTP {}: {}", status, &body[..body.len().min(500)]);
update_task_status(db, task_id, "failed", None, None, Some(&err_msg)).await?;
Err(SaasError::Relay(err_msg))
}
Err(e) => {
let err_msg = format!("请求上游失败: {}", e);
update_task_status(db, task_id, "failed", None, None, Some(&err_msg)).await?;
Err(SaasError::Relay(err_msg))
}
}
}
/// 中转响应类型
#[derive(Debug)]
pub enum RelayResponse {
Json(String),
Sse(String),
}
// ============ Helpers ============
fn hash_request(body: &str) -> String {
use sha2::{Sha256, Digest};
hex::encode(Sha256::digest(body.as_bytes()))
}
fn extract_token_usage(body: &str) -> (i64, i64) {
let parsed: serde_json::Value = match serde_json::from_str(body) {
Ok(v) => v,
Err(_) => return (0, 0),
};
let usage = parsed.get("usage");
let input = usage
.and_then(|u| u.get("prompt_tokens"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
let output = usage
.and_then(|u| u.get("completion_tokens"))
.and_then(|v| v.as_i64())
.unwrap_or(0);
(input, output)
}

View File

@@ -0,0 +1,59 @@
//! 中转服务类型定义
use serde::{Deserialize, Serialize};
/// 中转请求 (OpenAI 兼容格式)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayChatRequest {
pub model: String,
pub messages: Vec<ChatMessage>,
#[serde(default)]
pub temperature: Option<f64>,
#[serde(default)]
pub max_tokens: Option<u32>,
#[serde(default)]
pub stream: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
pub role: String,
pub content: serde_json::Value,
}
/// 中转任务信息
#[derive(Debug, Clone, Serialize)]
pub struct RelayTaskInfo {
pub id: String,
pub account_id: String,
pub provider_id: String,
pub model_id: String,
pub status: String,
pub priority: i64,
pub attempt_count: i64,
pub max_attempts: i64,
pub input_tokens: i64,
pub output_tokens: i64,
pub error_message: Option<String>,
pub queued_at: String,
pub started_at: Option<String>,
pub completed_at: Option<String>,
pub created_at: String,
}
/// 中转任务查询
#[derive(Debug, Deserialize)]
pub struct RelayTaskQuery {
pub status: Option<String>,
pub page: Option<i64>,
pub page_size: Option<i64>,
}
/// Provider 速率限制状态
#[derive(Debug, Clone)]
pub struct RateLimitState {
pub rpm: i64,
pub tpm: i64,
pub concurrent: usize,
pub max_concurrent: usize,
}

View File

@@ -0,0 +1,28 @@
//! 应用状态
use sqlx::SqlitePool;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::config::SaaSConfig;
/// 全局应用状态,通过 Axum State 共享
#[derive(Clone)]
pub struct AppState {
/// 数据库连接池
pub db: SqlitePool,
/// 服务器配置 (可热更新)
pub config: Arc<RwLock<SaaSConfig>>,
/// JWT 密钥
pub jwt_secret: secrecy::SecretString,
}
impl AppState {
pub fn new(db: SqlitePool, config: SaaSConfig) -> Self {
let jwt_secret = config.jwt_secret();
Self {
db,
config: Arc::new(RwLock::new(config)),
jwt_secret,
}
}
}

View File

@@ -0,0 +1,441 @@
//! 集成测试 (Phase 1 + Phase 2)
use axum::{
body::Body,
http::{Request, StatusCode},
};
use serde_json::json;
use tower::ServiceExt;
const MAX_BODY_SIZE: usize = 1024 * 1024; // 1MB
async fn build_test_app() -> axum::Router {
use zclaw_saas::{config::SaaSConfig, db::init_memory_db, state::AppState};
let db = init_memory_db().await.unwrap();
let mut config = SaaSConfig::default();
config.auth.jwt_expiration_hours = 24;
let state = AppState::new(db, config);
let public_routes = zclaw_saas::auth::routes();
let protected_routes = zclaw_saas::auth::protected_routes()
.merge(zclaw_saas::account::routes())
.merge(zclaw_saas::model_config::routes())
.merge(zclaw_saas::relay::routes())
.merge(zclaw_saas::migration::routes())
.layer(axum::middleware::from_fn_with_state(
state.clone(),
zclaw_saas::auth::auth_middleware,
));
axum::Router::new()
.merge(public_routes)
.merge(protected_routes)
.with_state(state)
}
/// 注册并登录,返回 JWT token
async fn register_and_login(app: &axum::Router, username: &str, email: &str) -> String {
let register_req = Request::builder()
.method("POST")
.uri("/api/v1/auth/register")
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_string(&json!({
"username": username,
"email": email,
"password": "password123"
})).unwrap()))
.unwrap();
app.clone().oneshot(register_req).await.unwrap();
let login_req = Request::builder()
.method("POST")
.uri("/api/v1/auth/login")
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_string(&json!({
"username": username,
"password": "password123"
})).unwrap()))
.unwrap();
let resp = app.clone().oneshot(login_req).await.unwrap();
let body_bytes = axum::body::to_bytes(resp.into_body(), MAX_BODY_SIZE).await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
body["token"].as_str().unwrap().to_string()
}
fn auth_header(token: &str) -> String {
format!("Bearer {}", token)
}
#[tokio::test]
async fn test_register_and_login() {
let app = build_test_app().await;
let token = register_and_login(&app, "testuser", "test@example.com").await;
assert!(!token.is_empty());
}
#[tokio::test]
async fn test_register_duplicate_fails() {
let app = build_test_app().await;
let body = json!({
"username": "dupuser",
"email": "dup@example.com",
"password": "password123"
});
let req = Request::builder()
.method("POST")
.uri("/api/v1/auth/register")
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_string(&body).unwrap()))
.unwrap();
let resp = app.clone().oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let req = Request::builder()
.method("POST")
.uri("/api/v1/auth/register")
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_string(&body).unwrap()))
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::CONFLICT);
}
#[tokio::test]
async fn test_unauthorized_access() {
let app = build_test_app().await;
let req = Request::builder()
.method("GET")
.uri("/api/v1/accounts")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn test_login_wrong_password() {
let app = build_test_app().await;
register_and_login(&app, "wrongpwd", "wrongpwd@example.com").await;
let req = Request::builder()
.method("POST")
.uri("/api/v1/auth/login")
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_string(&json!({
"username": "wrongpwd",
"password": "wrong_password"
})).unwrap()))
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn test_full_authenticated_flow() {
let app = build_test_app().await;
let token = register_and_login(&app, "fulltest", "full@example.com").await;
// 创建 API Token
let create_token_req = Request::builder()
.method("POST")
.uri("/api/v1/tokens")
.header("Content-Type", "application/json")
.header("Authorization", auth_header(&token))
.body(Body::from(serde_json::to_string(&json!({
"name": "test-token",
"permissions": ["model:read", "relay:use"]
})).unwrap()))
.unwrap();
let resp = app.clone().oneshot(create_token_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body_bytes = axum::body::to_bytes(resp.into_body(), MAX_BODY_SIZE).await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
assert!(!body["token"].is_null());
// 列出 Tokens
let list_req = Request::builder()
.method("GET")
.uri("/api/v1/tokens")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(list_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
// 查看操作日志
let logs_req = Request::builder()
.method("GET")
.uri("/api/v1/logs/operations")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(logs_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
// ============ Phase 2: 模型配置测试 ============
#[tokio::test]
async fn test_providers_crud() {
let app = build_test_app().await;
// 注册 super_admin 角色用户 (通过直接插入角色权限)
let token = register_and_login(&app, "adminprov", "adminprov@example.com").await;
// 创建 provider (普通用户无权限 → 403)
let create_req = Request::builder()
.method("POST")
.uri("/api/v1/providers")
.header("Content-Type", "application/json")
.header("Authorization", auth_header(&token))
.body(Body::from(serde_json::to_string(&json!({
"name": "test-provider",
"display_name": "Test Provider",
"base_url": "https://api.example.com/v1"
})).unwrap()))
.unwrap();
let resp = app.clone().oneshot(create_req).await.unwrap();
// user 角色默认无 provider:manage 权限 → 403
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
// 列出 providers (只读权限 → 200)
let list_req = Request::builder()
.method("GET")
.uri("/api/v1/providers")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(list_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_models_list_and_usage() {
let app = build_test_app().await;
let token = register_and_login(&app, "modeluser", "modeluser@example.com").await;
// 列出模型 (空列表)
let list_req = Request::builder()
.method("GET")
.uri("/api/v1/models")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(list_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body_bytes = axum::body::to_bytes(resp.into_body(), MAX_BODY_SIZE).await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
assert!(body.is_array());
assert_eq!(body.as_array().unwrap().len(), 0);
// 查看用量统计
let usage_req = Request::builder()
.method("GET")
.uri("/api/v1/usage")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(usage_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body_bytes = axum::body::to_bytes(resp.into_body(), MAX_BODY_SIZE).await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
assert_eq!(body["total_requests"], 0);
}
#[tokio::test]
async fn test_api_keys_lifecycle() {
let app = build_test_app().await;
let token = register_and_login(&app, "keyuser", "keyuser@example.com").await;
// 列出 keys (空)
let list_req = Request::builder()
.method("GET")
.uri("/api/v1/keys")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(list_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
// 创建 key (需要已有 provider → 404 或由 service 层验证)
let create_req = Request::builder()
.method("POST")
.uri("/api/v1/keys")
.header("Content-Type", "application/json")
.header("Authorization", auth_header(&token))
.body(Body::from(serde_json::to_string(&json!({
"provider_id": "nonexistent",
"key_value": "sk-test-12345",
"key_label": "Test Key"
})).unwrap()))
.unwrap();
let resp = app.clone().oneshot(create_req).await.unwrap();
// provider 不存在 → 404
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
// ============ Phase 3: 中转服务测试 ============
#[tokio::test]
async fn test_relay_models_list() {
let app = build_test_app().await;
let token = register_and_login(&app, "relayuser", "relayuser@example.com").await;
// 列出可用中转模型 (空列表,因为没有 provider/model 种子数据)
let req = Request::builder()
.method("GET")
.uri("/api/v1/relay/models")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body_bytes = axum::body::to_bytes(resp.into_body(), MAX_BODY_SIZE).await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
assert!(body.is_array());
}
#[tokio::test]
async fn test_relay_chat_no_model() {
let app = build_test_app().await;
let token = register_and_login(&app, "relayfail", "relayfail@example.com").await;
// 尝试中转到不存在的模型
let req = Request::builder()
.method("POST")
.uri("/api/v1/relay/chat/completions")
.header("Content-Type", "application/json")
.header("Authorization", auth_header(&token))
.body(Body::from(serde_json::to_string(&json!({
"model": "nonexistent-model",
"messages": [{"role": "user", "content": "hello"}]
})).unwrap()))
.unwrap();
let resp = app.clone().oneshot(req).await.unwrap();
// 模型不存在 → 404
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_relay_tasks_list() {
let app = build_test_app().await;
let token = register_and_login(&app, "relaytasks", "relaytasks@example.com").await;
let req = Request::builder()
.method("GET")
.uri("/api/v1/relay/tasks")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
// ============ Phase 4: 配置迁移测试 ============
#[tokio::test]
async fn test_config_analysis_empty() {
let app = build_test_app().await;
let token = register_and_login(&app, "cfguser", "cfguser@example.com").await;
// 初始分析 (无种子数据 → 空列表)
let req = Request::builder()
.method("GET")
.uri("/api/v1/config/analysis")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body_bytes = axum::body::to_bytes(resp.into_body(), MAX_BODY_SIZE).await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
assert_eq!(body["total_items"], 0);
}
#[tokio::test]
async fn test_config_seed_and_list() {
let app = build_test_app().await;
let token = register_and_login(&app, "cfgseed", "cfgseed@example.com").await;
// 种子配置 (普通用户无权限 → 403)
let seed_req = Request::builder()
.method("POST")
.uri("/api/v1/config/seed")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(seed_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
// 列出配置项 (空列表)
let list_req = Request::builder()
.method("GET")
.uri("/api/v1/config/items")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.clone().oneshot(list_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body_bytes = axum::body::to_bytes(resp.into_body(), MAX_BODY_SIZE).await.unwrap();
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
assert!(body.is_array());
assert_eq!(body.as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn test_config_sync() {
let app = build_test_app().await;
let token = register_and_login(&app, "cfgsync", "cfgsync@example.com").await;
let sync_req = Request::builder()
.method("POST")
.uri("/api/v1/config/sync")
.header("Content-Type", "application/json")
.header("Authorization", auth_header(&token))
.body(Body::from(serde_json::to_string(&json!({
"client_fingerprint": "test-desktop-v1",
"config_keys": ["server.host", "agent.defaults.default_model"],
"client_values": {
"server.host": "0.0.0.0",
"agent.defaults.default_model": "deepseek/deepseek-chat"
}
})).unwrap()))
.unwrap();
let resp = app.clone().oneshot(sync_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
// 查看同步日志
let logs_req = Request::builder()
.method("GET")
.uri("/api/v1/config/sync-logs")
.header("Authorization", auth_header(&token))
.body(Body::empty())
.unwrap();
let resp = app.oneshot(logs_req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}

17
saas-config.toml Normal file
View File

@@ -0,0 +1,17 @@
[server]
host = "0.0.0.0"
port = 8080
[database]
url = "sqlite:./saas-data.db"
[auth]
jwt_expiration_hours = 24
totp_issuer = "ZCLAW SaaS"
[relay]
max_queue_size = 1000
max_concurrent_per_provider = 5
batch_window_ms = 50
retry_delay_ms = 1000
max_attempts = 3