diff --git a/Cargo.lock b/Cargo.lock
index e7c640b..9cea298 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2506,7 +2506,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
- "socket2",
+ "socket2 0.6.3",
"tokio",
"tower-service",
"tracing",
@@ -4189,7 +4189,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
- "socket2",
+ "socket2 0.6.3",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -4226,7 +4226,7 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
- "socket2",
+ "socket2 0.6.3",
"tracing",
"windows-sys 0.60.2",
]
@@ -5133,6 +5133,16 @@ dependencies = [
"serde",
]
+[[package]]
+name = "socket2"
+version = "0.5.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
[[package]]
name = "socket2"
version = "0.6.3"
@@ -6048,7 +6058,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
- "socket2",
+ "socket2 0.6.3",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -8328,6 +8338,7 @@ dependencies = [
"serde",
"serde_json",
"sha2",
+ "socket2 0.5.10",
"sqlx",
"tempfile",
"thiserror 2.0.18",
diff --git a/Cargo.toml b/Cargo.toml
index 0f153a1..6385ac8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -110,6 +110,9 @@ argon2 = "0.5"
totp-rs = "5"
hex = "0.4"
+# TCP socket configuration
+socket2 = { version = "0.5", features = ["all"] }
+
# Internal crates
zclaw-types = { path = "crates/zclaw-types" }
zclaw-memory = { path = "crates/zclaw-memory" }
diff --git a/admin-v2/src/main.tsx b/admin-v2/src/main.tsx
index c50dabc..20210ff 100644
--- a/admin-v2/src/main.tsx
+++ b/admin-v2/src/main.tsx
@@ -4,6 +4,7 @@ import { RouterProvider } from 'react-router-dom'
import { ConfigProvider, App as AntApp } from 'antd'
import zhCN from 'antd/locale/zh_CN'
import { router } from './router'
+import { ErrorBoundary } from './components/ErrorBoundary'
const queryClient = new QueryClient({
defaultOptions: {
@@ -16,11 +17,13 @@ const queryClient = new QueryClient({
})
createRoot(document.getElementById('root')!).render(
-
-
-
-
-
-
- ,
+
+
+
+
+
+
+
+
+ ,
)
diff --git a/admin-v2/src/pages/Accounts.tsx b/admin-v2/src/pages/Accounts.tsx
index 01569d0..b6e6ece 100644
--- a/admin-v2/src/pages/Accounts.tsx
+++ b/admin-v2/src/pages/Accounts.tsx
@@ -42,7 +42,7 @@ export default function Accounts() {
const { data, isLoading } = useQuery({
queryKey: ['accounts'],
- queryFn: () => accountService.list(),
+ queryFn: ({ signal }) => accountService.list(signal),
})
const updateMutation = useMutation({
diff --git a/admin-v2/src/pages/AgentTemplates.tsx b/admin-v2/src/pages/AgentTemplates.tsx
index 492ff7d..ff5e512 100644
--- a/admin-v2/src/pages/AgentTemplates.tsx
+++ b/admin-v2/src/pages/AgentTemplates.tsx
@@ -26,7 +26,7 @@ export default function AgentTemplates() {
const { data, isLoading } = useQuery({
queryKey: ['agent-templates'],
- queryFn: () => agentTemplateService.list(),
+ queryFn: ({ signal }) => agentTemplateService.list(signal),
})
const createMutation = useMutation({
diff --git a/admin-v2/src/pages/ApiKeys.tsx b/admin-v2/src/pages/ApiKeys.tsx
index 9a533bd..58f20c9 100644
--- a/admin-v2/src/pages/ApiKeys.tsx
+++ b/admin-v2/src/pages/ApiKeys.tsx
@@ -21,7 +21,7 @@ export default function ApiKeys() {
const { data, isLoading } = useQuery({
queryKey: ['api-keys'],
- queryFn: () => apiKeyService.list(),
+ queryFn: ({ signal }) => apiKeyService.list(signal),
})
const createMutation = useMutation({
diff --git a/admin-v2/src/pages/Config.tsx b/admin-v2/src/pages/Config.tsx
index 1e9ba30..a033bb8 100644
--- a/admin-v2/src/pages/Config.tsx
+++ b/admin-v2/src/pages/Config.tsx
@@ -20,7 +20,7 @@ export default function Config() {
const { data, isLoading } = useQuery({
queryKey: ['config', category],
- queryFn: () => configService.list({ category }),
+ queryFn: ({ signal }) => configService.list({ category }, signal),
})
const updateMutation = useMutation({
diff --git a/admin-v2/src/pages/Dashboard.tsx b/admin-v2/src/pages/Dashboard.tsx
index 6ccc8ae..4a1e812 100644
--- a/admin-v2/src/pages/Dashboard.tsx
+++ b/admin-v2/src/pages/Dashboard.tsx
@@ -42,12 +42,12 @@ const actionColors: Record = {
export default function Dashboard() {
const { data: stats, isLoading: statsLoading, error: statsError } = useQuery({
queryKey: ['dashboard-stats'],
- queryFn: () => statsService.dashboard(),
+ queryFn: ({ signal }) => statsService.dashboard(signal),
})
const { data: logsData, isLoading: logsLoading } = useQuery({
queryKey: ['recent-logs'],
- queryFn: () => logService.list({ page: 1, page_size: 10 }),
+ queryFn: ({ signal }) => logService.list({ page: 1, page_size: 10 }, signal),
})
if (statsError) {
diff --git a/admin-v2/src/pages/Logs.tsx b/admin-v2/src/pages/Logs.tsx
index ab485cb..5111a67 100644
--- a/admin-v2/src/pages/Logs.tsx
+++ b/admin-v2/src/pages/Logs.tsx
@@ -42,7 +42,7 @@ export default function Logs() {
const { data, isLoading } = useQuery({
queryKey: ['logs', page, actionFilter],
- queryFn: () => logService.list({ page, page_size: 20, action: actionFilter }),
+ queryFn: ({ signal }) => logService.list({ page, page_size: 20, action: actionFilter }, signal),
})
const columns: ProColumns[] = [
diff --git a/admin-v2/src/pages/Models.tsx b/admin-v2/src/pages/Models.tsx
index f8be5c8..e8ef5a5 100644
--- a/admin-v2/src/pages/Models.tsx
+++ b/admin-v2/src/pages/Models.tsx
@@ -20,12 +20,12 @@ export default function Models() {
const { data, isLoading } = useQuery({
queryKey: ['models'],
- queryFn: () => modelService.list(),
+ queryFn: ({ signal }) => modelService.list(signal),
})
const { data: providersData } = useQuery({
queryKey: ['providers-for-select'],
- queryFn: () => providerService.list(),
+ queryFn: ({ signal }) => providerService.list(signal),
})
const createMutation = useMutation({
diff --git a/admin-v2/src/pages/Prompts.tsx b/admin-v2/src/pages/Prompts.tsx
index c6bd4cc..7f55ea5 100644
--- a/admin-v2/src/pages/Prompts.tsx
+++ b/admin-v2/src/pages/Prompts.tsx
@@ -26,18 +26,18 @@ export default function Prompts() {
const { data, isLoading } = useQuery({
queryKey: ['prompts'],
- queryFn: () => promptService.list(),
+ queryFn: ({ signal }) => promptService.list(signal),
})
const { data: detailData } = useQuery({
queryKey: ['prompt-detail', detailName],
- queryFn: () => promptService.get(detailName!),
+ queryFn: ({ signal }) => promptService.get(detailName!, signal),
enabled: !!detailName,
})
const { data: versionsData } = useQuery({
queryKey: ['prompt-versions', detailName],
- queryFn: () => promptService.listVersions(detailName!),
+ queryFn: ({ signal }) => promptService.listVersions(detailName!, signal),
enabled: !!detailName,
})
diff --git a/admin-v2/src/pages/Providers.tsx b/admin-v2/src/pages/Providers.tsx
index 5612528..5764391 100644
--- a/admin-v2/src/pages/Providers.tsx
+++ b/admin-v2/src/pages/Providers.tsx
@@ -22,12 +22,12 @@ export default function Providers() {
const { data, isLoading } = useQuery({
queryKey: ['providers'],
- queryFn: () => providerService.list(),
+ queryFn: ({ signal }) => providerService.list(signal),
})
const { data: keysData, isLoading: keysLoading } = useQuery({
queryKey: ['provider-keys', keyModalProviderId],
- queryFn: () => providerService.listKeys(keyModalProviderId!),
+ queryFn: ({ signal }) => providerService.listKeys(keyModalProviderId!, signal),
enabled: !!keyModalProviderId,
})
diff --git a/admin-v2/src/pages/Relay.tsx b/admin-v2/src/pages/Relay.tsx
index aa984d0..0bf066b 100644
--- a/admin-v2/src/pages/Relay.tsx
+++ b/admin-v2/src/pages/Relay.tsx
@@ -34,7 +34,7 @@ export default function Relay() {
const { data, isLoading } = useQuery({
queryKey: ['relay-tasks', page, statusFilter],
- queryFn: () => relayService.list({ page, page_size: 20, status: statusFilter }),
+ queryFn: ({ signal }) => relayService.list({ page, page_size: 20, status: statusFilter }, signal),
})
const columns: ProColumns[] = [
diff --git a/admin-v2/src/pages/Usage.tsx b/admin-v2/src/pages/Usage.tsx
index 35acfa2..a40dc69 100644
--- a/admin-v2/src/pages/Usage.tsx
+++ b/admin-v2/src/pages/Usage.tsx
@@ -19,12 +19,12 @@ export default function Usage() {
const { data: dailyData, isLoading: dailyLoading, error: dailyError } = useQuery({
queryKey: ['usage-daily', days],
- queryFn: () => telemetryService.dailyStats({ days }),
+ queryFn: ({ signal }) => telemetryService.dailyStats({ days }, signal),
})
const { data: modelData, isLoading: modelLoading } = useQuery({
queryKey: ['usage-model', days],
- queryFn: () => telemetryService.modelStats({}),
+ queryFn: ({ signal }) => telemetryService.modelStats({}, signal),
})
if (dailyError) {
diff --git a/admin-v2/src/services/accounts.ts b/admin-v2/src/services/accounts.ts
index 56ff1f2..e089ba8 100644
--- a/admin-v2/src/services/accounts.ts
+++ b/admin-v2/src/services/accounts.ts
@@ -1,16 +1,16 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { AccountPublic, PaginatedResponse } from '@/types'
export const accountService = {
- list: (params?: Record) =>
- request.get>('/accounts', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/accounts', withSignal({ params }, signal)).then((r) => r.data),
- get: (id: string) =>
- request.get(`/accounts/${id}`).then((r) => r.data),
+ get: (id: string, signal?: AbortSignal) =>
+ request.get(`/accounts/${id}`, withSignal({}, signal)).then((r) => r.data),
- update: (id: string, data: Partial>) =>
- request.patch(`/accounts/${id}`, data).then((r) => r.data),
+ update: (id: string, data: Partial>, signal?: AbortSignal) =>
+ request.patch(`/accounts/${id}`, data, withSignal({}, signal)).then((r) => r.data),
- updateStatus: (id: string, data: { status: AccountPublic['status'] }) =>
- request.patch(`/accounts/${id}/status`, data).then((r) => r.data),
+ updateStatus: (id: string, data: { status: AccountPublic['status'] }, signal?: AbortSignal) =>
+ request.patch(`/accounts/${id}/status`, data, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/agent-templates.ts b/admin-v2/src/services/agent-templates.ts
index 59439d6..5a318a0 100644
--- a/admin-v2/src/services/agent-templates.ts
+++ b/admin-v2/src/services/agent-templates.ts
@@ -1,28 +1,28 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { AgentTemplate, PaginatedResponse } from '@/types'
export const agentTemplateService = {
- list: (params?: Record) =>
- request.get>('/agent-templates', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/agent-templates', withSignal({ params }, signal)).then((r) => r.data),
- get: (id: string) =>
- request.get(`/agent-templates/${id}`).then((r) => r.data),
+ get: (id: string, signal?: AbortSignal) =>
+ request.get(`/agent-templates/${id}`, withSignal({}, signal)).then((r) => r.data),
create: (data: {
name: string; description?: string; category?: string; source?: string
model?: string; system_prompt?: string; tools?: string[]
capabilities?: string[]; temperature?: number; max_tokens?: number
visibility?: string
- }) =>
- request.post('/agent-templates', data).then((r) => r.data),
+ }, signal?: AbortSignal) =>
+ request.post('/agent-templates', data, withSignal({}, signal)).then((r) => r.data),
update: (id: string, data: {
description?: string; model?: string; system_prompt?: string
tools?: string[]; capabilities?: string[]; temperature?: number
max_tokens?: number; visibility?: string; status?: string
- }) =>
- request.post(`/agent-templates/${id}`, data).then((r) => r.data),
+ }, signal?: AbortSignal) =>
+ request.post(`/agent-templates/${id}`, data, withSignal({}, signal)).then((r) => r.data),
- archive: (id: string) =>
- request.delete(`/agent-templates/${id}`).then((r) => r.data),
+ archive: (id: string, signal?: AbortSignal) =>
+ request.delete(`/agent-templates/${id}`, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/api-keys.ts b/admin-v2/src/services/api-keys.ts
index 02d98db..5268c44 100644
--- a/admin-v2/src/services/api-keys.ts
+++ b/admin-v2/src/services/api-keys.ts
@@ -1,13 +1,13 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { TokenInfo, CreateTokenRequest, PaginatedResponse } from '@/types'
export const apiKeyService = {
- list: (params?: Record) =>
- request.get>('/keys', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/keys', withSignal({ params }, signal)).then((r) => r.data),
- create: (data: CreateTokenRequest) =>
- request.post('/keys', data).then((r) => r.data),
+ create: (data: CreateTokenRequest, signal?: AbortSignal) =>
+ request.post('/keys', data, withSignal({}, signal)).then((r) => r.data),
- revoke: (id: string) =>
- request.delete(`/keys/${id}`).then((r) => r.data),
+ revoke: (id: string, signal?: AbortSignal) =>
+ request.delete(`/keys/${id}`, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/auth.ts b/admin-v2/src/services/auth.ts
index e5fd875..0e92444 100644
--- a/admin-v2/src/services/auth.ts
+++ b/admin-v2/src/services/auth.ts
@@ -1,10 +1,10 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { AccountPublic, LoginRequest, LoginResponse } from '@/types'
export const authService = {
- login: (data: LoginRequest) =>
- request.post('/auth/login', data).then((r) => r.data),
+ login: (data: LoginRequest, signal?: AbortSignal) =>
+ request.post('/auth/login', data, withSignal({}, signal)).then((r) => r.data),
- me: () =>
- request.get('/auth/me').then((r) => r.data),
+ me: (signal?: AbortSignal) =>
+ request.get('/auth/me', withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/config.ts b/admin-v2/src/services/config.ts
index 0783195..ed2f8af 100644
--- a/admin-v2/src/services/config.ts
+++ b/admin-v2/src/services/config.ts
@@ -1,11 +1,11 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { ConfigItem, PaginatedResponse } from '@/types'
export const configService = {
- list: (params?: Record) =>
- request.get>('/config/items', { params })
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/config/items', withSignal({ params }, signal))
.then((r) => r.data.items),
- update: (id: string, data: { value: string | number | boolean }) =>
- request.patch(`/config/items/${id}`, data).then((r) => r.data),
+ update: (id: string, data: { value: string | number | boolean }, signal?: AbortSignal) =>
+ request.patch(`/config/items/${id}`, data, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/logs.ts b/admin-v2/src/services/logs.ts
index 3d5535f..3f2019f 100644
--- a/admin-v2/src/services/logs.ts
+++ b/admin-v2/src/services/logs.ts
@@ -1,7 +1,7 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { OperationLog, PaginatedResponse } from '@/types'
export const logService = {
- list: (params?: Record) =>
- request.get>('/logs/operations', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/logs/operations', withSignal({ params }, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/models.ts b/admin-v2/src/services/models.ts
index d79ca58..f77ac6a 100644
--- a/admin-v2/src/services/models.ts
+++ b/admin-v2/src/services/models.ts
@@ -1,16 +1,16 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { Model, PaginatedResponse } from '@/types'
export const modelService = {
- list: (params?: Record) =>
- request.get>('/models', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/models', withSignal({ params }, signal)).then((r) => r.data),
- create: (data: Partial>) =>
- request.post('/models', data).then((r) => r.data),
+ create: (data: Partial>, signal?: AbortSignal) =>
+ request.post('/models', data, withSignal({}, signal)).then((r) => r.data),
- update: (id: string, data: Partial>) =>
- request.patch(`/models/${id}`, data).then((r) => r.data),
+ update: (id: string, data: Partial>, signal?: AbortSignal) =>
+ request.patch(`/models/${id}`, data, withSignal({}, signal)).then((r) => r.data),
- delete: (id: string) =>
- request.delete(`/models/${id}`).then((r) => r.data),
+ delete: (id: string, signal?: AbortSignal) =>
+ request.delete(`/models/${id}`, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/prompts.ts b/admin-v2/src/services/prompts.ts
index 9078b33..9dd13e9 100644
--- a/admin-v2/src/services/prompts.ts
+++ b/admin-v2/src/services/prompts.ts
@@ -1,35 +1,35 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { PromptTemplate, PromptVersion, PaginatedResponse } from '@/types'
export const promptService = {
- list: (params?: Record) =>
- request.get>('/prompts', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/prompts', withSignal({ params }, signal)).then((r) => r.data),
- get: (name: string) =>
- request.get(`/prompts/${encodeURIComponent(name)}`).then((r) => r.data),
+ get: (name: string, signal?: AbortSignal) =>
+ request.get(`/prompts/${encodeURIComponent(name)}`, withSignal({}, signal)).then((r) => r.data),
create: (data: {
name: string; category: string; description?: string; source?: string
system_prompt: string; user_prompt_template?: string
variables?: unknown[]; min_app_version?: string
- }) =>
- request.post('/prompts', data).then((r) => r.data),
+ }, signal?: AbortSignal) =>
+ request.post('/prompts', data, withSignal({}, signal)).then((r) => r.data),
- update: (name: string, data: { description?: string; status?: string }) =>
- request.put(`/prompts/${encodeURIComponent(name)}`, data).then((r) => r.data),
+ update: (name: string, data: { description?: string; status?: string }, signal?: AbortSignal) =>
+ request.put(`/prompts/${encodeURIComponent(name)}`, data, withSignal({}, signal)).then((r) => r.data),
- archive: (name: string) =>
- request.delete(`/prompts/${encodeURIComponent(name)}`).then((r) => r.data),
+ archive: (name: string, signal?: AbortSignal) =>
+ request.delete(`/prompts/${encodeURIComponent(name)}`, withSignal({}, signal)).then((r) => r.data),
- listVersions: (name: string) =>
- request.get(`/prompts/${encodeURIComponent(name)}/versions`).then((r) => r.data),
+ listVersions: (name: string, signal?: AbortSignal) =>
+ request.get(`/prompts/${encodeURIComponent(name)}/versions`, withSignal({}, signal)).then((r) => r.data),
createVersion: (name: string, data: {
system_prompt: string; user_prompt_template?: string
variables?: unknown[]; changelog?: string; min_app_version?: string
- }) =>
- request.post(`/prompts/${encodeURIComponent(name)}/versions`, data).then((r) => r.data),
+ }, signal?: AbortSignal) =>
+ request.post(`/prompts/${encodeURIComponent(name)}/versions`, data, withSignal({}, signal)).then((r) => r.data),
- rollback: (name: string, version: number) =>
- request.post(`/prompts/${encodeURIComponent(name)}/rollback/${version}`).then((r) => r.data),
+ rollback: (name: string, version: number, signal?: AbortSignal) =>
+ request.post(`/prompts/${encodeURIComponent(name)}/rollback/${version}`, undefined, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/providers.ts b/admin-v2/src/services/providers.ts
index 73ac259..45470c5 100644
--- a/admin-v2/src/services/providers.ts
+++ b/admin-v2/src/services/providers.ts
@@ -1,31 +1,31 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { Provider, ProviderKey, PaginatedResponse } from '@/types'
export const providerService = {
- list: (params?: Record) =>
- request.get>('/providers', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/providers', withSignal({ params }, signal)).then((r) => r.data),
- create: (data: Partial>) =>
- request.post('/providers', data).then((r) => r.data),
+ create: (data: Partial>, signal?: AbortSignal) =>
+ request.post('/providers', data, withSignal({}, signal)).then((r) => r.data),
- update: (id: string, data: Partial>) =>
- request.patch(`/providers/${id}`, data).then((r) => r.data),
+ update: (id: string, data: Partial>, signal?: AbortSignal) =>
+ request.patch(`/providers/${id}`, data, withSignal({}, signal)).then((r) => r.data),
- delete: (id: string) =>
- request.delete(`/providers/${id}`).then((r) => r.data),
+ delete: (id: string, signal?: AbortSignal) =>
+ request.delete(`/providers/${id}`, withSignal({}, signal)).then((r) => r.data),
- listKeys: (providerId: string) =>
- request.get(`/providers/${providerId}/keys`).then((r) => r.data),
+ listKeys: (providerId: string, signal?: AbortSignal) =>
+ request.get(`/providers/${providerId}/keys`, withSignal({}, signal)).then((r) => r.data),
addKey: (providerId: string, data: {
key_label: string; key_value: string; priority?: number
max_rpm?: number; max_tpm?: number; quota_reset_interval?: string
- }) =>
- request.post<{ ok: boolean; key_id: string }>(`/providers/${providerId}/keys`, data).then((r) => r.data),
+ }, signal?: AbortSignal) =>
+ request.post<{ ok: boolean; key_id: string }>(`/providers/${providerId}/keys`, data, withSignal({}, signal)).then((r) => r.data),
- toggleKey: (providerId: string, keyId: string, active: boolean) =>
- request.put<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}/toggle`, { active }).then((r) => r.data),
+ toggleKey: (providerId: string, keyId: string, active: boolean, signal?: AbortSignal) =>
+ request.put<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}/toggle`, { active }, withSignal({}, signal)).then((r) => r.data),
- deleteKey: (providerId: string, keyId: string) =>
- request.delete<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}`).then((r) => r.data),
+ deleteKey: (providerId: string, keyId: string, signal?: AbortSignal) =>
+ request.delete<{ ok: boolean }>(`/providers/${providerId}/keys/${keyId}`, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/relay.ts b/admin-v2/src/services/relay.ts
index 728d0a8..4fbe04b 100644
--- a/admin-v2/src/services/relay.ts
+++ b/admin-v2/src/services/relay.ts
@@ -1,10 +1,10 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { RelayTask, PaginatedResponse } from '@/types'
export const relayService = {
- list: (params?: Record) =>
- request.get>('/relay/tasks', { params }).then((r) => r.data),
+ list: (params?: Record, signal?: AbortSignal) =>
+ request.get>('/relay/tasks', withSignal({ params }, signal)).then((r) => r.data),
- get: (id: string) =>
- request.get(`/relay/tasks/${id}`).then((r) => r.data),
+ get: (id: string, signal?: AbortSignal) =>
+ request.get(`/relay/tasks/${id}`, withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/request.ts b/admin-v2/src/services/request.ts
index 9bbad36..20bd489 100644
--- a/admin-v2/src/services/request.ts
+++ b/admin-v2/src/services/request.ts
@@ -4,6 +4,7 @@
import axios from 'axios'
import type { AxiosError, InternalAxiosRequestConfig } from 'axios'
+import type { AxiosRequestConfig } from 'axios'
import type { ApiError } from '@/types'
import { useAuthStore } from '@/stores/authStore'
@@ -106,3 +107,11 @@ request.interceptors.response.use(
)
export default request
+
+/** 将 AbortSignal 注入 Axios config,用于 TanStack Query 的请求取消 */
+export function withSignal(config: AxiosRequestConfig = {}, signal?: AbortSignal): AxiosRequestConfig {
+ if (signal) {
+ return { ...config, signal }
+ }
+ return config
+}
diff --git a/admin-v2/src/services/stats.ts b/admin-v2/src/services/stats.ts
index 63bd461..dca8c92 100644
--- a/admin-v2/src/services/stats.ts
+++ b/admin-v2/src/services/stats.ts
@@ -1,7 +1,7 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { DashboardStats } from '@/types'
export const statsService = {
- dashboard: () =>
- request.get('/stats/dashboard').then((r) => r.data),
+ dashboard: (signal?: AbortSignal) =>
+ request.get('/stats/dashboard', withSignal({}, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/telemetry.ts b/admin-v2/src/services/telemetry.ts
index 5df2312..a94ca13 100644
--- a/admin-v2/src/services/telemetry.ts
+++ b/admin-v2/src/services/telemetry.ts
@@ -1,10 +1,10 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { ModelUsageStat, DailyUsageStat } from '@/types'
export const telemetryService = {
- modelStats: (params?: Record) =>
- request.get('/telemetry/stats', { params }).then((r) => r.data),
+ modelStats: (params?: Record, signal?: AbortSignal) =>
+ request.get('/telemetry/stats', withSignal({ params }, signal)).then((r) => r.data),
- dailyStats: (params?: { days?: number }) =>
- request.get('/telemetry/daily', { params }).then((r) => r.data),
+ dailyStats: (params?: { days?: number }, signal?: AbortSignal) =>
+ request.get('/telemetry/daily', withSignal({ params }, signal)).then((r) => r.data),
}
diff --git a/admin-v2/src/services/usage.ts b/admin-v2/src/services/usage.ts
index b05f98d..1f85ccc 100644
--- a/admin-v2/src/services/usage.ts
+++ b/admin-v2/src/services/usage.ts
@@ -1,12 +1,12 @@
-import request from './request'
+import request, { withSignal } from './request'
import type { UsageRecord, UsageByModel } from '@/types'
export const usageService = {
- daily: (params?: { days?: number }) =>
- request.get<{ by_day: UsageRecord[] }>('/usage', { params: { ...params, group_by: 'day' } })
+ daily: (params?: { days?: number }, signal?: AbortSignal) =>
+ request.get<{ by_day: UsageRecord[] }>('/usage', withSignal({ params: { ...params, group_by: 'day' } }, signal))
.then((r) => r.data.by_day || []),
- byModel: (params?: { days?: number }) =>
- request.get<{ by_model: UsageByModel[] }>('/usage', { params: { ...params, group_by: 'model' } })
+ byModel: (params?: { days?: number }, signal?: AbortSignal) =>
+ request.get<{ by_model: UsageByModel[] }>('/usage', withSignal({ params: { ...params, group_by: 'model' } }, signal))
.then((r) => r.data.by_model || []),
}
diff --git a/admin-v2/vite.config.ts b/admin-v2/vite.config.ts
index 14f22f5..1534916 100644
--- a/admin-v2/vite.config.ts
+++ b/admin-v2/vite.config.ts
@@ -15,6 +15,16 @@ export default defineConfig({
'/api': {
target: 'http://localhost:8080',
changeOrigin: true,
+ timeout: 30_000,
+ proxyTimeout: 30_000,
+ configure: (proxy) => {
+ proxy.on('proxyReq', (proxyReq) => {
+ proxyReq.setTimeout(30_000)
+ })
+ proxy.on('proxyRes', (proxyRes) => {
+ proxyRes.setTimeout(30_000)
+ })
+ },
},
},
},
diff --git a/crates/zclaw-saas/Cargo.toml b/crates/zclaw-saas/Cargo.toml
index c22c303..a97446a 100644
--- a/crates/zclaw-saas/Cargo.toml
+++ b/crates/zclaw-saas/Cargo.toml
@@ -31,6 +31,7 @@ sha2 = { workspace = true }
rand = { workspace = true }
dashmap = { workspace = true }
hex = { workspace = true }
+socket2 = { workspace = true }
url = "2"
axum = { workspace = true }
diff --git a/crates/zclaw-saas/src/auth/totp.rs b/crates/zclaw-saas/src/auth/totp.rs
index 75cda66..2425257 100644
--- a/crates/zclaw-saas/src/auth/totp.rs
+++ b/crates/zclaw-saas/src/auth/totp.rs
@@ -148,6 +148,34 @@ pub async fn verify_totp(
return Err(SaasError::InvalidInput("TOTP 码必须是 6 位数字".into()));
}
+ // TOTP 暴力破解保护: 10 分钟内最多 5 次失败
+ const MAX_TOTP_FAILURES: u32 = 5;
+ const TOTP_LOCKOUT_SECS: u64 = 600;
+ let now = std::time::Instant::now();
+ let lockout_duration = std::time::Duration::from_secs(TOTP_LOCKOUT_SECS);
+
+ let is_locked = {
+ if let Some(entry) = state.totp_fail_counts.get(&ctx.account_id) {
+ let (count, first_fail) = entry.value();
+ if *count >= MAX_TOTP_FAILURES && now.duration_since(*first_fail) < lockout_duration {
+ true
+ } else {
+ // 窗口过期,重置
+ drop(entry);
+ state.totp_fail_counts.remove(&ctx.account_id);
+ false
+ }
+ } else {
+ false
+ }
+ };
+
+ if is_locked {
+ return Err(SaasError::RateLimited(
+ format!("TOTP 验证失败次数过多,请 {} 秒后重试", TOTP_LOCKOUT_SECS)
+ ));
+ }
+
// 获取存储的密钥
let (totp_secret,): (Option,) = sqlx::query_as(
"SELECT totp_secret FROM accounts WHERE id = $1"
@@ -172,9 +200,24 @@ pub async fn verify_totp(
};
if !verify_totp_code(&secret, code) {
+ // 记录失败次数
+ let new_count = {
+ let mut entry = state.totp_fail_counts
+ .entry(ctx.account_id.clone())
+ .or_insert((0, now));
+ entry.value_mut().0 += 1;
+ entry.value().0
+ };
+ tracing::warn!(
+ "TOTP verify failed for account {} ({}/{} attempts)",
+ ctx.account_id, new_count, MAX_TOTP_FAILURES
+ );
return Err(SaasError::Totp("TOTP 码验证失败".into()));
}
+ // 验证成功 → 清除失败计数
+ state.totp_fail_counts.remove(&ctx.account_id);
+
// 验证成功 → 启用 TOTP,同时确保密钥已加密
let final_secret = if encrypted_secret.starts_with(crypto::ENCRYPTED_PREFIX) {
encrypted_secret
@@ -183,10 +226,10 @@ pub async fn verify_totp(
encrypt_totp_secret(&secret, &enc_key)?
};
- let now = chrono::Utc::now().to_rfc3339();
+ let now_ts = chrono::Utc::now().to_rfc3339();
sqlx::query("UPDATE accounts SET totp_enabled = true, totp_secret = $1, updated_at = $2 WHERE id = $3")
.bind(&final_secret)
- .bind(&now)
+ .bind(&now_ts)
.bind(&ctx.account_id)
.execute(&state.db)
.await?;
diff --git a/crates/zclaw-saas/src/db.rs b/crates/zclaw-saas/src/db.rs
index 5a0b971..580eaf0 100644
--- a/crates/zclaw-saas/src/db.rs
+++ b/crates/zclaw-saas/src/db.rs
@@ -90,7 +90,7 @@ async fn run_migration_files(pool: &PgPool, dir: &std::path::Path) -> SaasResult
let filename = path.file_name().unwrap_or_default().to_string_lossy();
tracing::info!("Running migration: {}", filename);
let content = std::fs::read_to_string(path)?;
- for stmt in content.split(';') {
+ for stmt in split_sql_statements(&content) {
let trimmed = stmt.trim();
if !trimmed.is_empty() && !trimmed.starts_with("--") {
sqlx::query(trimmed).execute(pool).await?;
@@ -100,6 +100,150 @@ async fn run_migration_files(pool: &PgPool, dir: &std::path::Path) -> SaasResult
Ok(())
}
+/// 按语句分割 SQL 文件内容,正确处理:
+/// - 单引号字符串 `'...'`
+/// - 双引号标识符 `"..."`
+/// - 美元符号引用字符串 `$$...$$` 和 `$tag$...$tag$`
+/// - `--` 单行注释
+/// - `/* ... */` 块注释
+/// - `E'...'` 转义字符串
+fn split_sql_statements(sql: &str) -> Vec {
+ let mut statements = Vec::new();
+ let mut current = String::new();
+ let mut chars = sql.chars().peekable();
+
+ while let Some(ch) = chars.next() {
+ match ch {
+ '\'' => {
+ // 单引号字符串
+ current.push(ch);
+ loop {
+ match chars.next() {
+ Some('\'') => {
+ current.push('\'');
+ // 检查是否为转义引号 ''
+ if chars.peek() == Some(&'\'') {
+ current.push(chars.next().unwrap());
+ } else {
+ break;
+ }
+ }
+ Some(c) => current.push(c),
+ None => break,
+ }
+ }
+ }
+ '"' => {
+ // 双引号标识符
+ current.push(ch);
+ loop {
+ match chars.next() {
+ Some('"') => {
+ current.push('"');
+ break;
+ }
+ Some(c) => current.push(c),
+ None => break,
+ }
+ }
+ }
+ '-' if chars.peek() == Some(&'-') => {
+ // 单行注释: 跳过直到行尾
+ chars.next(); // consume second '-'
+ while let Some(&c) = chars.peek() {
+ if c == '\n' {
+ chars.next();
+ current.push(c);
+ break;
+ }
+ chars.next();
+ }
+ }
+ '/' if chars.peek() == Some(&'*') => {
+ // 块注释: 跳过直到 */
+ chars.next(); // consume '*'
+ current.push_str("/*");
+ let mut prev = ' ';
+ loop {
+ match chars.next() {
+ Some('/') if prev == '*' => {
+ current.push('/');
+ break;
+ }
+ Some(c) => {
+ current.push(c);
+ prev = c;
+ }
+ None => break,
+ }
+ }
+ }
+ '$' => {
+ // 美元符号引用: $$ 或 $tag$ ... $tag$
+ current.push(ch);
+ // 读取 tag (字母数字和下划线)
+ let mut tag = String::new();
+ while let Some(&c) = chars.peek() {
+ if c == '$' || c.is_alphanumeric() || c == '_' {
+ if c == '$' {
+ chars.next();
+ current.push(c);
+ break;
+ }
+ chars.next();
+ tag.push(c);
+ current.push(c);
+ } else {
+ break;
+ }
+ }
+ // 如果 tag 为空,就是 $$ 格式
+ let end_marker = if tag.is_empty() {
+ "$$".to_string()
+ } else {
+ format!("${}$", tag)
+ };
+ // 读取直到遇到 end_marker
+ let mut buf = String::new();
+ loop {
+ match chars.next() {
+ Some(c) => {
+ current.push(c);
+ buf.push(c);
+ if buf.len() > end_marker.len() {
+ buf.remove(0);
+ }
+ if buf == end_marker {
+ break;
+ }
+ }
+ None => break,
+ }
+ }
+ }
+ ';' => {
+ // 语句结束
+ let trimmed = current.trim().to_string();
+ if !trimmed.is_empty() {
+ statements.push(trimmed);
+ }
+ current.clear();
+ }
+ _ => {
+ current.push(ch);
+ }
+ }
+ }
+
+ // 最后一条语句 (可能不以分号结尾)
+ let trimmed = current.trim().to_string();
+ if !trimmed.is_empty() {
+ statements.push(trimmed);
+ }
+
+ statements
+}
+
/// Seed 角色数据
async fn seed_roles(pool: &PgPool) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
diff --git a/crates/zclaw-saas/src/main.rs b/crates/zclaw-saas/src/main.rs
index 12ebb4f..c0fd4cb 100644
--- a/crates/zclaw-saas/src/main.rs
+++ b/crates/zclaw-saas/src/main.rs
@@ -67,7 +67,9 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
-async fn health_handler(State(state): State) -> axum::Json {
+async fn health_handler(
+ State(state): State,
+) -> (axum::http::StatusCode, axum::Json ) {
// health 必须独立快速返回,用 3s 超时避免连接池满时阻塞
let db_healthy = tokio::time::timeout(
std::time::Duration::from_secs(3),
@@ -77,15 +79,41 @@ async fn health_handler(State(state): State) -> axum::Json= 80% 返回 503 (degraded)
+ let pool = &state.db;
+ let total = pool.options().get_max_connections() as usize;
+ if total > 0 {
+ let idle = pool.num_idle() as usize;
+ let used = total - idle;
+ let ratio = used * 100 / total;
+ if ratio >= 80 {
+ return (
+ axum::http::StatusCode::SERVICE_UNAVAILABLE,
+ axum::Json(serde_json::json!({
+ "status": "degraded",
+ "database": true,
+ "database_pool": {
+ "usage_pct": ratio,
+ "used": used,
+ "total": total,
+ },
+ "timestamp": chrono::Utc::now().to_rfc3339(),
+ "version": env!("CARGO_PKG_VERSION"),
+ })),
+ );
+ }
+ }
- axum::Json(serde_json::json!({
+ let status = if db_healthy { "healthy" } else { "degraded" };
+ let code = if db_healthy {
+ axum::http::StatusCode::OK } else { axum::http::StatusCode::SERVICE_UNAVAILABLE };
+
+ (code, axum::Json(serde_json::json!({
"status": status,
"database": db_healthy,
"timestamp": chrono::Utc::now().to_rfc3339(),
"version": env!("CARGO_PKG_VERSION"),
- }))
+ })))
}
async fn build_router(state: AppState) -> axum::Router {
diff --git a/crates/zclaw-saas/src/relay/key_pool.rs b/crates/zclaw-saas/src/relay/key_pool.rs
index cfe5c7f..bfbab7d 100644
--- a/crates/zclaw-saas/src/relay/key_pool.rs
+++ b/crates/zclaw-saas/src/relay/key_pool.rs
@@ -4,7 +4,7 @@
use sqlx::PgPool;
use crate::error::{SaasError, SaasResult};
-use crate::models::{ProviderKeySelectRow, ProviderKeyRow};
+ use crate::models::ProviderKeyRow;
use crate::crypto;
/// 解密 key_value (如果已加密),否则原样返回
@@ -36,19 +36,63 @@ pub struct KeySelection {
}
/// 从 provider 的 Key Pool 中选择最佳可用 Key
+///
+/// 优化: 单次 JOIN 查询获取 Key + 当前分钟使用量,避免 N+1 查询
pub async fn select_best_key(db: &PgPool, provider_id: &str, enc_key: &[u8; 32]) -> SaasResult {
let now = chrono::Utc::now().to_rfc3339();
let current_minute = chrono::Utc::now().format("%Y-%m-%dT%H:%M").to_string();
- // 获取所有活跃 Key
- let rows: Vec =
+ // 单次查询: 活跃 Key + 当前分钟的 RPM/TPM 使用量 (LEFT JOIN)
+ let rows: Vec<(String, String, i32, Option, Option, Option, Option, Option)> =
sqlx::query_as(
- "SELECT id, key_value, priority, max_rpm, max_tpm, quota_reset_interval
- FROM provider_keys
- WHERE provider_id = $1 AND is_active = TRUE AND (cooldown_until IS NULL OR cooldown_until <= $2)
- ORDER BY priority ASC"
- ).bind(provider_id).bind(&now).fetch_all(db).await?;
+ "SELECT pk.id, pk.key_value, pk.priority, pk.max_rpm, pk.max_tpm, pk.quota_reset_interval,
+ uw.request_count, uw.token_count
+ FROM provider_keys pk
+ LEFT JOIN key_usage_window uw ON pk.id = uw.key_id AND uw.window_minute = $1
+ WHERE pk.provider_id = $2 AND pk.is_active = TRUE
+ AND (pk.cooldown_until IS NULL OR pk.cooldown_until <= $3)
+ ORDER BY pk.priority ASC"
+ ).bind(¤t_minute).bind(provider_id).bind(&now).fetch_all(db).await?;
+ for (id, key_value, priority, max_rpm, max_tpm, quota_reset_interval, req_count, token_count) in &rows {
+ // RPM 检查
+ if let Some(rpm_limit) = max_rpm {
+ if *rpm_limit > 0 {
+ let count = req_count.unwrap_or(0);
+ if count >= *rpm_limit {
+ tracing::debug!("Key {} hit RPM limit ({}/{})", id, count, rpm_limit);
+ continue;
+ }
+ }
+ }
+
+ // TPM 检查
+ if let Some(tpm_limit) = max_tpm {
+ if *tpm_limit > 0 {
+ let tokens = token_count.unwrap_or(0);
+ if tokens >= *tpm_limit {
+ tracing::debug!("Key {} hit TPM limit ({}/{})", id, tokens, tpm_limit);
+ continue;
+ }
+ }
+ }
+
+ // 此 Key 可用 — 解密 key_value
+ let decrypted_kv = decrypt_key_value(key_value, enc_key)?;
+ return Ok(KeySelection {
+ key: PoolKey {
+ id: id.clone(),
+ key_value: decrypted_kv,
+ priority: *priority,
+ max_rpm: *max_rpm,
+ max_tpm: *max_tpm,
+ quota_reset_interval: quota_reset_interval.clone(),
+ },
+ key_id: id.clone(),
+ });
+ }
+
+ // 所有 Key 都超限或无 Key
if rows.is_empty() {
// 检查是否有冷却中的 Key,返回预计等待时间
let cooldown_row: Option<(String,)> = sqlx::query_as(
@@ -59,88 +103,14 @@ pub async fn select_best_key(db: &PgPool, provider_id: &str, enc_key: &[u8; 32])
).bind(provider_id).bind(&now).fetch_optional(db).await?;
if let Some((earliest,)) = cooldown_row {
- // 尝试解析时间差
let wait_secs = parse_cooldown_remaining(&earliest, &now);
return Err(SaasError::RateLimited(
format!("所有 Key 均在冷却中,预计 {} 秒后可用", wait_secs)
));
}
-
- // 检查 provider 级别的单 Key
- let provider_key: Option = sqlx::query_scalar(
- "SELECT api_key FROM providers WHERE id = $1"
- ).bind(provider_id).fetch_optional(db).await?.flatten();
-
- if let Some(key) = provider_key {
- let decrypted = decrypt_key_value(&key, enc_key)?;
- return Ok(KeySelection {
- key: PoolKey {
- id: "provider-fallback".to_string(),
- key_value: decrypted,
- priority: 0,
- max_rpm: None,
- max_tpm: None,
- quota_reset_interval: None,
- },
- key_id: "provider-fallback".to_string(),
- });
- }
-
- return Err(SaasError::NotFound(format!("Provider {} 没有可用的 API Key", provider_id)));
}
- // 检查滑动窗口使用量
- for row in rows {
- // 检查 RPM 限额
- if let Some(rpm_limit) = row.max_rpm {
- if rpm_limit > 0 {
- let window: Option<(i64,)> = sqlx::query_as(
- "SELECT COALESCE(SUM(request_count), 0) FROM key_usage_window
- WHERE key_id = $1 AND window_minute = $2"
- ).bind(&row.id).bind(¤t_minute).fetch_optional(db).await?;
-
- if let Some((count,)) = window {
- if count >= rpm_limit {
- tracing::debug!("Key {} hit RPM limit ({}/{})", row.id, count, rpm_limit);
- continue;
- }
- }
- }
- }
-
- // 检查 TPM 限额
- if let Some(tpm_limit) = row.max_tpm {
- if tpm_limit > 0 {
- let window: Option<(i64,)> = sqlx::query_as(
- "SELECT COALESCE(SUM(token_count), 0) FROM key_usage_window
- WHERE key_id = $1 AND window_minute = $2"
- ).bind(&row.id).bind(¤t_minute).fetch_optional(db).await?;
-
- if let Some((tokens,)) = window {
- if tokens >= tpm_limit {
- tracing::debug!("Key {} hit TPM limit ({}/{})", row.id, tokens, tpm_limit);
- continue;
- }
- }
- }
- }
-
- // 此 Key 可用 — 解密 key_value
- let decrypted_kv = decrypt_key_value(&row.key_value, enc_key)?;
- return Ok(KeySelection {
- key: PoolKey {
- id: row.id.clone(),
- key_value: decrypted_kv,
- priority: row.priority,
- max_rpm: row.max_rpm,
- max_tpm: row.max_tpm,
- quota_reset_interval: row.quota_reset_interval,
- },
- key_id: row.id,
- });
- }
-
- // 所有 Key 都超限,回退到 provider 单 Key
+ // 回退到 provider 单 Key
let provider_key: Option = sqlx::query_scalar(
"SELECT api_key FROM providers WHERE id = $1"
).bind(provider_id).fetch_optional(db).await?.flatten();
@@ -160,9 +130,13 @@ pub async fn select_best_key(db: &PgPool, provider_id: &str, enc_key: &[u8; 32])
});
}
- Err(SaasError::RateLimited(
- format!("Provider {} 所有 Key 均已达限额", provider_id)
- ))
+ if rows.is_empty() {
+ Err(SaasError::NotFound(format!("Provider {} 没有可用的 API Key", provider_id)))
+ } else {
+ Err(SaasError::RateLimited(
+ format!("Provider {} 所有 Key 均已达限额", provider_id)
+ ))
+ }
}
/// 记录 Key 使用量(滑动窗口)
diff --git a/crates/zclaw-saas/src/relay/service.rs b/crates/zclaw-saas/src/relay/service.rs
index 4289059..7e65369 100644
--- a/crates/zclaw-saas/src/relay/service.rs
+++ b/crates/zclaw-saas/src/relay/service.rs
@@ -298,7 +298,21 @@ pub async fn execute_relay(
let body = axum::body::Body::from_stream(body_stream);
// SSE 流结束后异步记录 usage + Key 使用量
+ // 使用全局 Arc 限制并发 spawned tasks,防止高并发时耗尽连接池
+ static SSE_SPAWN_SEMAPHORE: std::sync::OnceLock> = std::sync::OnceLock::new();
+ let semaphore = SSE_SPAWN_SEMAPHORE.get_or_init(|| Arc::new(tokio::sync::Semaphore::new(16)));
+ let permit = match semaphore.clone().try_acquire_owned() {
+ Ok(p) => p,
+ Err(_) => {
+ // 信号量满时跳过 usage 记录,流本身不受影响
+ tracing::warn!("SSE usage spawn at capacity, skipping usage record for task {}", task_id);
+ return Ok(RelayResponse::Sse(body));
+ }
+ };
+
tokio::spawn(async move {
+ let _permit = permit; // 持有 permit 直到任务完成
+ tokio::time::sleep(std::time::Duration::from_secs(3)).await;
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
let capture = usage_capture.lock().await;
let (input, output) = (
@@ -464,11 +478,11 @@ async fn validate_provider_url(url: &str) -> SaasResult<()> {
// 去除 IPv6 方括号
let host = host.trim_start_matches('[').trim_end_matches(']');
- // 精确匹配的阻止列表
+ // 精确匹配的阻止列表: 仅包含主机名和特殊域名
+ // 私有 IP 范围 (10.x, 172.16-31.x, 192.168.x, 127.x, 169.254.x, ::1 等)
+ // 由 is_private_ip() 统一判断,无需在此重复列出
let blocked_exact = [
- "127.0.0.1", "0.0.0.0", "localhost", "::1", "::ffff:127.0.0.1",
- "0:0:0:0:0:ffff:7f00:1", "169.254.169.254", "metadata.google.internal",
- "10.0.0.1", "172.16.0.1", "192.168.0.1",
+ "localhost", "metadata.google.internal",
];
if blocked_exact.contains(&host) {
return Err(SaasError::InvalidInput(format!("provider URL 指向禁止的内网地址: {}", host)));
diff --git a/crates/zclaw-saas/src/state.rs b/crates/zclaw-saas/src/state.rs
index d5d090a..b27032c 100644
--- a/crates/zclaw-saas/src/state.rs
+++ b/crates/zclaw-saas/src/state.rs
@@ -21,6 +21,8 @@ pub struct AppState {
pub rate_limit_entries: Arc>>,
/// 角色权限缓存: role_id → permissions list
pub role_permissions_cache: Arc>>,
+ /// TOTP 失败计数: account_id → (失败次数, 首次失败时间)
+ pub totp_fail_counts: Arc>,
/// 无锁 rate limit RPM(从 config 同步,避免每个请求获取 RwLock)
rate_limit_rpm: Arc,
/// Worker 调度器 (异步后台任务)
@@ -37,6 +39,7 @@ impl AppState {
jwt_secret,
rate_limit_entries: Arc::new(dashmap::DashMap::new()),
role_permissions_cache: Arc::new(dashmap::DashMap::new()),
+ totp_fail_counts: Arc::new(dashmap::DashMap::new()),
rate_limit_rpm: Arc::new(AtomicU32::new(rpm)),
worker_dispatcher,
})
diff --git a/crates/zclaw-saas/src/workers/mod.rs b/crates/zclaw-saas/src/workers/mod.rs
index e49da9c..6af7383 100644
--- a/crates/zclaw-saas/src/workers/mod.rs
+++ b/crates/zclaw-saas/src/workers/mod.rs
@@ -155,6 +155,7 @@ impl WorkerDispatcher {
fn start_consumer(&self, mut receiver: mpsc::Receiver) {
let db = self.db.clone();
let handlers = self.handlers.clone();
+ let sender = self.sender.clone();
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
@@ -169,6 +170,7 @@ impl WorkerDispatcher {
let worker_name = msg.worker_name.clone();
let max_retries = handler.max_retries();
let db = db.clone();
+ let sender = sender.clone();
tokio::spawn(async move {
match handler.perform(&db, &msg.args_json).await {
@@ -177,18 +179,27 @@ impl WorkerDispatcher {
}
Err(e) => {
if msg.attempt < max_retries {
- tracing::warn!(
- "Worker {} failed (attempt {}/{}): {}. Will retry.",
- worker_name, msg.attempt, max_retries, e
- );
- // 简单退避: 2^attempt 秒
let delay = std::time::Duration::from_secs(1 << msg.attempt.min(4));
+ tracing::warn!(
+ "Worker {} failed (attempt {}/{}): {}. Re-queuing after {:?}.",
+ worker_name, msg.attempt, max_retries, e, delay
+ );
tokio::time::sleep(delay).await;
- // 注意: 重试在当前设计中通过日志提醒
- // 生产环境应将任务重新入队
+ // 重新入队(递增 attempt 计数)
+ let retry_msg = TaskMessage {
+ worker_name: msg.worker_name.clone(),
+ args_json: msg.args_json.clone(),
+ attempt: msg.attempt + 1,
+ };
+ if let Err(send_err) = sender.send(retry_msg).await {
+ tracing::error!(
+ "Worker {} retry enqueue failed (channel closed): {}",
+ worker_name, send_err
+ );
+ }
} else {
tracing::error!(
- "Worker {} failed after {} attempts: {}",
+ "Worker {} failed after {} attempts: {}. Giving up.",
worker_name, max_retries, e
);
}
diff --git a/desktop/src/components/PipelineResultPreview.tsx b/desktop/src/components/PipelineResultPreview.tsx
index 08e9ada..9f1c787 100644
--- a/desktop/src/components/PipelineResultPreview.tsx
+++ b/desktop/src/components/PipelineResultPreview.tsx
@@ -186,7 +186,7 @@ function MarkdownPreview({ content }: MarkdownPreviewProps) {
return (
);
}