Files
zclaw_openfang/desktop/tests/e2e/utils/network-helpers.ts
iven 6f72442531 docs(guide): rewrite CLAUDE.md with ZCLAW-first perspective
Major changes:
- Shift from "OpenFang desktop client" to "independent AI Agent desktop app"
- Add decision principle: "Is this useful for ZCLAW? Does it affect ZCLAW?"
- Simplify project structure and tech stack sections
- Replace OpenClaw vs OpenFang comparison with unified backend approach
- Consolidate troubleshooting from scattered sections into organized FAQ
- Update Hands system documentation with 8 capabilities and status
- Stream
2026-03-20 19:30:09 +08:00

367 lines
9.0 KiB
TypeScript

/**
* 网络拦截和 Mock 工具
* 用于深度验证 API 调用、模拟响应和网络错误
*/
import { Page, Route, Request } from '@playwright/test';
export interface CapturedRequest {
url: string;
method: string;
body?: unknown;
headers: Record<string, string>;
timestamp: number;
}
export interface MockResponse {
status?: number;
body?: unknown;
headers?: Record<string, string>;
delay?: number;
}
/**
* 网络拦截和 Mock 工具集
*/
export const networkHelpers = {
/**
* 拦截并记录所有 API 请求
* 返回请求列表,用于后续断言
*/
async interceptAllAPI(page: Page): Promise<CapturedRequest[]> {
const requests: CapturedRequest[] = [];
await page.route('**/api/**', async (route: Route) => {
const request = route.request();
const body = request.postData();
requests.push({
url: request.url(),
method: request.method(),
body: body ? safeParseJSON(body) : undefined,
headers: request.headers(),
timestamp: Date.now(),
});
await route.continue();
});
return requests;
},
/**
* Mock API 响应
* @param path - API 路径(不含 /api 前缀)
* @param response - Mock 响应配置
*/
async mockAPI(
page: Page,
path: string,
response: MockResponse | ((request: Request) => MockResponse)
): Promise<void> {
await page.route(`**/api/${path}**`, async (route: Route) => {
const request = route.request();
const mockConfig = typeof response === 'function' ? response(request) : response;
if (mockConfig.delay) {
await new Promise((r) => setTimeout(r, mockConfig.delay));
}
await route.fulfill({
status: mockConfig.status ?? 200,
contentType: 'application/json',
body: JSON.stringify(mockConfig.body ?? {}),
headers: mockConfig.headers,
});
});
},
/**
* Mock 多个 API 响应
*/
async mockMultipleAPIs(
page: Page,
mocks: Record<string, MockResponse>
): Promise<void> {
for (const [path, response] of Object.entries(mocks)) {
await this.mockAPI(page, path, response);
}
},
/**
* 模拟网络错误
*/
async simulateNetworkError(page: Page, path: string): Promise<void> {
await page.route(`**/api/${path}**`, async (route: Route) => {
await route.abort('failed');
});
},
/**
* 模拟连接超时
*/
async simulateTimeout(page: Page, path: string, timeoutMs: number = 60000): Promise<void> {
await page.route(`**/api/${path}**`, async (route: Route) => {
await new Promise((r) => setTimeout(r, timeoutMs));
});
},
/**
* 模拟延迟响应
*/
async simulateDelay(page: Page, path: string, delayMs: number): Promise<void> {
await page.route(`**/api/${path}**`, async (route: Route) => {
await new Promise((r) => setTimeout(r, delayMs));
await route.continue();
});
},
/**
* 模拟 HTTP 错误状态码
*/
async simulateHTTPError(page: Page, path: string, status: number, message?: string): Promise<void> {
await page.route(`**/api/${path}**`, async (route: Route) => {
await route.fulfill({
status,
contentType: 'application/json',
body: JSON.stringify({
error: true,
message: message || `HTTP ${status} Error`,
}),
});
});
},
/**
* 模拟限流 (429 Too Many Requests)
*/
async simulateRateLimit(page: Page, path: string, retryAfter: number = 60): Promise<void> {
await page.route(`**/api/${path}**`, async (route: Route) => {
await route.fulfill({
status: 429,
headers: {
'Retry-After': String(retryAfter),
},
contentType: 'application/json',
body: JSON.stringify({
error: true,
message: 'Too Many Requests',
retryAfter,
}),
});
});
},
/**
* 拦截 WebSocket 连接
*/
async interceptWebSocket(page: Page): Promise<{
messages: Array<{ direction: 'sent' | 'received'; data: unknown }>;
isConnected: () => boolean;
}> {
const messages: Array<{ direction: 'sent' | 'received'; data: unknown }> = [];
let connected = false;
// Playwright 的 WebSocket 拦截需要特殊处理
page.on('websocket', (ws) => {
connected = true;
ws.on('framereceived', (frame) => {
try {
const data = safeParseJSON(frame.payload as string);
messages.push({ direction: 'received', data });
} catch {
messages.push({ direction: 'received', data: frame.payload });
}
});
ws.on('framesent', (frame) => {
try {
const data = safeParseJSON(frame.payload as string);
messages.push({ direction: 'sent', data });
} catch {
messages.push({ direction: 'sent', data: frame.payload });
}
});
ws.on('close', () => {
connected = false;
});
});
return {
messages,
isConnected: () => connected,
};
},
/**
* Mock 流式响应(用于聊天功能)
*/
async mockStreamResponse(
page: Page,
path: string,
chunks: Array<{ delta?: string; phase?: string; content?: string }>,
chunkDelay: number = 100
): Promise<void> {
await page.route(`**/api/${path}**`, async (route: Route) => {
// 创建流式响应
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
for (const chunk of chunks) {
const data = `data: ${JSON.stringify(chunk)}\n\n`;
controller.enqueue(encoder.encode(data));
await new Promise((r) => setTimeout(r, chunkDelay));
}
controller.enqueue(encoder.encode('data: [DONE]\n\n'));
controller.close();
},
});
await route.fulfill({
status: 200,
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
body: stream as any,
});
});
},
/**
* 等待特定 API 请求
*/
async waitForAPIRequest(
page: Page,
pathPattern: string | RegExp,
options?: { timeout?: number }
): Promise<Request> {
return page.waitForRequest(
(request) => {
const url = request.url();
if (typeof pathPattern === 'string') {
return url.includes(pathPattern);
}
return pathPattern.test(url);
},
{ timeout: options?.timeout ?? 30000 }
);
},
/**
* 等待特定 API 响应
*/
async waitForAPIResponse(
page: Page,
pathPattern: string | RegExp,
options?: { timeout?: number }
): Promise<{ status: number; body: unknown }> {
const response = await page.waitForResponse(
(response) => {
const url = response.url();
if (typeof pathPattern === 'string') {
return url.includes(pathPattern);
}
return pathPattern.test(url);
},
{ timeout: options?.timeout ?? 30000 }
);
let body: unknown;
try {
body = await response.json();
} catch {
body = await response.text();
}
return {
status: response.status(),
body,
};
},
/**
* 清除所有路由拦截
*/
async clearRoutes(page: Page): Promise<void> {
await page.unrouteAll();
},
};
/**
* 安全解析 JSON
*/
function safeParseJSON(text: string): unknown {
try {
return JSON.parse(text);
} catch {
return text;
}
}
/**
* 请求匹配器 - 用于断言
*/
export const requestMatchers = {
/**
* 验证请求包含特定字段
*/
hasField(request: CapturedRequest, field: string, value?: unknown): boolean {
if (!request.body || typeof request.body !== 'object') return false;
const body = request.body as Record<string, unknown>;
if (value === undefined) return field in body;
return body[field] === value;
},
/**
* 验证请求方法
*/
isMethod(request: CapturedRequest, method: string): boolean {
return request.method.toUpperCase() === method.toUpperCase();
},
/**
* 验证请求路径
*/
matchesPath(request: CapturedRequest, pattern: string | RegExp): boolean {
if (typeof pattern === 'string') {
return request.url.includes(pattern);
}
return pattern.test(request.url);
},
/**
* 查找匹配的请求
*/
findRequests(
requests: CapturedRequest[],
predicate: (req: CapturedRequest) => boolean
): CapturedRequest[] {
return requests.filter(predicate);
},
/**
* 获取特定路径的所有请求
*/
getRequestsForPath(requests: CapturedRequest[], path: string): CapturedRequest[] {
return requests.filter((r) => r.url.includes(path));
},
/**
* 获取 POST 请求
*/
getPostRequests(requests: CapturedRequest[]): CapturedRequest[] {
return requests.filter((r) => r.method === 'POST');
},
/**
* 获取 GET 请求
*/
getGetRequests(requests: CapturedRequest[]): CapturedRequest[] {
return requests.filter((r) => r.method === 'GET');
},
};