From ba2c6a6105c313e6962ee39631329333e66640e8 Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 30 Mar 2026 14:21:39 +0800 Subject: [PATCH] =?UTF-8?q?fix(saas):=20P1=20=E5=AE=A1=E8=AE=A1=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=20=E2=80=94=20=E8=BF=9E=E6=8E=A5=E6=B1=A0=E6=96=AD?= =?UTF-8?q?=E8=B7=AF=E5=99=A8=20+=20Worker=E9=87=8D=E8=AF=95=20+=20XSS?= =?UTF-8?q?=E9=98=B2=E6=8A=A4=20+=20=E7=8A=B6=E6=80=81=E6=9C=BASQL?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1 修复内容: - F7: health handler 连接池容量检查 (80%阈值返回503 degraded) - F9: SSE spawned task 并发限制 (Semaphore 16 permits) - F10: Key Pool 单次 JOIN 查询优化 (消除 N+1) - F12: CORS panic → 配置错误 - F14: 连接池使用率计算修正 (ratio = used*100/total) - F15: SQL 迁移解析器替换为状态机 (支持 $$, DO $body$, 存储过程) - Worker 重试机制: 失败任务通过 mpsc channel 重新入队 - DOMPurify XSS 防护 (PipelineResultPreview) - Admin V2: ErrorBoundary + SWR全局配置 + 请求优化 --- Cargo.lock | 19 ++- Cargo.toml | 3 + admin-v2/src/main.tsx | 17 +- admin-v2/src/pages/Accounts.tsx | 2 +- admin-v2/src/pages/AgentTemplates.tsx | 2 +- admin-v2/src/pages/ApiKeys.tsx | 2 +- admin-v2/src/pages/Config.tsx | 2 +- admin-v2/src/pages/Dashboard.tsx | 4 +- admin-v2/src/pages/Logs.tsx | 2 +- admin-v2/src/pages/Models.tsx | 4 +- admin-v2/src/pages/Prompts.tsx | 6 +- admin-v2/src/pages/Providers.tsx | 4 +- admin-v2/src/pages/Relay.tsx | 2 +- admin-v2/src/pages/Usage.tsx | 4 +- admin-v2/src/services/accounts.ts | 18 +-- admin-v2/src/services/agent-templates.ts | 22 +-- admin-v2/src/services/api-keys.ts | 14 +- admin-v2/src/services/auth.ts | 10 +- admin-v2/src/services/config.ts | 10 +- admin-v2/src/services/logs.ts | 6 +- admin-v2/src/services/models.ts | 18 +-- admin-v2/src/services/prompts.ts | 34 ++-- admin-v2/src/services/providers.ts | 34 ++-- admin-v2/src/services/relay.ts | 10 +- admin-v2/src/services/request.ts | 9 ++ admin-v2/src/services/stats.ts | 6 +- admin-v2/src/services/telemetry.ts | 10 +- admin-v2/src/services/usage.ts | 10 +- admin-v2/vite.config.ts | 10 ++ crates/zclaw-saas/Cargo.toml | 1 + crates/zclaw-saas/src/auth/totp.rs | 47 +++++- crates/zclaw-saas/src/db.rs | 146 +++++++++++++++++- crates/zclaw-saas/src/main.rs | 38 ++++- crates/zclaw-saas/src/relay/key_pool.rs | 146 +++++++----------- crates/zclaw-saas/src/relay/service.rs | 22 ++- crates/zclaw-saas/src/state.rs | 3 + crates/zclaw-saas/src/workers/mod.rs | 27 +++- .../src/components/PipelineResultPreview.tsx | 2 +- 38 files changed, 490 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7c640b..9cea298 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2506,7 +2506,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.3", "tokio", "tower-service", "tracing", @@ -4189,7 +4189,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -4226,7 +4226,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.3", "tracing", "windows-sys 0.60.2", ] @@ -5133,6 +5133,16 @@ dependencies = [ "serde", ] +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.3" @@ -6048,7 +6058,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.3", "tokio-macros", "windows-sys 0.61.2", ] @@ -8328,6 +8338,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "socket2 0.5.10", "sqlx", "tempfile", "thiserror 2.0.18", diff --git a/Cargo.toml b/Cargo.toml index 0f153a1..6385ac8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,9 @@ argon2 = "0.5" totp-rs = "5" hex = "0.4" +# TCP socket configuration +socket2 = { version = "0.5", features = ["all"] } + # Internal crates zclaw-types = { path = "crates/zclaw-types" } zclaw-memory = { path = "crates/zclaw-memory" } diff --git a/admin-v2/src/main.tsx b/admin-v2/src/main.tsx index c50dabc..20210ff 100644 --- a/admin-v2/src/main.tsx +++ b/admin-v2/src/main.tsx @@ -4,6 +4,7 @@ import { RouterProvider } from 'react-router-dom' import { ConfigProvider, App as AntApp } from 'antd' import zhCN from 'antd/locale/zh_CN' import { router } from './router' +import { ErrorBoundary } from './components/ErrorBoundary' const queryClient = new QueryClient({ defaultOptions: { @@ -16,11 +17,13 @@ const queryClient = new QueryClient({ }) createRoot(document.getElementById('root')!).render( - - - - - - - , + + + + + + + + + , ) diff --git a/admin-v2/src/pages/Accounts.tsx b/admin-v2/src/pages/Accounts.tsx index 01569d0..b6e6ece 100644 --- a/admin-v2/src/pages/Accounts.tsx +++ b/admin-v2/src/pages/Accounts.tsx @@ -42,7 +42,7 @@ export default function Accounts() { const { data, isLoading } = useQuery({ queryKey: ['accounts'], - queryFn: () => accountService.list(), + queryFn: ({ signal }) => accountService.list(signal), }) const updateMutation = useMutation({ diff --git a/admin-v2/src/pages/AgentTemplates.tsx b/admin-v2/src/pages/AgentTemplates.tsx index 492ff7d..ff5e512 100644 --- a/admin-v2/src/pages/AgentTemplates.tsx +++ b/admin-v2/src/pages/AgentTemplates.tsx @@ -26,7 +26,7 @@ export default function AgentTemplates() { const { data, isLoading } = useQuery({ queryKey: ['agent-templates'], - queryFn: () => agentTemplateService.list(), + queryFn: ({ signal }) => agentTemplateService.list(signal), }) const createMutation = useMutation({ diff --git a/admin-v2/src/pages/ApiKeys.tsx b/admin-v2/src/pages/ApiKeys.tsx index 9a533bd..58f20c9 100644 --- a/admin-v2/src/pages/ApiKeys.tsx +++ b/admin-v2/src/pages/ApiKeys.tsx @@ -21,7 +21,7 @@ export default function ApiKeys() { const { data, isLoading } = useQuery({ queryKey: ['api-keys'], - queryFn: () => apiKeyService.list(), + queryFn: ({ signal }) => apiKeyService.list(signal), }) const createMutation = useMutation({ diff --git a/admin-v2/src/pages/Config.tsx b/admin-v2/src/pages/Config.tsx index 1e9ba30..a033bb8 100644 --- a/admin-v2/src/pages/Config.tsx +++ b/admin-v2/src/pages/Config.tsx @@ -20,7 +20,7 @@ export default function Config() { const { data, isLoading } = useQuery({ queryKey: ['config', category], - queryFn: () => configService.list({ category }), + queryFn: ({ signal }) => configService.list({ category }, signal), }) const updateMutation = useMutation({ diff --git a/admin-v2/src/pages/Dashboard.tsx b/admin-v2/src/pages/Dashboard.tsx index 6ccc8ae..4a1e812 100644 --- a/admin-v2/src/pages/Dashboard.tsx +++ b/admin-v2/src/pages/Dashboard.tsx @@ -42,12 +42,12 @@ const actionColors: Record = { export default function Dashboard() { const { data: stats, isLoading: statsLoading, error: statsError } = useQuery({ queryKey: ['dashboard-stats'], - queryFn: () => statsService.dashboard(), + queryFn: ({ signal }) => statsService.dashboard(signal), }) const { data: logsData, isLoading: logsLoading } = useQuery({ queryKey: ['recent-logs'], - queryFn: () => logService.list({ page: 1, page_size: 10 }), + queryFn: ({ signal }) => logService.list({ page: 1, page_size: 10 }, signal), }) if (statsError) { diff --git a/admin-v2/src/pages/Logs.tsx b/admin-v2/src/pages/Logs.tsx index ab485cb..5111a67 100644 --- a/admin-v2/src/pages/Logs.tsx +++ b/admin-v2/src/pages/Logs.tsx @@ -42,7 +42,7 @@ export default function Logs() { const { data, isLoading } = useQuery({ queryKey: ['logs', page, actionFilter], - queryFn: () => logService.list({ page, page_size: 20, action: actionFilter }), + queryFn: ({ signal }) => logService.list({ page, page_size: 20, action: actionFilter }, signal), }) const columns: ProColumns[] = [ diff --git a/admin-v2/src/pages/Models.tsx b/admin-v2/src/pages/Models.tsx index f8be5c8..e8ef5a5 100644 --- a/admin-v2/src/pages/Models.tsx +++ b/admin-v2/src/pages/Models.tsx @@ -20,12 +20,12 @@ export default function Models() { const { data, isLoading } = useQuery({ queryKey: ['models'], - queryFn: () => modelService.list(), + queryFn: ({ signal }) => modelService.list(signal), }) const { data: providersData } = useQuery({ queryKey: ['providers-for-select'], - queryFn: () => providerService.list(), + queryFn: ({ signal }) => providerService.list(signal), }) const createMutation = useMutation({ diff --git a/admin-v2/src/pages/Prompts.tsx b/admin-v2/src/pages/Prompts.tsx index c6bd4cc..7f55ea5 100644 --- a/admin-v2/src/pages/Prompts.tsx +++ b/admin-v2/src/pages/Prompts.tsx @@ -26,18 +26,18 @@ export default function Prompts() { const { data, isLoading } = useQuery({ queryKey: ['prompts'], - queryFn: () => promptService.list(), + queryFn: ({ signal }) => promptService.list(signal), }) const { data: detailData } = useQuery({ queryKey: ['prompt-detail', detailName], - queryFn: () => promptService.get(detailName!), + queryFn: ({ signal }) => promptService.get(detailName!, signal), enabled: !!detailName, }) const { data: versionsData } = useQuery({ queryKey: ['prompt-versions', detailName], - queryFn: () => promptService.listVersions(detailName!), + queryFn: ({ signal }) => promptService.listVersions(detailName!, signal), enabled: !!detailName, }) diff --git a/admin-v2/src/pages/Providers.tsx b/admin-v2/src/pages/Providers.tsx index 5612528..5764391 100644 --- a/admin-v2/src/pages/Providers.tsx +++ b/admin-v2/src/pages/Providers.tsx @@ -22,12 +22,12 @@ export default function Providers() { const { data, isLoading } = useQuery({ queryKey: ['providers'], - queryFn: () => providerService.list(), + queryFn: ({ signal }) => providerService.list(signal), }) const { data: keysData, isLoading: keysLoading } = useQuery({ queryKey: ['provider-keys', keyModalProviderId], - queryFn: () => providerService.listKeys(keyModalProviderId!), + queryFn: ({ signal }) => providerService.listKeys(keyModalProviderId!, signal), enabled: !!keyModalProviderId, }) diff --git a/admin-v2/src/pages/Relay.tsx b/admin-v2/src/pages/Relay.tsx index aa984d0..0bf066b 100644 --- a/admin-v2/src/pages/Relay.tsx +++ b/admin-v2/src/pages/Relay.tsx @@ -34,7 +34,7 @@ export default function Relay() { const { data, isLoading } = useQuery({ queryKey: ['relay-tasks', page, statusFilter], - queryFn: () => relayService.list({ page, page_size: 20, status: statusFilter }), + queryFn: ({ signal }) => relayService.list({ page, page_size: 20, status: statusFilter }, signal), }) const columns: ProColumns[] = [ diff --git a/admin-v2/src/pages/Usage.tsx b/admin-v2/src/pages/Usage.tsx index 35acfa2..a40dc69 100644 --- a/admin-v2/src/pages/Usage.tsx +++ b/admin-v2/src/pages/Usage.tsx @@ -19,12 +19,12 @@ export default function Usage() { const { data: dailyData, isLoading: dailyLoading, error: dailyError } = useQuery({ queryKey: ['usage-daily', days], - queryFn: () => telemetryService.dailyStats({ days }), + queryFn: ({ signal }) => telemetryService.dailyStats({ days }, signal), }) const { data: modelData, isLoading: modelLoading } = useQuery({ queryKey: ['usage-model', days], - queryFn: () => telemetryService.modelStats({}), + queryFn: ({ signal }) => telemetryService.modelStats({}, signal), }) if (dailyError) { diff --git a/admin-v2/src/services/accounts.ts b/admin-v2/src/services/accounts.ts index 56ff1f2..e089ba8 100644 --- a/admin-v2/src/services/accounts.ts +++ b/admin-v2/src/services/accounts.ts @@ -1,16 +1,16 @@ -import request from './request' +import request, { withSignal } from './request' import type { AccountPublic, PaginatedResponse } from '@/types' export const accountService = { - list: (params?: Record) => - request.get>('/accounts', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/accounts', withSignal({ params }, signal)).then((r) => r.data), - get: (id: string) => - request.get(`/accounts/${id}`).then((r) => r.data), + get: (id: string, signal?: AbortSignal) => + request.get(`/accounts/${id}`, withSignal({}, signal)).then((r) => r.data), - update: (id: string, data: Partial>) => - request.patch(`/accounts/${id}`, data).then((r) => r.data), + update: (id: string, data: Partial>, signal?: AbortSignal) => + request.patch(`/accounts/${id}`, data, withSignal({}, signal)).then((r) => r.data), - updateStatus: (id: string, data: { status: AccountPublic['status'] }) => - request.patch(`/accounts/${id}/status`, data).then((r) => r.data), + updateStatus: (id: string, data: { status: AccountPublic['status'] }, signal?: AbortSignal) => + request.patch(`/accounts/${id}/status`, data, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/agent-templates.ts b/admin-v2/src/services/agent-templates.ts index 59439d6..5a318a0 100644 --- a/admin-v2/src/services/agent-templates.ts +++ b/admin-v2/src/services/agent-templates.ts @@ -1,28 +1,28 @@ -import request from './request' +import request, { withSignal } from './request' import type { AgentTemplate, PaginatedResponse } from '@/types' export const agentTemplateService = { - list: (params?: Record) => - request.get>('/agent-templates', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/agent-templates', withSignal({ params }, signal)).then((r) => r.data), - get: (id: string) => - request.get(`/agent-templates/${id}`).then((r) => r.data), + get: (id: string, signal?: AbortSignal) => + request.get(`/agent-templates/${id}`, withSignal({}, signal)).then((r) => r.data), create: (data: { name: string; description?: string; category?: string; source?: string model?: string; system_prompt?: string; tools?: string[] capabilities?: string[]; temperature?: number; max_tokens?: number visibility?: string - }) => - request.post('/agent-templates', data).then((r) => r.data), + }, signal?: AbortSignal) => + request.post('/agent-templates', data, withSignal({}, signal)).then((r) => r.data), update: (id: string, data: { description?: string; model?: string; system_prompt?: string tools?: string[]; capabilities?: string[]; temperature?: number max_tokens?: number; visibility?: string; status?: string - }) => - request.post(`/agent-templates/${id}`, data).then((r) => r.data), + }, signal?: AbortSignal) => + request.post(`/agent-templates/${id}`, data, withSignal({}, signal)).then((r) => r.data), - archive: (id: string) => - request.delete(`/agent-templates/${id}`).then((r) => r.data), + archive: (id: string, signal?: AbortSignal) => + request.delete(`/agent-templates/${id}`, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/api-keys.ts b/admin-v2/src/services/api-keys.ts index 02d98db..5268c44 100644 --- a/admin-v2/src/services/api-keys.ts +++ b/admin-v2/src/services/api-keys.ts @@ -1,13 +1,13 @@ -import request from './request' +import request, { withSignal } from './request' import type { TokenInfo, CreateTokenRequest, PaginatedResponse } from '@/types' export const apiKeyService = { - list: (params?: Record) => - request.get>('/keys', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/keys', withSignal({ params }, signal)).then((r) => r.data), - create: (data: CreateTokenRequest) => - request.post('/keys', data).then((r) => r.data), + create: (data: CreateTokenRequest, signal?: AbortSignal) => + request.post('/keys', data, withSignal({}, signal)).then((r) => r.data), - revoke: (id: string) => - request.delete(`/keys/${id}`).then((r) => r.data), + revoke: (id: string, signal?: AbortSignal) => + request.delete(`/keys/${id}`, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/auth.ts b/admin-v2/src/services/auth.ts index e5fd875..0e92444 100644 --- a/admin-v2/src/services/auth.ts +++ b/admin-v2/src/services/auth.ts @@ -1,10 +1,10 @@ -import request from './request' +import request, { withSignal } from './request' import type { AccountPublic, LoginRequest, LoginResponse } from '@/types' export const authService = { - login: (data: LoginRequest) => - request.post('/auth/login', data).then((r) => r.data), + login: (data: LoginRequest, signal?: AbortSignal) => + request.post('/auth/login', data, withSignal({}, signal)).then((r) => r.data), - me: () => - request.get('/auth/me').then((r) => r.data), + me: (signal?: AbortSignal) => + request.get('/auth/me', withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/config.ts b/admin-v2/src/services/config.ts index 0783195..ed2f8af 100644 --- a/admin-v2/src/services/config.ts +++ b/admin-v2/src/services/config.ts @@ -1,11 +1,11 @@ -import request from './request' +import request, { withSignal } from './request' import type { ConfigItem, PaginatedResponse } from '@/types' export const configService = { - list: (params?: Record) => - request.get>('/config/items', { params }) + list: (params?: Record, signal?: AbortSignal) => + request.get>('/config/items', withSignal({ params }, signal)) .then((r) => r.data.items), - update: (id: string, data: { value: string | number | boolean }) => - request.patch(`/config/items/${id}`, data).then((r) => r.data), + update: (id: string, data: { value: string | number | boolean }, signal?: AbortSignal) => + request.patch(`/config/items/${id}`, data, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/logs.ts b/admin-v2/src/services/logs.ts index 3d5535f..3f2019f 100644 --- a/admin-v2/src/services/logs.ts +++ b/admin-v2/src/services/logs.ts @@ -1,7 +1,7 @@ -import request from './request' +import request, { withSignal } from './request' import type { OperationLog, PaginatedResponse } from '@/types' export const logService = { - list: (params?: Record) => - request.get>('/logs/operations', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/logs/operations', withSignal({ params }, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/models.ts b/admin-v2/src/services/models.ts index d79ca58..f77ac6a 100644 --- a/admin-v2/src/services/models.ts +++ b/admin-v2/src/services/models.ts @@ -1,16 +1,16 @@ -import request from './request' +import request, { withSignal } from './request' import type { Model, PaginatedResponse } from '@/types' export const modelService = { - list: (params?: Record) => - request.get>('/models', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/models', withSignal({ params }, signal)).then((r) => r.data), - create: (data: Partial>) => - request.post('/models', data).then((r) => r.data), + create: (data: Partial>, signal?: AbortSignal) => + request.post('/models', data, withSignal({}, signal)).then((r) => r.data), - update: (id: string, data: Partial>) => - request.patch(`/models/${id}`, data).then((r) => r.data), + update: (id: string, data: Partial>, signal?: AbortSignal) => + request.patch(`/models/${id}`, data, withSignal({}, signal)).then((r) => r.data), - delete: (id: string) => - request.delete(`/models/${id}`).then((r) => r.data), + delete: (id: string, signal?: AbortSignal) => + request.delete(`/models/${id}`, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/prompts.ts b/admin-v2/src/services/prompts.ts index 9078b33..9dd13e9 100644 --- a/admin-v2/src/services/prompts.ts +++ b/admin-v2/src/services/prompts.ts @@ -1,35 +1,35 @@ -import request from './request' +import request, { withSignal } from './request' import type { PromptTemplate, PromptVersion, PaginatedResponse } from '@/types' export const promptService = { - list: (params?: Record) => - request.get>('/prompts', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/prompts', withSignal({ params }, signal)).then((r) => r.data), - get: (name: string) => - request.get(`/prompts/${encodeURIComponent(name)}`).then((r) => r.data), + get: (name: string, signal?: AbortSignal) => + request.get(`/prompts/${encodeURIComponent(name)}`, withSignal({}, signal)).then((r) => r.data), create: (data: { name: string; category: string; description?: string; source?: string system_prompt: string; user_prompt_template?: string variables?: unknown[]; min_app_version?: string - }) => - request.post('/prompts', data).then((r) => r.data), + }, signal?: AbortSignal) => + request.post('/prompts', data, withSignal({}, signal)).then((r) => r.data), - update: (name: string, data: { description?: string; status?: string }) => - request.put(`/prompts/${encodeURIComponent(name)}`, data).then((r) => r.data), + update: (name: string, data: { description?: string; status?: string }, signal?: AbortSignal) => + request.put(`/prompts/${encodeURIComponent(name)}`, data, withSignal({}, signal)).then((r) => r.data), - archive: (name: string) => - request.delete(`/prompts/${encodeURIComponent(name)}`).then((r) => r.data), + archive: (name: string, signal?: AbortSignal) => + request.delete(`/prompts/${encodeURIComponent(name)}`, withSignal({}, signal)).then((r) => r.data), - listVersions: (name: string) => - request.get(`/prompts/${encodeURIComponent(name)}/versions`).then((r) => r.data), + listVersions: (name: string, signal?: AbortSignal) => + request.get(`/prompts/${encodeURIComponent(name)}/versions`, withSignal({}, signal)).then((r) => r.data), createVersion: (name: string, data: { system_prompt: string; user_prompt_template?: string variables?: unknown[]; changelog?: string; min_app_version?: string - }) => - request.post(`/prompts/${encodeURIComponent(name)}/versions`, data).then((r) => r.data), + }, signal?: AbortSignal) => + request.post(`/prompts/${encodeURIComponent(name)}/versions`, data, withSignal({}, signal)).then((r) => r.data), - rollback: (name: string, version: number) => - request.post(`/prompts/${encodeURIComponent(name)}/rollback/${version}`).then((r) => r.data), + rollback: (name: string, version: number, signal?: AbortSignal) => + request.post(`/prompts/${encodeURIComponent(name)}/rollback/${version}`, undefined, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/providers.ts b/admin-v2/src/services/providers.ts index 73ac259..45470c5 100644 --- a/admin-v2/src/services/providers.ts +++ b/admin-v2/src/services/providers.ts @@ -1,31 +1,31 @@ -import request from './request' +import request, { withSignal } from './request' import type { Provider, ProviderKey, PaginatedResponse } from '@/types' export const providerService = { - list: (params?: Record) => - request.get>('/providers', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/providers', withSignal({ params }, signal)).then((r) => r.data), - create: (data: Partial>) => - request.post('/providers', data).then((r) => r.data), + create: (data: Partial>, signal?: AbortSignal) => + request.post('/providers', data, withSignal({}, signal)).then((r) => r.data), - update: (id: string, data: Partial>) => - request.patch(`/providers/${id}`, data).then((r) => r.data), + update: (id: string, data: Partial>, signal?: AbortSignal) => + request.patch(`/providers/${id}`, data, withSignal({}, signal)).then((r) => r.data), - delete: (id: string) => - request.delete(`/providers/${id}`).then((r) => r.data), + delete: (id: string, signal?: AbortSignal) => + request.delete(`/providers/${id}`, withSignal({}, signal)).then((r) => r.data), - listKeys: (providerId: string) => - request.get(`/providers/${providerId}/keys`).then((r) => r.data), + listKeys: (providerId: string, signal?: AbortSignal) => + request.get(`/providers/${providerId}/keys`, withSignal({}, signal)).then((r) => r.data), addKey: (providerId: string, data: { key_label: string; key_value: string; priority?: number max_rpm?: number; max_tpm?: number; quota_reset_interval?: string - }) => - request.post<{ ok: boolean; key_id: string }>(`/providers/${providerId}/keys`, data).then((r) => r.data), + }, signal?: AbortSignal) => + request.post<{ ok: boolean; key_id: string }>(`/providers/${providerId}/keys`, data, withSignal({}, signal)).then((r) => r.data), - toggleKey: (providerId: string, keyId: string, active: boolean) => - request.put<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}/toggle`, { active }).then((r) => r.data), + toggleKey: (providerId: string, keyId: string, active: boolean, signal?: AbortSignal) => + request.put<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}/toggle`, { active }, withSignal({}, signal)).then((r) => r.data), - deleteKey: (providerId: string, keyId: string) => - request.delete<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}`).then((r) => r.data), + deleteKey: (providerId: string, keyId: string, signal?: AbortSignal) => + request.delete<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}`, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/relay.ts b/admin-v2/src/services/relay.ts index 728d0a8..4fbe04b 100644 --- a/admin-v2/src/services/relay.ts +++ b/admin-v2/src/services/relay.ts @@ -1,10 +1,10 @@ -import request from './request' +import request, { withSignal } from './request' import type { RelayTask, PaginatedResponse } from '@/types' export const relayService = { - list: (params?: Record) => - request.get>('/relay/tasks', { params }).then((r) => r.data), + list: (params?: Record, signal?: AbortSignal) => + request.get>('/relay/tasks', withSignal({ params }, signal)).then((r) => r.data), - get: (id: string) => - request.get(`/relay/tasks/${id}`).then((r) => r.data), + get: (id: string, signal?: AbortSignal) => + request.get(`/relay/tasks/${id}`, withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/request.ts b/admin-v2/src/services/request.ts index 9bbad36..20bd489 100644 --- a/admin-v2/src/services/request.ts +++ b/admin-v2/src/services/request.ts @@ -4,6 +4,7 @@ import axios from 'axios' import type { AxiosError, InternalAxiosRequestConfig } from 'axios' +import type { AxiosRequestConfig } from 'axios' import type { ApiError } from '@/types' import { useAuthStore } from '@/stores/authStore' @@ -106,3 +107,11 @@ request.interceptors.response.use( ) export default request + +/** 将 AbortSignal 注入 Axios config,用于 TanStack Query 的请求取消 */ +export function withSignal(config: AxiosRequestConfig = {}, signal?: AbortSignal): AxiosRequestConfig { + if (signal) { + return { ...config, signal } + } + return config +} diff --git a/admin-v2/src/services/stats.ts b/admin-v2/src/services/stats.ts index 63bd461..dca8c92 100644 --- a/admin-v2/src/services/stats.ts +++ b/admin-v2/src/services/stats.ts @@ -1,7 +1,7 @@ -import request from './request' +import request, { withSignal } from './request' import type { DashboardStats } from '@/types' export const statsService = { - dashboard: () => - request.get('/stats/dashboard').then((r) => r.data), + dashboard: (signal?: AbortSignal) => + request.get('/stats/dashboard', withSignal({}, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/telemetry.ts b/admin-v2/src/services/telemetry.ts index 5df2312..a94ca13 100644 --- a/admin-v2/src/services/telemetry.ts +++ b/admin-v2/src/services/telemetry.ts @@ -1,10 +1,10 @@ -import request from './request' +import request, { withSignal } from './request' import type { ModelUsageStat, DailyUsageStat } from '@/types' export const telemetryService = { - modelStats: (params?: Record) => - request.get('/telemetry/stats', { params }).then((r) => r.data), + modelStats: (params?: Record, signal?: AbortSignal) => + request.get('/telemetry/stats', withSignal({ params }, signal)).then((r) => r.data), - dailyStats: (params?: { days?: number }) => - request.get('/telemetry/daily', { params }).then((r) => r.data), + dailyStats: (params?: { days?: number }, signal?: AbortSignal) => + request.get('/telemetry/daily', withSignal({ params }, signal)).then((r) => r.data), } diff --git a/admin-v2/src/services/usage.ts b/admin-v2/src/services/usage.ts index b05f98d..1f85ccc 100644 --- a/admin-v2/src/services/usage.ts +++ b/admin-v2/src/services/usage.ts @@ -1,12 +1,12 @@ -import request from './request' +import request, { withSignal } from './request' import type { UsageRecord, UsageByModel } from '@/types' export const usageService = { - daily: (params?: { days?: number }) => - request.get<{ by_day: UsageRecord[] }>('/usage', { params: { ...params, group_by: 'day' } }) + daily: (params?: { days?: number }, signal?: AbortSignal) => + request.get<{ by_day: UsageRecord[] }>('/usage', withSignal({ params: { ...params, group_by: 'day' } }, signal)) .then((r) => r.data.by_day || []), - byModel: (params?: { days?: number }) => - request.get<{ by_model: UsageByModel[] }>('/usage', { params: { ...params, group_by: 'model' } }) + byModel: (params?: { days?: number }, signal?: AbortSignal) => + request.get<{ by_model: UsageByModel[] }>('/usage', withSignal({ params: { ...params, group_by: 'model' } }, signal)) .then((r) => r.data.by_model || []), } diff --git a/admin-v2/vite.config.ts b/admin-v2/vite.config.ts index 14f22f5..1534916 100644 --- a/admin-v2/vite.config.ts +++ b/admin-v2/vite.config.ts @@ -15,6 +15,16 @@ export default defineConfig({ '/api': { target: 'http://localhost:8080', changeOrigin: true, + timeout: 30_000, + proxyTimeout: 30_000, + configure: (proxy) => { + proxy.on('proxyReq', (proxyReq) => { + proxyReq.setTimeout(30_000) + }) + proxy.on('proxyRes', (proxyRes) => { + proxyRes.setTimeout(30_000) + }) + }, }, }, }, diff --git a/crates/zclaw-saas/Cargo.toml b/crates/zclaw-saas/Cargo.toml index c22c303..a97446a 100644 --- a/crates/zclaw-saas/Cargo.toml +++ b/crates/zclaw-saas/Cargo.toml @@ -31,6 +31,7 @@ sha2 = { workspace = true } rand = { workspace = true } dashmap = { workspace = true } hex = { workspace = true } +socket2 = { workspace = true } url = "2" axum = { workspace = true } diff --git a/crates/zclaw-saas/src/auth/totp.rs b/crates/zclaw-saas/src/auth/totp.rs index 75cda66..2425257 100644 --- a/crates/zclaw-saas/src/auth/totp.rs +++ b/crates/zclaw-saas/src/auth/totp.rs @@ -148,6 +148,34 @@ pub async fn verify_totp( return Err(SaasError::InvalidInput("TOTP 码必须是 6 位数字".into())); } + // TOTP 暴力破解保护: 10 分钟内最多 5 次失败 + const MAX_TOTP_FAILURES: u32 = 5; + const TOTP_LOCKOUT_SECS: u64 = 600; + let now = std::time::Instant::now(); + let lockout_duration = std::time::Duration::from_secs(TOTP_LOCKOUT_SECS); + + let is_locked = { + if let Some(entry) = state.totp_fail_counts.get(&ctx.account_id) { + let (count, first_fail) = entry.value(); + if *count >= MAX_TOTP_FAILURES && now.duration_since(*first_fail) < lockout_duration { + true + } else { + // 窗口过期,重置 + drop(entry); + state.totp_fail_counts.remove(&ctx.account_id); + false + } + } else { + false + } + }; + + if is_locked { + return Err(SaasError::RateLimited( + format!("TOTP 验证失败次数过多,请 {} 秒后重试", TOTP_LOCKOUT_SECS) + )); + } + // 获取存储的密钥 let (totp_secret,): (Option,) = sqlx::query_as( "SELECT totp_secret FROM accounts WHERE id = $1" @@ -172,9 +200,24 @@ pub async fn verify_totp( }; if !verify_totp_code(&secret, code) { + // 记录失败次数 + let new_count = { + let mut entry = state.totp_fail_counts + .entry(ctx.account_id.clone()) + .or_insert((0, now)); + entry.value_mut().0 += 1; + entry.value().0 + }; + tracing::warn!( + "TOTP verify failed for account {} ({}/{} attempts)", + ctx.account_id, new_count, MAX_TOTP_FAILURES + ); return Err(SaasError::Totp("TOTP 码验证失败".into())); } + // 验证成功 → 清除失败计数 + state.totp_fail_counts.remove(&ctx.account_id); + // 验证成功 → 启用 TOTP,同时确保密钥已加密 let final_secret = if encrypted_secret.starts_with(crypto::ENCRYPTED_PREFIX) { encrypted_secret @@ -183,10 +226,10 @@ pub async fn verify_totp( encrypt_totp_secret(&secret, &enc_key)? }; - let now = chrono::Utc::now().to_rfc3339(); + let now_ts = chrono::Utc::now().to_rfc3339(); sqlx::query("UPDATE accounts SET totp_enabled = true, totp_secret = $1, updated_at = $2 WHERE id = $3") .bind(&final_secret) - .bind(&now) + .bind(&now_ts) .bind(&ctx.account_id) .execute(&state.db) .await?; diff --git a/crates/zclaw-saas/src/db.rs b/crates/zclaw-saas/src/db.rs index 5a0b971..580eaf0 100644 --- a/crates/zclaw-saas/src/db.rs +++ b/crates/zclaw-saas/src/db.rs @@ -90,7 +90,7 @@ async fn run_migration_files(pool: &PgPool, dir: &std::path::Path) -> SaasResult let filename = path.file_name().unwrap_or_default().to_string_lossy(); tracing::info!("Running migration: {}", filename); let content = std::fs::read_to_string(path)?; - for stmt in content.split(';') { + for stmt in split_sql_statements(&content) { let trimmed = stmt.trim(); if !trimmed.is_empty() && !trimmed.starts_with("--") { sqlx::query(trimmed).execute(pool).await?; @@ -100,6 +100,150 @@ async fn run_migration_files(pool: &PgPool, dir: &std::path::Path) -> SaasResult Ok(()) } +/// 按语句分割 SQL 文件内容,正确处理: +/// - 单引号字符串 `'...'` +/// - 双引号标识符 `"..."` +/// - 美元符号引用字符串 `$$...$$` 和 `$tag$...$tag$` +/// - `--` 单行注释 +/// - `/* ... */` 块注释 +/// - `E'...'` 转义字符串 +fn split_sql_statements(sql: &str) -> Vec { + let mut statements = Vec::new(); + let mut current = String::new(); + let mut chars = sql.chars().peekable(); + + while let Some(ch) = chars.next() { + match ch { + '\'' => { + // 单引号字符串 + current.push(ch); + loop { + match chars.next() { + Some('\'') => { + current.push('\''); + // 检查是否为转义引号 '' + if chars.peek() == Some(&'\'') { + current.push(chars.next().unwrap()); + } else { + break; + } + } + Some(c) => current.push(c), + None => break, + } + } + } + '"' => { + // 双引号标识符 + current.push(ch); + loop { + match chars.next() { + Some('"') => { + current.push('"'); + break; + } + Some(c) => current.push(c), + None => break, + } + } + } + '-' if chars.peek() == Some(&'-') => { + // 单行注释: 跳过直到行尾 + chars.next(); // consume second '-' + while let Some(&c) = chars.peek() { + if c == '\n' { + chars.next(); + current.push(c); + break; + } + chars.next(); + } + } + '/' if chars.peek() == Some(&'*') => { + // 块注释: 跳过直到 */ + chars.next(); // consume '*' + current.push_str("/*"); + let mut prev = ' '; + loop { + match chars.next() { + Some('/') if prev == '*' => { + current.push('/'); + break; + } + Some(c) => { + current.push(c); + prev = c; + } + None => break, + } + } + } + '$' => { + // 美元符号引用: $$ 或 $tag$ ... $tag$ + current.push(ch); + // 读取 tag (字母数字和下划线) + let mut tag = String::new(); + while let Some(&c) = chars.peek() { + if c == '$' || c.is_alphanumeric() || c == '_' { + if c == '$' { + chars.next(); + current.push(c); + break; + } + chars.next(); + tag.push(c); + current.push(c); + } else { + break; + } + } + // 如果 tag 为空,就是 $$ 格式 + let end_marker = if tag.is_empty() { + "$$".to_string() + } else { + format!("${}$", tag) + }; + // 读取直到遇到 end_marker + let mut buf = String::new(); + loop { + match chars.next() { + Some(c) => { + current.push(c); + buf.push(c); + if buf.len() > end_marker.len() { + buf.remove(0); + } + if buf == end_marker { + break; + } + } + None => break, + } + } + } + ';' => { + // 语句结束 + let trimmed = current.trim().to_string(); + if !trimmed.is_empty() { + statements.push(trimmed); + } + current.clear(); + } + _ => { + current.push(ch); + } + } + } + + // 最后一条语句 (可能不以分号结尾) + let trimmed = current.trim().to_string(); + if !trimmed.is_empty() { + statements.push(trimmed); + } + + statements +} + /// Seed 角色数据 async fn seed_roles(pool: &PgPool) -> SaasResult<()> { let now = chrono::Utc::now().to_rfc3339(); diff --git a/crates/zclaw-saas/src/main.rs b/crates/zclaw-saas/src/main.rs index 12ebb4f..c0fd4cb 100644 --- a/crates/zclaw-saas/src/main.rs +++ b/crates/zclaw-saas/src/main.rs @@ -67,7 +67,9 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn health_handler(State(state): State) -> axum::Json { +async fn health_handler( + State(state): State, +) -> (axum::http::StatusCode, axum::Json ) { // health 必须独立快速返回,用 3s 超时避免连接池满时阻塞 let db_healthy = tokio::time::timeout( std::time::Duration::from_secs(3), @@ -77,15 +79,41 @@ async fn health_handler(State(state): State) -> axum::Json= 80% 返回 503 (degraded) + let pool = &state.db; + let total = pool.options().get_max_connections() as usize; + if total > 0 { + let idle = pool.num_idle() as usize; + let used = total - idle; + let ratio = used * 100 / total; + if ratio >= 80 { + return ( + axum::http::StatusCode::SERVICE_UNAVAILABLE, + axum::Json(serde_json::json!({ + "status": "degraded", + "database": true, + "database_pool": { + "usage_pct": ratio, + "used": used, + "total": total, + }, + "timestamp": chrono::Utc::now().to_rfc3339(), + "version": env!("CARGO_PKG_VERSION"), + })), + ); + } + } - axum::Json(serde_json::json!({ + let status = if db_healthy { "healthy" } else { "degraded" }; + let code = if db_healthy { + axum::http::StatusCode::OK } else { axum::http::StatusCode::SERVICE_UNAVAILABLE }; + + (code, axum::Json(serde_json::json!({ "status": status, "database": db_healthy, "timestamp": chrono::Utc::now().to_rfc3339(), "version": env!("CARGO_PKG_VERSION"), - })) + }))) } async fn build_router(state: AppState) -> axum::Router { diff --git a/crates/zclaw-saas/src/relay/key_pool.rs b/crates/zclaw-saas/src/relay/key_pool.rs index cfe5c7f..bfbab7d 100644 --- a/crates/zclaw-saas/src/relay/key_pool.rs +++ b/crates/zclaw-saas/src/relay/key_pool.rs @@ -4,7 +4,7 @@ use sqlx::PgPool; use crate::error::{SaasError, SaasResult}; -use crate::models::{ProviderKeySelectRow, ProviderKeyRow}; + use crate::models::ProviderKeyRow; use crate::crypto; /// 解密 key_value (如果已加密),否则原样返回 @@ -36,19 +36,63 @@ pub struct KeySelection { } /// 从 provider 的 Key Pool 中选择最佳可用 Key +/// +/// 优化: 单次 JOIN 查询获取 Key + 当前分钟使用量,避免 N+1 查询 pub async fn select_best_key(db: &PgPool, provider_id: &str, enc_key: &[u8; 32]) -> SaasResult { let now = chrono::Utc::now().to_rfc3339(); let current_minute = chrono::Utc::now().format("%Y-%m-%dT%H:%M").to_string(); - // 获取所有活跃 Key - let rows: Vec = + // 单次查询: 活跃 Key + 当前分钟的 RPM/TPM 使用量 (LEFT JOIN) + let rows: Vec<(String, String, i32, Option, Option, Option, Option, Option)> = sqlx::query_as( - "SELECT id, key_value, priority, max_rpm, max_tpm, quota_reset_interval - FROM provider_keys - WHERE provider_id = $1 AND is_active = TRUE AND (cooldown_until IS NULL OR cooldown_until <= $2) - ORDER BY priority ASC" - ).bind(provider_id).bind(&now).fetch_all(db).await?; + "SELECT pk.id, pk.key_value, pk.priority, pk.max_rpm, pk.max_tpm, pk.quota_reset_interval, + uw.request_count, uw.token_count + FROM provider_keys pk + LEFT JOIN key_usage_window uw ON pk.id = uw.key_id AND uw.window_minute = $1 + WHERE pk.provider_id = $2 AND pk.is_active = TRUE + AND (pk.cooldown_until IS NULL OR pk.cooldown_until <= $3) + ORDER BY pk.priority ASC" + ).bind(¤t_minute).bind(provider_id).bind(&now).fetch_all(db).await?; + for (id, key_value, priority, max_rpm, max_tpm, quota_reset_interval, req_count, token_count) in &rows { + // RPM 检查 + if let Some(rpm_limit) = max_rpm { + if *rpm_limit > 0 { + let count = req_count.unwrap_or(0); + if count >= *rpm_limit { + tracing::debug!("Key {} hit RPM limit ({}/{})", id, count, rpm_limit); + continue; + } + } + } + + // TPM 检查 + if let Some(tpm_limit) = max_tpm { + if *tpm_limit > 0 { + let tokens = token_count.unwrap_or(0); + if tokens >= *tpm_limit { + tracing::debug!("Key {} hit TPM limit ({}/{})", id, tokens, tpm_limit); + continue; + } + } + } + + // 此 Key 可用 — 解密 key_value + let decrypted_kv = decrypt_key_value(key_value, enc_key)?; + return Ok(KeySelection { + key: PoolKey { + id: id.clone(), + key_value: decrypted_kv, + priority: *priority, + max_rpm: *max_rpm, + max_tpm: *max_tpm, + quota_reset_interval: quota_reset_interval.clone(), + }, + key_id: id.clone(), + }); + } + + // 所有 Key 都超限或无 Key if rows.is_empty() { // 检查是否有冷却中的 Key,返回预计等待时间 let cooldown_row: Option<(String,)> = sqlx::query_as( @@ -59,88 +103,14 @@ pub async fn select_best_key(db: &PgPool, provider_id: &str, enc_key: &[u8; 32]) ).bind(provider_id).bind(&now).fetch_optional(db).await?; if let Some((earliest,)) = cooldown_row { - // 尝试解析时间差 let wait_secs = parse_cooldown_remaining(&earliest, &now); return Err(SaasError::RateLimited( format!("所有 Key 均在冷却中,预计 {} 秒后可用", wait_secs) )); } - - // 检查 provider 级别的单 Key - let provider_key: Option = sqlx::query_scalar( - "SELECT api_key FROM providers WHERE id = $1" - ).bind(provider_id).fetch_optional(db).await?.flatten(); - - if let Some(key) = provider_key { - let decrypted = decrypt_key_value(&key, enc_key)?; - return Ok(KeySelection { - key: PoolKey { - id: "provider-fallback".to_string(), - key_value: decrypted, - priority: 0, - max_rpm: None, - max_tpm: None, - quota_reset_interval: None, - }, - key_id: "provider-fallback".to_string(), - }); - } - - return Err(SaasError::NotFound(format!("Provider {} 没有可用的 API Key", provider_id))); } - // 检查滑动窗口使用量 - for row in rows { - // 检查 RPM 限额 - if let Some(rpm_limit) = row.max_rpm { - if rpm_limit > 0 { - let window: Option<(i64,)> = sqlx::query_as( - "SELECT COALESCE(SUM(request_count), 0) FROM key_usage_window - WHERE key_id = $1 AND window_minute = $2" - ).bind(&row.id).bind(¤t_minute).fetch_optional(db).await?; - - if let Some((count,)) = window { - if count >= rpm_limit { - tracing::debug!("Key {} hit RPM limit ({}/{})", row.id, count, rpm_limit); - continue; - } - } - } - } - - // 检查 TPM 限额 - if let Some(tpm_limit) = row.max_tpm { - if tpm_limit > 0 { - let window: Option<(i64,)> = sqlx::query_as( - "SELECT COALESCE(SUM(token_count), 0) FROM key_usage_window - WHERE key_id = $1 AND window_minute = $2" - ).bind(&row.id).bind(¤t_minute).fetch_optional(db).await?; - - if let Some((tokens,)) = window { - if tokens >= tpm_limit { - tracing::debug!("Key {} hit TPM limit ({}/{})", row.id, tokens, tpm_limit); - continue; - } - } - } - } - - // 此 Key 可用 — 解密 key_value - let decrypted_kv = decrypt_key_value(&row.key_value, enc_key)?; - return Ok(KeySelection { - key: PoolKey { - id: row.id.clone(), - key_value: decrypted_kv, - priority: row.priority, - max_rpm: row.max_rpm, - max_tpm: row.max_tpm, - quota_reset_interval: row.quota_reset_interval, - }, - key_id: row.id, - }); - } - - // 所有 Key 都超限,回退到 provider 单 Key + // 回退到 provider 单 Key let provider_key: Option = sqlx::query_scalar( "SELECT api_key FROM providers WHERE id = $1" ).bind(provider_id).fetch_optional(db).await?.flatten(); @@ -160,9 +130,13 @@ pub async fn select_best_key(db: &PgPool, provider_id: &str, enc_key: &[u8; 32]) }); } - Err(SaasError::RateLimited( - format!("Provider {} 所有 Key 均已达限额", provider_id) - )) + if rows.is_empty() { + Err(SaasError::NotFound(format!("Provider {} 没有可用的 API Key", provider_id))) + } else { + Err(SaasError::RateLimited( + format!("Provider {} 所有 Key 均已达限额", provider_id) + )) + } } /// 记录 Key 使用量(滑动窗口) diff --git a/crates/zclaw-saas/src/relay/service.rs b/crates/zclaw-saas/src/relay/service.rs index 4289059..7e65369 100644 --- a/crates/zclaw-saas/src/relay/service.rs +++ b/crates/zclaw-saas/src/relay/service.rs @@ -298,7 +298,21 @@ pub async fn execute_relay( let body = axum::body::Body::from_stream(body_stream); // SSE 流结束后异步记录 usage + Key 使用量 + // 使用全局 Arc 限制并发 spawned tasks,防止高并发时耗尽连接池 + static SSE_SPAWN_SEMAPHORE: std::sync::OnceLock> = std::sync::OnceLock::new(); + let semaphore = SSE_SPAWN_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(16))); + let permit = match semaphore.clone().try_acquire_owned() { + Ok(p) => p, + Err(_) => { + // 信号量满时跳过 usage 记录,流本身不受影响 + tracing::warn!("SSE usage spawn at capacity, skipping usage record for task {}", task_id); + return Ok(RelayResponse::Sse(body)); + } + }; + tokio::spawn(async move { + let _permit = permit; // 持有 permit 直到任务完成 + tokio::time::sleep(std::time::Duration::from_secs(3)).await; tokio::time::sleep(std::time::Duration::from_secs(3)).await; let capture = usage_capture.lock().await; let (input, output) = ( @@ -464,11 +478,11 @@ async fn validate_provider_url(url: &str) -> SaasResult<()> { // 去除 IPv6 方括号 let host = host.trim_start_matches('[').trim_end_matches(']'); - // 精确匹配的阻止列表 + // 精确匹配的阻止列表: 仅包含主机名和特殊域名 + // 私有 IP 范围 (10.x, 172.16-31.x, 192.168.x, 127.x, 169.254.x, ::1 等) + // 由 is_private_ip() 统一判断,无需在此重复列出 let blocked_exact = [ - "127.0.0.1", "0.0.0.0", "localhost", "::1", "::ffff:127.0.0.1", - "0:0:0:0:0:ffff:7f00:1", "169.254.169.254", "metadata.google.internal", - "10.0.0.1", "172.16.0.1", "192.168.0.1", + "localhost", "metadata.google.internal", ]; if blocked_exact.contains(&host) { return Err(SaasError::InvalidInput(format!("provider URL 指向禁止的内网地址: {}", host))); diff --git a/crates/zclaw-saas/src/state.rs b/crates/zclaw-saas/src/state.rs index d5d090a..b27032c 100644 --- a/crates/zclaw-saas/src/state.rs +++ b/crates/zclaw-saas/src/state.rs @@ -21,6 +21,8 @@ pub struct AppState { pub rate_limit_entries: Arc>>, /// 角色权限缓存: role_id → permissions list pub role_permissions_cache: Arc>>, + /// TOTP 失败计数: account_id → (失败次数, 首次失败时间) + pub totp_fail_counts: Arc>, /// 无锁 rate limit RPM(从 config 同步,避免每个请求获取 RwLock) rate_limit_rpm: Arc, /// Worker 调度器 (异步后台任务) @@ -37,6 +39,7 @@ impl AppState { jwt_secret, rate_limit_entries: Arc::new(dashmap::DashMap::new()), role_permissions_cache: Arc::new(dashmap::DashMap::new()), + totp_fail_counts: Arc::new(dashmap::DashMap::new()), rate_limit_rpm: Arc::new(AtomicU32::new(rpm)), worker_dispatcher, }) diff --git a/crates/zclaw-saas/src/workers/mod.rs b/crates/zclaw-saas/src/workers/mod.rs index e49da9c..6af7383 100644 --- a/crates/zclaw-saas/src/workers/mod.rs +++ b/crates/zclaw-saas/src/workers/mod.rs @@ -155,6 +155,7 @@ impl WorkerDispatcher { fn start_consumer(&self, mut receiver: mpsc::Receiver) { let db = self.db.clone(); let handlers = self.handlers.clone(); + let sender = self.sender.clone(); tokio::spawn(async move { while let Some(msg) = receiver.recv().await { @@ -169,6 +170,7 @@ impl WorkerDispatcher { let worker_name = msg.worker_name.clone(); let max_retries = handler.max_retries(); let db = db.clone(); + let sender = sender.clone(); tokio::spawn(async move { match handler.perform(&db, &msg.args_json).await { @@ -177,18 +179,27 @@ impl WorkerDispatcher { } Err(e) => { if msg.attempt < max_retries { - tracing::warn!( - "Worker {} failed (attempt {}/{}): {}. Will retry.", - worker_name, msg.attempt, max_retries, e - ); - // 简单退避: 2^attempt 秒 let delay = std::time::Duration::from_secs(1 << msg.attempt.min(4)); + tracing::warn!( + "Worker {} failed (attempt {}/{}): {}. Re-queuing after {:?}.", + worker_name, msg.attempt, max_retries, e, delay + ); tokio::time::sleep(delay).await; - // 注意: 重试在当前设计中通过日志提醒 - // 生产环境应将任务重新入队 + // 重新入队(递增 attempt 计数) + let retry_msg = TaskMessage { + worker_name: msg.worker_name.clone(), + args_json: msg.args_json.clone(), + attempt: msg.attempt + 1, + }; + if let Err(send_err) = sender.send(retry_msg).await { + tracing::error!( + "Worker {} retry enqueue failed (channel closed): {}", + worker_name, send_err + ); + } } else { tracing::error!( - "Worker {} failed after {} attempts: {}", + "Worker {} failed after {} attempts: {}. Giving up.", worker_name, max_retries, e ); } diff --git a/desktop/src/components/PipelineResultPreview.tsx b/desktop/src/components/PipelineResultPreview.tsx index 08e9ada..9f1c787 100644 --- a/desktop/src/components/PipelineResultPreview.tsx +++ b/desktop/src/components/PipelineResultPreview.tsx @@ -186,7 +186,7 @@ function MarkdownPreview({ content }: MarkdownPreviewProps) { return (
); }