Files
zclaw_openfang/desktop/src/lib/saas-relay-client.ts
iven fa5ab4e161
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor(middleware): 移除数据脱敏中间件及相关代码
移除不再使用的数据脱敏功能,包括:
1. 删除data_masking模块
2. 清理loop_runner中的unmask逻辑
3. 移除前端saas-relay-client.ts中的mask/unmask实现
4. 更新中间件层数从15层降为14层
5. 同步更新相关文档(CLAUDE.md、TRUTH.md、wiki等)

此次变更简化了系统架构,移除了不再需要的敏感数据处理逻辑。所有相关测试证据和截图已归档。
2026-04-22 19:19:07 +08:00

388 lines
13 KiB
TypeScript

/**
* SaaS Relay Gateway Client
*
* A lightweight GatewayClient-compatible adapter for browser-only mode.
* Routes agent listing through SaaS agent-templates and converts
* chatStream() calls into OpenAI-style SSE streaming via the SaaS relay.
*
* Used in connectionStore when running in a browser (non-Tauri) with
* SaaS relay connection mode.
*/
import type { GatewayClient } from './gateway-client';
import { saasClient } from './saas-client';
import type { AgentTemplateAvailable } from './saas-types';
import { createLogger } from './logger';
// Logger created with the 'SaaSRelayGateway' label; used by the helpers and the factory below.
const log = createLogger('SaaSRelayGateway');
// ---------------------------------------------------------------------------
// Memory injection helper — injects relevant memories into system prompt
// before sending to SaaS relay (mirrors MemoryMiddleware in Tauri kernel path)
// ---------------------------------------------------------------------------
/**
 * Best-effort memory enrichment of a system prompt.
 *
 * Lazily loads the Tauri core API and calls the `viking_inject_prompt`
 * command with the agent id (`'default'` when none is given), the base
 * prompt, the user input and a `maxTokens` value of 500. Any failure —
 * the import not resolving or the command rejecting — is swallowed and
 * the base prompt is handed back unchanged.
 *
 * @param agentId - Agent to look memories up for; `undefined` maps to `'default'`.
 * @param basePrompt - Prompt to enrich; also the fallback return value.
 * @param userInput - Current user message, forwarded to the command.
 * @returns The command result when it is non-empty and differs from
 *          `basePrompt`; otherwise `basePrompt`.
 */
async function injectMemories(
  agentId: string | undefined,
  basePrompt: string,
  userInput: string,
): Promise<string> {
  let prompt = basePrompt;

  try {
    // Imported on demand so that a missing Tauri runtime lands in the catch below
    const { invoke: callCommand } = await import('@tauri-apps/api/core');
    const payload = {
      agentId: agentId ?? 'default',
      basePrompt,
      userInput,
      maxTokens: 500,
    };
    const candidate = await callCommand<string>('viking_inject_prompt', payload);

    const isUsable = Boolean(candidate) && candidate !== basePrompt;
    if (isUsable) {
      log.debug('Memory injection succeeded for relay request');
      prompt = candidate;
    }
  } catch {
    // Intentionally silent: outside Tauri, or before viking is initialised,
    // the relay request simply goes out without injected memories.
  }

  return prompt;
}
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
 * Agent ("clone") record kept in the relay client's in-memory registry.
 * Only `id` and `name` are mandatory; everything else is optional.
 */
interface CloneInfo {
  // --- Identity ---
  /** Key under which the clone is stored in the registry. */
  id: string;
  /** Display name. */
  name: string;
  /** Id of the SaaS template the clone was built from, when it came from a template. */
  templateId?: string;

  // --- Presentation ---
  nickname?: string;
  emoji?: string;
  /** For template-backed clones: the template description, or its category as fallback. */
  role?: string;
  /** For template-backed clones: the template category. */
  personality?: string;
  scenarios?: string[];

  // --- Runtime ---
  /** Model name captured from the `getModel` callback when the clone was built. */
  model?: string;
  /** Lifecycle marker; the relay client sets `'active'` on creation. */
  status?: string;
}
// ---------------------------------------------------------------------------
// Implementation
// ---------------------------------------------------------------------------
/** Fields this client reads from a parsed relay SSE payload. */
interface RelayStreamChunk {
  error?: unknown;
  message?: unknown;
  choices?: Array<{
    delta?: { content?: string; reasoning_content?: string };
    finish_reason?: string | null;
  }>;
  usage?: { prompt_tokens?: number; completion_tokens?: number } | null;
}

/**
 * Parse one SSE `data:` payload.
 *
 * @returns The parsed object, or `null` when the payload is malformed JSON
 *          or parses to something that is not an object.
 */
function parseRelayChunk(payload: string): RelayStreamChunk | null {
  try {
    const value: unknown = JSON.parse(payload);
    return typeof value === 'object' && value !== null ? (value as RelayStreamChunk) : null;
  } catch {
    return null;
  }
}

/**
 * Turn a relay error payload into a printable string.
 *
 * Accepts both the flat form (`{ error: "code", message: "text" }`) and the
 * nested form (`{ error: { message: "text" } }`), so callers never receive an
 * object where a string is expected.
 */
function relayErrorMessage(chunk: RelayStreamChunk): string {
  if (typeof chunk.message === 'string' && chunk.message) return chunk.message;

  const err = chunk.error;
  if (typeof err === 'string' && err) return err;
  if (typeof err === 'object' && err !== null) {
    const nested = (err as { message?: unknown }).message;
    if (typeof nested === 'string' && nested) return nested;
    // The value came out of JSON.parse, so it is always serialisable
    return JSON.stringify(err);
  }
  return 'Unknown stream error';
}

/**
 * Create a GatewayClient-compatible object that routes through SaaS APIs.
 *
 * Agents are listed from SaaS agent templates and mirrored in an in-memory
 * registry; chat goes through `saasClient.chatCompletion` and is consumed as
 * an OpenAI-style SSE stream. Methods the stores do not need return inert
 * defaults, and the REST helpers always throw.
 *
 * @param _saasUrl - Currently unused; requests go through the `saasClient` singleton.
 * @param getModel - Returns the model name to use; read at call time for every
 *                   chat request and every clone that is built.
 * @returns An object cast to `GatewayClient`; only part of that interface is
 *          genuinely implemented.
 */
export function createSaaSRelayGatewayClient(
  _saasUrl: string,
  getModel: () => string,
): GatewayClient {
  // saasUrl preserved for future direct API routing (currently routed through saasClient singleton)
  void _saasUrl;

  // Local in-memory agent registry
  const agents = new Map<string, CloneInfo>();
  let defaultAgentId: string | null = null;

  // AbortController of the most recently started stream (null when idle)
  let activeAbortController: AbortController | null = null;

  /**
   * Fetch the available SaaS templates and expose them as clones.
   *
   * Every returned clone is (re)written into the registry, and the first one
   * becomes the default agent when no default is set yet. Failures are logged
   * and reported as an empty list.
   */
  async function listClones(): Promise<{ clones: CloneInfo[] }> {
    try {
      const templates: AgentTemplateAvailable[] = await saasClient.fetchAvailableTemplates();
      const clones: CloneInfo[] = templates.map((t) => {
        // Templates without an id get a synthetic one derived from their name
        const id = t.id || `agent-${t.name}`;
        const clone: CloneInfo = {
          id,
          name: t.name,
          role: t.description || t.category,
          emoji: t.emoji,
          personality: t.category,
          scenarios: [],
          model: getModel(),
          status: 'active',
          templateId: t.id,
        };
        agents.set(id, clone);
        return clone;
      });

      // Set first as default
      const first = clones[0];
      if (first && !defaultAgentId) {
        defaultAgentId = first.id;
      }
      return { clones };
    } catch (err) {
      log.warn('Failed to list templates', err);
      return { clones: [] };
    }
  }

  /**
   * Send a chat request through the SaaS relay and stream the answer back.
   *
   * The promise resolves once the stream has finished, failed or been
   * cancelled; it does not reject for relay or network errors — those are
   * reported through `onError` followed by `onComplete`. A user cancellation
   * only triggers `onComplete`. Exceptions thrown by the callbacks themselves
   * are reported through `onError` as well.
   *
   * @param message - Current user message; appended after `opts.history`.
   * @param callbacks - Streaming hooks. `onComplete` receives the prompt and
   *                    completion token counts when the finishing chunk carries `usage`.
   * @param opts - Optional session/agent identifiers, feature flags and prior history.
   * @returns The locally generated run id.
   */
  async function chatStream(
    message: string,
    callbacks: {
      onDelta: (delta: string) => void;
      onThinkingDelta?: (delta: string) => void;
      onTool?: (tool: string, input: string, output: string) => void;
      onHand?: (name: string, status: string, result?: unknown) => void;
      onComplete: (inputTokens?: number, outputTokens?: number) => void;
      onError: (error: string) => void;
    },
    opts?: {
      sessionKey?: string;
      agentId?: string;
      thinking_enabled?: boolean;
      reasoning_effort?: string;
      plan_mode?: boolean;
      subagent_enabled?: boolean;
      history?: Array<{ role: string; content: string }>;
    },
  ): Promise<{ runId: string }> {
    const runId = `run_${Date.now()}`;
    const abortController = new AbortController();
    activeAbortController = abortController;
    const aborted = () => abortController.signal.aborted;

    /**
     * Dispatch every `data:` line of one SSE event block to the callbacks.
     * Returns true when the block carried a terminal event (error or
     * finish_reason) and `onComplete` has already been called.
     */
    const handleEventBlock = (eventBlock: string): boolean => {
      for (const line of eventBlock.split('\n')) {
        // The space after "data:" is optional in the SSE format
        if (!line.startsWith('data:')) continue;
        const payload = line.slice(5).trim();
        if (!payload || payload === '[DONE]') continue;

        // Skip malformed SSE lines
        const chunk = parseRelayChunk(payload);
        if (!chunk) continue;

        // Handle SSE error events from relay (e.g. stream_timeout)
        if (chunk.error) {
          const errMsg = relayErrorMessage(chunk);
          log.warn('SSE stream error:', errMsg);
          callbacks.onError(errMsg);
          callbacks.onComplete();
          return true;
        }

        const choice = chunk.choices?.[0];
        if (!choice) continue;
        const delta = choice.delta;

        // Handle thinking/reasoning content
        if (delta?.reasoning_content) {
          callbacks.onThinkingDelta?.(delta.reasoning_content);
        }
        // Handle regular content
        if (delta?.content) {
          callbacks.onDelta(delta.content);
        }
        // Check for completion
        if (choice.finish_reason) {
          callbacks.onComplete(chunk.usage?.prompt_tokens, chunk.usage?.completion_tokens);
          return true;
        }
      }
      return false;
    };

    try {
      // Fail fast before doing any IPC or network work
      const model = getModel();
      if (!model) {
        callbacks.onError('No model available — please check SaaS relay configuration');
        callbacks.onComplete();
        return { runId };
      }

      // Prior history (if any) followed by the current user message
      const messages: Array<{ role: string; content: string }> = [
        ...(opts?.history ?? []),
        { role: 'user', content: message },
      ];

      // BUG-M5 fix: Inject relevant memories into system prompt via Tauri IPC.
      // This mirrors the MemoryMiddleware that runs in the kernel path.
      const enhancedSystemPrompt = await injectMemories(opts?.agentId, '', message);
      if (enhancedSystemPrompt) {
        messages.unshift({ role: 'system', content: enhancedSystemPrompt });
      }

      const body: Record<string, unknown> = {
        model,
        messages,
        stream: true,
      };
      // P3-06: Pass sessionKey/agentId to relay for session continuity
      if (opts?.sessionKey) body['session_key'] = opts.sessionKey;
      if (opts?.agentId) body['agent_id'] = opts.agentId;
      if (opts?.thinking_enabled) body['thinking_enabled'] = true;
      if (opts?.reasoning_effort) body['reasoning_effort'] = opts.reasoning_effort;
      if (opts?.plan_mode) body['plan_mode'] = true;
      if (opts?.subagent_enabled) body['subagent_enabled'] = true;

      const response = await saasClient.chatCompletion(body, abortController.signal);
      if (!response.ok) {
        const errText = await response.text().catch(() => '');
        callbacks.onError(`Relay error: ${response.status} ${errText}`);
        callbacks.onComplete();
        return { runId };
      }

      // Parse SSE stream
      const reader = response.body?.getReader();
      if (!reader) {
        callbacks.onError('No response body');
        callbacks.onComplete();
        return { runId };
      }

      try {
        const decoder = new TextDecoder();
        let buffer = '';

        while (!aborted()) {
          const { done, value } = await reader.read();
          // On end of stream, flush whatever bytes the decoder is still holding
          buffer += done ? decoder.decode() : decoder.decode(value, { stream: true });
          // Normalize CRLF to LF for SSE spec compliance
          buffer = buffer.replace(/\r\n/g, '\n');

          // Events are separated by a blank line
          let boundary: number;
          while ((boundary = buffer.indexOf('\n\n')) !== -1) {
            const eventBlock = buffer.slice(0, boundary);
            buffer = buffer.slice(boundary + 2);
            if (handleEventBlock(eventBlock)) return { runId };
          }

          if (done) {
            // A final event may arrive without its terminating blank line
            if (buffer.trim() && handleEventBlock(buffer)) return { runId };
            break;
          }
        }

        // Stream ended (or was aborted between reads) without explicit finish_reason
        callbacks.onComplete();
      } finally {
        // Release the body on every exit path; cancel() may reject when the
        // stream is already errored or closed, which is of no interest here.
        void reader.cancel().catch(() => undefined);
      }
    } catch (err) {
      if (aborted()) {
        // Cancelled by user — don't report as error
        callbacks.onComplete();
        return { runId };
      }
      const msg = err instanceof Error ? err.message : String(err);
      callbacks.onError(msg);
      callbacks.onComplete();
    } finally {
      // Only clear the slot if a newer stream has not replaced this one
      if (activeAbortController === abortController) {
        activeAbortController = null;
      }
    }
    return { runId };
  }

  // Only a subset of GatewayClient is implemented, hence the double cast at the end.
  return {
    // --- Connection ---
    connect: async () => { log.debug('SaaS relay client connect'); },
    disconnect: async () => {},
    getState: () => 'connected' as const,
    onStateChange: undefined,
    onLog: undefined,

    // --- Agents (Clones) ---
    listClones,
    createClone: async (opts: Record<string, unknown>) => {
      const id = `agent-${Date.now()}`;
      const clone: CloneInfo = {
        id,
        name: (opts.name as string) || 'New Agent',
        role: opts.role as string,
        nickname: opts.nickname as string,
        emoji: opts.emoji as string,
        model: getModel(),
        status: 'active',
      };
      agents.set(id, clone);
      if (!defaultAgentId) defaultAgentId = id;
      return { clone };
    },
    updateClone: async (id: string, updates: Record<string, unknown>) => {
      const existing = agents.get(id);
      // `id` is re-applied last so an `id` key inside `updates` cannot make
      // the stored clone disagree with the key it is stored under
      if (existing) agents.set(id, { ...existing, ...updates, id });
      return { clone: agents.get(id) };
    },
    deleteClone: async (id: string) => {
      agents.delete(id);
      if (defaultAgentId === id) defaultAgentId = null;
    },
    getDefaultAgentId: () => defaultAgentId,
    setDefaultAgentId: (id: string) => { defaultAgentId = id; },

    // --- Chat ---
    chatStream,
    cancelStream: () => {
      if (activeAbortController) {
        activeAbortController.abort();
        activeAbortController = null;
        log.info('SSE stream cancelled by user');
      }
    },

    // --- Hands ---
    listHands: async () => ({ hands: [] }),
    getHand: async () => null,
    triggerHand: async () => ({ runId: `hand_${Date.now()}`, status: 'completed' }),

    // --- Skills ---
    listSkills: async () => ({ skills: [] }),
    getSkill: async () => null,
    createSkill: async () => null,
    updateSkill: async () => null,
    deleteSkill: async () => {},

    // --- Config ---
    getQuickConfig: async () => ({}),
    saveQuickConfig: async () => {},
    getWorkspaceInfo: async () => null,

    // --- Health ---
    health: async () => ({ status: 'ok', mode: 'saas-relay' }),
    status: async () => ({ version: 'saas-relay', mode: 'browser' }),

    // --- Usage ---
    getUsageStats: async () => null,
    getSessionStats: async () => null,

    // --- REST helpers (not used in browser mode) ---
    restGet: async () => { throw new Error('REST not available in browser mode'); },
    restPost: async () => { throw new Error('REST not available in browser mode'); },
    restPut: async () => { throw new Error('REST not available in browser mode'); },
    restDelete: async () => { throw new Error('REST not available in browser mode'); },
    restPatch: async () => { throw new Error('REST not available in browser mode'); },
  } as unknown as GatewayClient;
}