Files
zclaw_openfang/desktop/src/lib/saas-relay-client.ts
iven a504a40395
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix: 7 项 E2E Bug 修复 — Dashboard 404 / 记忆去重 / 记忆注入 / invoice_id / Prompt 版本
P0:
- BUG-H1: Dashboard 路由 /api/v1/stats/dashboard → /api/v1/admin/dashboard

P1:
- BUG-H2: viking_add 预检查 content_hash 去重,返回 "deduped" 状态;SqliteStorage 启动时回填已有条目 content_hash
- BUG-M5: saas-relay-client 发送前调用 viking_inject_prompt 注入跨会话记忆

P2:
- BUG-M1: PaymentResult 添加 invoice_id 字段,query_payment_status 返回 invoice_id
- BUG-M2: UpdatePromptRequest 添加内容字段,更新时自动创建新版本并递增 current_version
- BUG-M3: viking_find scope 参数文档化(设计行为,调用方需传 agent scope)
- BUG-M4: Dashboard 路由缺失已修复,handler 层 require_admin 已正确返回 403

P3 (确认已修复/非代码问题):
- BUG-L1: pain_seed_categories 已统一,无 pain_seeds 残留
- BUG-L2: pipeline_create 参数格式正确,E2E 测试方法问题
2026-04-17 03:31:06 +08:00

441 lines
15 KiB
TypeScript

/**
* SaaS Relay Gateway Client
*
* A lightweight GatewayClient-compatible adapter for browser-only mode.
* Routes agent listing through SaaS agent-templates and converts
* chatStream() to OpenAI SSE streaming via the SaaS relay.
*
* Used in connectionStore when running in a browser (non-Tauri) with
* SaaS relay connection mode.
*/
import type { GatewayClient } from './gateway-client';
import { saasClient } from './saas-client';
import type { AgentTemplateAvailable } from './saas-types';
import { createLogger } from './logger';
const log = createLogger('SaaSRelayGateway');
// ---------------------------------------------------------------------------
// Memory injection helper — injects relevant memories into system prompt
// before sending to SaaS relay (mirrors MemoryMiddleware in Tauri kernel path)
// ---------------------------------------------------------------------------
/**
 * Ask the Tauri-side `viking_inject_prompt` command to enrich a system
 * prompt with memories relevant to the current user input.
 *
 * This is strictly best-effort: if the Tauri module cannot be imported or
 * the command fails for any reason, the original prompt is returned as-is.
 *
 * @param agentId    Agent whose memories are queried; `'default'` is sent when omitted.
 * @param basePrompt Prompt to enrich.
 * @param userInput  Current user message, forwarded to the command.
 * @returns The enriched prompt when the command returns a non-empty value
 *          that differs from `basePrompt`; otherwise `basePrompt`.
 */
async function injectMemories(
  agentId: string | undefined,
  basePrompt: string,
  userInput: string,
): Promise<string> {
  try {
    // Loaded lazily so that a missing Tauri runtime surfaces as a caught error.
    const { invoke: tauriInvoke } = await import('@tauri-apps/api/core');
    const commandArgs = {
      agentId: agentId ?? 'default',
      basePrompt,
      userInput,
      maxTokens: 500,
    };
    const enriched = await tauriInvoke<string>('viking_inject_prompt', commandArgs);
    const wasEnriched = Boolean(enriched) && enriched !== basePrompt;
    if (wasEnriched) {
      log.debug('Memory injection succeeded for relay request');
      return enriched;
    }
  } catch {
    // Import or command failure — deliberately ignored; fall through to the base prompt.
  }
  return basePrompt;
}
// ---------------------------------------------------------------------------
// Frontend DataMasking — mirrors Rust DataMasking middleware for SaaS Relay
// ---------------------------------------------------------------------------
/** Patterns whose matches are replaced by placeholders before text leaves the client. */
const MASK_PATTERNS: RegExp[] = [
  /\b\d{17}[\dXx]\b/g, // ID card
  /1[3-9]\d-?\d{4}-?\d{4}/g, // Phone
  /[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g, // Email
  // Money — accepts both the halfwidth (U+00A5) and fullwidth (U+FFE5) yen sign.
  /[¥\uFFE5$]\s*[\d,.]+[万亿]?元?|[\d,.]+[万亿]元/g,
  /[^\s]{1,20}(?:公司|厂|集团|工作室|商行|有限|股份)/g, // Company
];

/** Shape of the placeholders produced by maskSensitiveData. */
const MASK_TOKEN_PATTERN = /__ENTITY_\d+__/g;

// Module-wide state: a given entity always maps to the same placeholder so
// that earlier turns and the streamed reply can be translated consistently.
// Nothing in this section ever clears these maps.
let maskCounter = 0;
const entityMap = new Map<string, string>();
/** Reverse lookup (placeholder -> original text) used when unmasking. */
const entityByToken = new Map<string, string>();

/**
 * Mask sensitive entities in text before sending to SaaS relay.
 *
 * Matches of every pattern are collected as character ranges and overlapping
 * ranges are merged into a single entity. Replacing by position (instead of
 * by string, longest first) guarantees that everything any pattern matched
 * is hidden, even when two matches only partially overlap — previously the
 * shorter match could be left partly in clear text.
 *
 * @param text Raw text.
 * @returns `text` with each (merged) match replaced by an `__ENTITY_<n>__` placeholder.
 */
function maskSensitiveData(text: string): string {
  const spans: Array<[number, number]> = [];
  for (const pattern of MASK_PATTERNS) {
    pattern.lastIndex = 0; // global regexes keep state between calls
    let match: RegExpExecArray | null;
    while ((match = pattern.exec(text)) !== null) {
      if (match[0].length === 0) {
        // Defensive: a pattern able to match the empty string would loop forever.
        pattern.lastIndex++;
        continue;
      }
      spans.push([match.index, match.index + match[0].length]);
    }
  }
  if (spans.length === 0) return text;

  // Order by start (longer first on ties) and merge anything that overlaps.
  spans.sort((a, b) => a[0] - b[0] || b[1] - a[1]);
  const merged: Array<[number, number]> = [];
  for (const [start, end] of spans) {
    const last = merged.length > 0 ? merged[merged.length - 1] : undefined;
    if (last !== undefined && start < last[1]) {
      if (end > last[1]) last[1] = end;
    } else {
      merged.push([start, end]);
    }
  }

  let result = '';
  let cursor = 0;
  for (const [start, end] of merged) {
    const entity = text.slice(start, end);
    let token = entityMap.get(entity);
    if (token === undefined) {
      maskCounter++;
      token = `__ENTITY_${maskCounter}__`;
      entityMap.set(entity, token);
      entityByToken.set(token, entity);
    }
    result += text.slice(cursor, start) + token;
    cursor = end;
  }
  return result + text.slice(cursor);
}

/**
 * Restore masked tokens in AI response back to original entities.
 *
 * Works in a single pass, so restored text is never scanned again.
 * Placeholders that were not produced by maskSensitiveData are left untouched.
 */
function unmaskSensitiveData(text: string): string {
  return text.replace(MASK_TOKEN_PATTERN, (token) => entityByToken.get(token) ?? token);
}
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
interface CloneInfo {
id: string;
name: string;
role?: string;
nickname?: string;
emoji?: string;
personality?: string;
scenarios?: string[];
model?: string;
status?: string;
templateId?: string;
}
// ---------------------------------------------------------------------------
// Implementation
// ---------------------------------------------------------------------------
/**
* Create a GatewayClient-compatible object that routes through SaaS APIs.
* Only the methods needed by the stores are implemented; others return
* sensible defaults.
*/
export function createSaaSRelayGatewayClient(
_saasUrl: string,
getModel: () => string,
): GatewayClient {
// saasUrl preserved for future direct API routing (currently routed through saasClient singleton)
void _saasUrl;
// Local in-memory agent registry
const agents = new Map<string, CloneInfo>();
let defaultAgentId: string | null = null;
// -----------------------------------------------------------------------
// Helper: list agents as clones
// -----------------------------------------------------------------------
/**
 * Fetch the available SaaS agent templates and expose each one as a clone.
 *
 * Every clone is written into the local `agents` registry, and the first
 * clone becomes the default agent when no default is set yet. If fetching or
 * mapping fails, the failure is logged and an empty list is returned.
 */
async function listClones(): Promise<{ clones: CloneInfo[] }> {
  try {
    const available: AgentTemplateAvailable[] = await saasClient.fetchAvailableTemplates();
    const clones: CloneInfo[] = [];
    for (const template of available) {
      // Templates without an id get one derived from their name.
      const cloneId = template.id || `agent-${template.name}`;
      const entry: CloneInfo = {
        id: cloneId,
        name: template.name,
        role: template.description || template.category,
        emoji: template.emoji,
        personality: template.category,
        scenarios: [],
        model: getModel(),
        status: 'active',
        templateId: template.id,
      };
      agents.set(cloneId, entry);
      clones.push(entry);
    }
    // An existing default is never overwritten.
    const [firstClone] = clones;
    if (firstClone && !defaultAgentId) {
      defaultAgentId = firstClone.id;
    }
    return { clones };
  } catch (err) {
    log.warn('Failed to list templates', err);
    return { clones: [] };
  }
}
// -----------------------------------------------------------------------
// Helper: OpenAI SSE streaming via SaaS relay
// -----------------------------------------------------------------------
// AbortController for cancelling active streams
let activeAbortController: AbortController | null = null;

/**
 * Stream one chat turn through the SaaS relay (OpenAI-style SSE).
 *
 * Everything sent to the relay — the new message, prior history and the
 * memory-enhanced system prompt — goes through maskSensitiveData first.
 * Placeholders in the streamed reply (content and reasoning) are restored
 * before they reach the callbacks, including placeholders that arrive split
 * across several SSE deltas.
 *
 * Relay, transport and stream errors are reported through
 * `callbacks.onError`; `callbacks.onComplete` is invoked on every exit path,
 * including cancellation via the abort controller.
 *
 * @param message   Raw user input for this turn.
 * @param callbacks Sinks for streamed text, reasoning text, completion and errors.
 * @param opts      Optional session/agent identifiers, feature flags and prior history.
 * @returns The locally generated run id.
 */
async function chatStream(
  message: string,
  callbacks: {
    onDelta: (delta: string) => void;
    onThinkingDelta?: (delta: string) => void;
    onTool?: (tool: string, input: string, output: string) => void;
    onHand?: (name: string, status: string, result?: unknown) => void;
    onComplete: (inputTokens?: number, outputTokens?: number) => void;
    onError: (error: string) => void;
  },
  opts?: {
    sessionKey?: string;
    agentId?: string;
    thinking_enabled?: boolean;
    reasoning_effort?: string;
    plan_mode?: boolean;
    subagent_enabled?: boolean;
    history?: Array<{ role: string; content: string }>;
  },
): Promise<{ runId: string }> {
  const runId = `run_${Date.now()}`;
  const abortController = new AbortController();
  activeAbortController = abortController;
  const aborted = () => abortController.signal.aborted;

  // A placeholder looks like `__ENTITY_<n>__`. Models usually emit it over
  // several deltas, so a trailing fragment that could still grow into a
  // placeholder is held back until the next delta (or the end of the stream).
  const tokenHead = '__ENTITY_';
  const tokenWithDigits = /^__ENTITY_\d+_?$/;
  const heldTail = (text: string): string => {
    for (let i = Math.max(0, text.length - 32); i < text.length; i++) {
      const tail = text.slice(i);
      if (tokenHead.startsWith(tail) || tokenWithDigits.test(tail)) return tail;
    }
    return '';
  };

  // Incremental unmasker: restores complete placeholders and buffers partial ones.
  const createUnmasker = (emit: (text: string) => void) => {
    let pending = '';
    return {
      push(chunk: string): void {
        pending = unmaskSensitiveData(pending + chunk);
        const held = heldTail(pending);
        const ready = pending.slice(0, pending.length - held.length);
        pending = held;
        if (ready) emit(ready);
      },
      flush(): void {
        if (!pending) return;
        const rest = pending;
        pending = '';
        emit(rest);
      },
    };
  };
  const content = createUnmasker((text) => callbacks.onDelta(text));
  const thinking = createUnmasker((text) => callbacks.onThinkingDelta?.(text));

  // Single exit point: emit anything still buffered, then report the outcome.
  const finish = (
    error?: string,
    inputTokens?: number,
    outputTokens?: number,
  ): { runId: string } => {
    thinking.flush();
    content.flush();
    if (error !== undefined) callbacks.onError(error);
    callbacks.onComplete(inputTokens, outputTokens);
    return { runId };
  };

  // Relay errors may be `{ error: "..." }`, `{ error: true, message }` or an
  // OpenAI-style `{ error: { message } }`; always hand a string to onError.
  const describeStreamError = (payload: { error?: unknown; message?: unknown }): string => {
    if (typeof payload.message === 'string' && payload.message) return payload.message;
    const err = payload.error;
    if (typeof err === 'string' && err) return err;
    if (err && typeof err === 'object') {
      const nested = (err as { message?: unknown }).message;
      if (typeof nested === 'string' && nested) return nested;
      return JSON.stringify(err);
    }
    return 'Unknown stream error';
  };

  // Handles one SSE event block; returns true once the stream has been finished.
  const handleEventBlock = (eventBlock: string): boolean => {
    for (const line of eventBlock.split('\n')) {
      // The SSE format allows `data:` with or without a following space.
      if (!line.startsWith('data:')) continue;
      const data = line.slice(5).trim();
      if (data === '[DONE]') continue;
      try {
        const parsed = JSON.parse(data);
        // Handle SSE error events from relay (e.g. stream_timeout)
        if (parsed.error) {
          const errMsg = describeStreamError(parsed);
          log.warn('SSE stream error:', errMsg);
          finish(errMsg);
          return true;
        }
        const choice = parsed.choices?.[0];
        if (!choice) continue;
        const delta = choice.delta;
        // Handle thinking/reasoning content
        if (delta?.reasoning_content) {
          thinking.push(delta.reasoning_content);
        }
        // Handle regular content — unmask tokens so user sees original entities
        if (delta?.content) {
          thinking.flush(); // keep reasoning text ahead of the answer
          content.push(delta.content);
        }
        // Check for completion
        if (choice.finish_reason) {
          const usage = parsed.usage;
          finish(undefined, usage?.prompt_tokens, usage?.completion_tokens);
          return true;
        }
      } catch {
        // Skip malformed SSE lines
      }
    }
    return false;
  };

  let releaseReader: (() => void) | undefined;
  try {
    // Check the model first so a misconfiguration costs no IPC round-trip.
    const model = getModel();
    if (!model) {
      return finish('No model available — please check SaaS relay configuration');
    }

    // Apply DataMasking to everything that leaves the client, history included.
    const history = (opts?.history ?? []).map((entry) => ({
      ...entry,
      content: typeof entry.content === 'string' ? maskSensitiveData(entry.content) : entry.content,
    }));
    const messages: Array<{ role: string; content: string }> = [
      ...history,
      { role: 'user', content: maskSensitiveData(message) },
    ];

    // BUG-M5 fix: Inject relevant memories into system prompt via Tauri IPC.
    // The raw message is used for the local lookup; the resulting prompt is
    // masked because it is forwarded to the relay.
    const enhancedSystemPrompt = await injectMemories(opts?.agentId, '', message);
    if (enhancedSystemPrompt) {
      messages.unshift({ role: 'system', content: maskSensitiveData(enhancedSystemPrompt) });
    }

    const body: Record<string, unknown> = {
      model,
      messages,
      stream: true,
    };
    // P3-06: Pass sessionKey/agentId to relay for session continuity
    if (opts?.sessionKey) body['session_key'] = opts.sessionKey;
    if (opts?.agentId) body['agent_id'] = opts.agentId;
    if (opts?.thinking_enabled) body['thinking_enabled'] = true;
    if (opts?.reasoning_effort) body['reasoning_effort'] = opts.reasoning_effort;
    if (opts?.plan_mode) body['plan_mode'] = true;
    if (opts?.subagent_enabled) body['subagent_enabled'] = true;

    const response = await saasClient.chatCompletion(body, abortController.signal);
    if (!response.ok) {
      const errText = await response.text().catch(() => '');
      return finish(`Relay error: ${response.status} ${errText}`);
    }

    // Parse SSE stream
    const reader = response.body?.getReader();
    if (!reader) {
      return finish('No response body');
    }
    // cancel() returns a promise; swallow its rejection instead of leaving it unhandled.
    releaseReader = () => {
      try {
        void Promise.resolve(reader.cancel()).catch(() => undefined);
      } catch {
        /* already closed */
      }
    };

    const decoder = new TextDecoder();
    let buffer = '';
    while (!aborted()) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      // Normalize CRLF to LF for SSE spec compliance
      buffer = buffer.replace(/\r\n/g, '\n');
      // Events are separated by a blank line.
      let boundary: number;
      while ((boundary = buffer.indexOf('\n\n')) !== -1) {
        const eventBlock = buffer.slice(0, boundary);
        buffer = buffer.slice(boundary + 2);
        if (handleEventBlock(eventBlock)) return { runId };
      }
    }

    if (!aborted()) {
      // The stream may end without a final blank line: flush the decoder and
      // process whatever is left instead of dropping the last event.
      buffer = (buffer + decoder.decode()).replace(/\r\n/g, '\n');
      if (buffer.trim() !== '' && handleEventBlock(buffer)) return { runId };
    }

    // Stream ended (or was cancelled) without explicit finish_reason
    return finish();
  } catch (err) {
    if (aborted()) {
      // Cancelled by user — don't report as error
      return finish();
    }
    return finish(err instanceof Error ? err.message : String(err));
  } finally {
    // Release the connection on every exit path, including early returns.
    releaseReader?.();
    if (activeAbortController === abortController) {
      activeAbortController = null;
    }
  }
}
// -----------------------------------------------------------------------
// Build the client object with GatewayClient-compatible shape
// -----------------------------------------------------------------------
// Shared stubs for capabilities the relay mode does not provide.
const restUnavailable = async (): Promise<never> => {
  throw new Error('REST not available in browser mode');
};
const resolveNull = async (): Promise<null> => null;
const resolveNothing = async (): Promise<void> => {};

// Only the members the stores rely on do real work; the rest are inert
// defaults, which is why the object is cast rather than type-checked
// against GatewayClient.
const relayClient = {
  // --- Connection ---
  async connect() {
    log.debug('SaaS relay client connect');
  },
  disconnect: resolveNothing,
  getState: () => 'connected' as const,
  onStateChange: undefined,
  onLog: undefined,

  // --- Agents (Clones) ---
  listClones,
  /** Register a new in-memory clone; it becomes the default if none is set. */
  async createClone(opts: Record<string, unknown>) {
    const id = `agent-${Date.now()}`;
    const clone: CloneInfo = {
      id,
      name: (opts.name as string) || 'New Agent',
      role: opts.role as string,
      nickname: opts.nickname as string,
      emoji: opts.emoji as string,
      model: getModel(),
      status: 'active',
    };
    agents.set(id, clone);
    if (!defaultAgentId) {
      defaultAgentId = id;
    }
    return { clone };
  },
  /** Shallow-merge `updates` into a known clone; unknown ids yield `clone: undefined`. */
  async updateClone(id: string, updates: Record<string, unknown>) {
    const current = agents.get(id);
    if (current) {
      agents.set(id, { ...current, ...updates });
    }
    return { clone: agents.get(id) };
  },
  /** Remove a clone and clear the default if it pointed at it. */
  async deleteClone(id: string) {
    agents.delete(id);
    if (defaultAgentId === id) {
      defaultAgentId = null;
    }
  },
  getDefaultAgentId: () => defaultAgentId,
  setDefaultAgentId(id: string) {
    defaultAgentId = id;
  },

  // --- Chat ---
  chatStream,
  /** Abort the stream registered in `activeAbortController`, if any. */
  cancelStream() {
    if (!activeAbortController) return;
    activeAbortController.abort();
    activeAbortController = null;
    log.info('SSE stream cancelled by user');
  },

  // --- Hands ---
  listHands: async () => ({ hands: [] }),
  getHand: resolveNull,
  triggerHand: async () => ({ runId: `hand_${Date.now()}`, status: 'completed' }),

  // --- Skills ---
  listSkills: async () => ({ skills: [] }),
  getSkill: resolveNull,
  createSkill: resolveNull,
  updateSkill: resolveNull,
  deleteSkill: resolveNothing,

  // --- Config ---
  getQuickConfig: async () => ({}),
  saveQuickConfig: resolveNothing,
  getWorkspaceInfo: resolveNull,

  // --- Health ---
  health: async () => ({ status: 'ok', mode: 'saas-relay' }),
  status: async () => ({ version: 'saas-relay', mode: 'browser' }),

  // --- Usage ---
  getUsageStats: resolveNull,
  getSessionStats: resolveNull,

  // --- REST helpers (not used in browser mode) ---
  restGet: restUnavailable,
  restPost: restUnavailable,
  restPut: restUnavailable,
  restDelete: restUnavailable,
  restPatch: restUnavailable,
};
return relayClient as unknown as GatewayClient;
}