perf(chat): 回复效率 + 建议生成并行化优化
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

- identity prompt 缓存: LazyLock<RwLock<HashMap>> 缓存已构建的 identity prompt,
  soul.md 更新时自动失效, 省去每次请求的 mutex + 磁盘 I/O (~0.5-1s)
- pre-conversation hook 并行化: tokio::join! 并行执行 identity build 和
  continuity context 查询, 不再串行等待 (~1-2s)
- suggestion context 预取: 流式回复期间提前启动 fetchSuggestionContext,
  回复结束时 context 已就绪 (~0.5-1s)
- 建议生成与 memory extraction 解耦: generateLLMSuggestions 不再等待
  memory extraction LLM 调用完成, 独立启动 (~3-8s)
- Path B (agent stream) 补全 context: lifecycle:end 路径使用预取 context,
  修复零个性化问题
- 上下文窗口扩展: slice(-6) → slice(-20), 每条截断 200 字符
- suggestion prompt 重写: 1 深入追问 + 1 实用行动 + 1 管家关怀,
  明确角色定位, 禁止空泛建议
This commit is contained in:
iven
2026-04-23 23:13:20 +08:00
parent 10497362bb
commit 5cf7adff69
3 changed files with 141 additions and 53 deletions

View File

@@ -7,8 +7,10 @@
use tracing::{debug, warn}; use tracing::{debug, warn};
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tauri::Emitter; use tauri::Emitter;
use tokio::sync::RwLock;
use zclaw_growth::VikingStorage; use zclaw_growth::VikingStorage;
use crate::intelligence::identity::IdentityManagerState; use crate::intelligence::identity::IdentityManagerState;
@@ -16,6 +18,36 @@ use crate::intelligence::heartbeat::HeartbeatEngineState;
use crate::intelligence::reflection::{MemoryEntryForAnalysis, ReflectionEngineState}; use crate::intelligence::reflection::{MemoryEntryForAnalysis, ReflectionEngineState};
use zclaw_runtime::driver::LlmDriver; use zclaw_runtime::driver::LlmDriver;
// ---------------------------------------------------------------------------
// Identity prompt cache — avoids mutex + disk I/O on every request
// ---------------------------------------------------------------------------

/// A cached, fully-built identity system prompt for one agent.
struct CachedIdentity {
    // The assembled identity prompt, ready to be served on a cache hit.
    prompt: String,
    #[allow(dead_code)] // Reserved for future TTL-based cache validation
    // Hash of the soul content at build time. Currently unused: invalidation
    // is done explicitly via `invalidate_identity_cache` when soul.md changes.
    soul_hash: u64,
}

/// Process-wide cache keyed by agent id.
///
/// Uses tokio's async `RwLock` (see the `use tokio::sync::RwLock` import) so
/// concurrent readers on the request hot path never block each other; writes
/// occur only on a cache miss or invalidation.
static IDENTITY_CACHE: std::sync::LazyLock<RwLock<HashMap<String, CachedIdentity>>> =
    std::sync::LazyLock::new(|| RwLock::new(HashMap::new()));
/// Invalidate cached identity prompt for a given agent (call when soul.md changes).
pub fn invalidate_identity_cache(agent_id: &str) {
let cache = &*IDENTITY_CACHE;
// Non-blocking: spawn a task to remove the entry
if let Ok(mut guard) = cache.try_write() {
guard.remove(agent_id);
}
}
/// Compute a content hash of `s` for identity-cache invalidation.
///
/// Uses the standard library's `DefaultHasher`, which is deterministic
/// within a single process run — sufficient for an in-memory cache key.
fn content_hash(s: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    s.hash(&mut state);
    state.finish()
}
/// Run pre-conversation intelligence hooks /// Run pre-conversation intelligence hooks
/// ///
/// Builds identity-enhanced system prompt (SOUL.md + instructions) and /// Builds identity-enhanced system prompt (SOUL.md + instructions) and
@@ -29,10 +61,29 @@ pub async fn pre_conversation_hook(
_user_message: &str, _user_message: &str,
identity_state: &IdentityManagerState, identity_state: &IdentityManagerState,
) -> Result<String, String> { ) -> Result<String, String> {
// Build identity-enhanced system prompt (SOUL.md + instructions) // Check identity prompt cache first (avoids mutex + disk I/O)
// Memory context is injected by MemoryMiddleware in the kernel middleware chain, let cache = &*IDENTITY_CACHE;
// not here, to avoid duplicate injection. {
let enhanced_prompt = match build_identity_prompt(agent_id, "", identity_state).await { let guard = cache.read().await;
if let Some(cached) = guard.get(agent_id) {
// Cache hit — still need continuity context, but skip identity build
let continuity_context = build_continuity_context(agent_id, _user_message).await;
let mut result = cached.prompt.clone();
if !continuity_context.is_empty() {
result.push_str(&continuity_context);
}
debug!("[intelligence_hooks] Identity cache HIT for agent {}", agent_id);
return Ok(result);
}
}
// Cache miss — build identity prompt and continuity context in parallel
let (identity_result, continuity_context) = tokio::join!(
build_identity_prompt_cached(agent_id, "", identity_state, cache),
build_continuity_context(agent_id, _user_message)
);
let enhanced_prompt = match identity_result {
Ok(prompt) => prompt, Ok(prompt) => prompt,
Err(e) => { Err(e) => {
warn!( warn!(
@@ -43,9 +94,6 @@ pub async fn pre_conversation_hook(
} }
}; };
// Cross-session continuity: check for unresolved pain points and recent experiences
let continuity_context = build_continuity_context(agent_id, _user_message).await;
let mut result = enhanced_prompt; let mut result = enhanced_prompt;
if !continuity_context.is_empty() { if !continuity_context.is_empty() {
result.push_str(&continuity_context); result.push_str(&continuity_context);
@@ -240,6 +288,8 @@ pub async fn post_conversation_hook(
warn!("[intelligence_hooks] Failed to update soul with agent name: {}", e); warn!("[intelligence_hooks] Failed to update soul with agent name: {}", e);
} else { } else {
debug!("[intelligence_hooks] Updated agent name to '{}' in soul", name); debug!("[intelligence_hooks] Updated agent name to '{}' in soul", name);
// Invalidate cache since soul.md changed
invalidate_identity_cache(agent_id);
} }
} }
drop(manager); drop(manager);
@@ -340,21 +390,34 @@ async fn build_memory_context(
Ok(context) Ok(context)
} }
/// Build identity-enhanced system prompt /// Build identity-enhanced system prompt and cache the result.
async fn build_identity_prompt( async fn build_identity_prompt_cached(
agent_id: &str, agent_id: &str,
memory_context: &str, memory_context: &str,
identity_state: &IdentityManagerState, identity_state: &IdentityManagerState,
cache: &RwLock<HashMap<String, CachedIdentity>>,
) -> Result<String, String> { ) -> Result<String, String> {
// IdentityManagerState is Arc<tokio::sync::Mutex<AgentIdentityManager>>
// tokio::sync::Mutex::lock() returns MutexGuard directly
let mut manager = identity_state.lock().await; let mut manager = identity_state.lock().await;
// Read current soul content for hashing
let soul_content = manager.get_file(agent_id, crate::intelligence::identity::IdentityFile::Soul);
let soul_hash = content_hash(&soul_content);
let prompt = manager.build_system_prompt( let prompt = manager.build_system_prompt(
agent_id, agent_id,
if memory_context.is_empty() { None } else { Some(memory_context) }, if memory_context.is_empty() { None } else { Some(memory_context) },
).await; ).await;
// Cache the result
drop(manager); // Release lock before acquiring write guard
{
let mut guard = cache.write().await;
guard.insert(agent_id.to_string(), CachedIdentity {
prompt: prompt.clone(),
soul_hash,
});
}
Ok(prompt) Ok(prompt)
} }

View File

@@ -646,22 +646,24 @@ const HARDCODED_PROMPTS: Record<string, { system: string; user: (arg: string) =>
}, },
suggestions: { suggestions: {
system: `你是对话分析助手和智能管家。根据对话内容和用户画像信息,生成 3 个个性化建议。 system: `你是 ZCLAW 的管家助手,需要站在用户角度思考他们真正需要什么,生成 3 个个性化建议。
## 生成规则 ## 生成规则
1. 2 条对话续问(深入当前话题,帮助用户继续探索 1. 第 1 条 — 深入追问:基于当前话题,提出一个有洞察力的追问,帮助用户深入探索
2. 1 条管家关怀(基于用户消息中提供的痛点、经验或技能信息 2. 第 2 条 — 实用行动:建议一个具体的、可操作的下一步(调用技能、执行工具、查看数据等
3. 第 3 条 — 管家关怀:
- 如果有未解决痛点 → 回访建议,如"上次你提到X后来解决了吗" - 如果有未解决痛点 → 回访建议,如"上次你提到X后来解决了吗"
- 如果有相关经验 → 引导复用,如"上次用X方法解决了类似问题要再试试吗" - 如果有相关经验 → 引导复用,如"上次用X方法解决了类似问题要再试试吗"
- 如果有匹配技能 → 推荐使用,如"你可以试试 [技能名] 来处理这个" - 如果有匹配技能 → 推荐使用,如"你可以试试 [技能名] 来处理这个"
- 如果没有提供痛点/经验/技能信息 → 全部生成对话续问 - 如果没有提供痛点/经验/技能信息 → 给出一个启发性的思考角度
3. 每个不超过 30 个中文字符 4. 每个不超过 30 个中文字符
4. 不要重复对话中已讨论过的内容 5. 不要重复对话中已讨论过的内容
5. 使用与用户相同的语言 6. 不要生成空泛的建议(如"继续分析"、"换个角度"
7. 使用与用户相同的语言
只输出 JSON 数组,包含恰好 3 个字符串。不要输出任何其他内容。 只输出 JSON 数组,包含恰好 3 个字符串。不要输出任何其他内容。
示例:["科室绩效分析可以按哪些维度拆解?", "上次的 researcher 技能能用在查房数据整理上吗?", "自动生成合规检查报告的模板有哪些"]`, 示例:["科室绩效分析可以按哪些维度拆解?", " researcher 技能帮你查一下相关文献?", "上次你提到排班冲突的问题,需要我再帮你想解决方案吗"]`,
user: (context: string) => `以下是对话中最近的消息:\n\n${context}\n\n请生成 3 个后续问题`, user: (context: string) => `以下是对话中最近的消息:\n\n${context}\n\n请生成 3 个后续建议1 深入追问 + 1 实用行动 + 1 管家关怀)`,
}, },
}; };

View File

@@ -40,6 +40,10 @@ import { fetchSuggestionContext, type SuggestionContext } from '../../lib/sugges
const log = createLogger('StreamStore'); const log = createLogger('StreamStore');
// Module-level prefetch for suggestion context — started during streaming,
// consumed on stream completion. Saves ~0.5-1s vs fetching after stream ends.
let _activeSuggestionContextPrefetch: Promise<SuggestionContext> | null = null;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Error formatting — convert raw LLM/API errors to user-friendly messages // Error formatting — convert raw LLM/API errors to user-friendly messages
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -400,44 +404,50 @@ function createCompleteHandler(
} }
} }
// Parallel: memory extraction + intelligence context fetch // Decoupled: suggestion generation runs immediately with prefetched context,
// memory extraction + reflection run independently in background.
const filtered = msgs const filtered = msgs
.filter(m => m.role === 'user' || m.role === 'assistant') .filter(m => m.role === 'user' || m.role === 'assistant')
.map(m => ({ role: m.role, content: m.content })); .map(m => ({ role: m.role, content: m.content }));
const convId = useConversationStore.getState().currentConversationId; const convId = useConversationStore.getState().currentConversationId;
const lastUserContent = typeof lastContent === 'string' ? lastContent : '';
const suggestionContextPromise = fetchSuggestionContext(agentId, lastUserContent); // Build conversation messages for suggestions
// Fire-and-forget background tasks
Promise.all([
getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined)
.catch(err => log.warn('Memory extraction failed:', err)),
intelligenceClient.reflection.recordConversation()
.catch(err => log.warn('Recording conversation failed:', err)),
suggestionContextPromise,
]).then(([, , context]) => {
// Conditional reflection (after context is ready)
intelligenceClient.reflection.shouldReflect().then(shouldReflect => {
if (shouldReflect) {
intelligenceClient.reflection.reflect(agentId, []).catch(err => {
log.warn('Reflection failed:', err);
});
}
});
// Follow-up suggestions with enriched context
const latestMsgs = chat.getMessages() || []; const latestMsgs = chat.getMessages() || [];
const conversationMessages = latestMsgs const conversationMessages = latestMsgs
.filter(m => m.role === 'user' || m.role === 'assistant') .filter(m => m.role === 'user' || m.role === 'assistant')
.filter(m => !m.streaming) .filter(m => !m.streaming)
.map(m => ({ role: m.role, content: m.content })); .map(m => ({ role: m.role, content: m.content }));
generateLLMSuggestions(conversationMessages, set, context).catch(err => { // Consume prefetched context (started in sendMessage during streaming)
const prefetchPromise = _activeSuggestionContextPrefetch;
_activeSuggestionContextPrefetch = null;
// Fire suggestion generation immediately — don't wait for memory extraction
const fireSuggestions = (ctx?: SuggestionContext) => {
generateLLMSuggestions(conversationMessages, set, ctx).catch(err => {
log.warn('Suggestion generation error:', err); log.warn('Suggestion generation error:', err);
set({ suggestionsLoading: false }); set({ suggestionsLoading: false });
}); });
};
if (prefetchPromise) {
prefetchPromise.then(fireSuggestions).catch(() => fireSuggestions());
} else {
fireSuggestions();
}
// Background tasks run independently — never block suggestions
getMemoryExtractor().extractFromConversation(filtered, agentId, convId ?? undefined)
.catch(err => log.warn('Memory extraction failed:', err));
intelligenceClient.reflection.recordConversation()
.catch(err => log.warn('Recording conversation failed:', err))
.then(() => intelligenceClient.reflection.shouldReflect())
.then(shouldReflect => {
if (shouldReflect) {
intelligenceClient.reflection.reflect(agentId, []).catch(err => {
log.warn('Reflection failed:', err);
}); });
}
}).catch(() => {});
}; };
} }
@@ -573,9 +583,9 @@ async function generateLLMSuggestions(
set({ suggestionsLoading: true }); set({ suggestionsLoading: true });
try { try {
const recentMessages = messages.slice(-6); const recentMessages = messages.slice(-20);
const conversationContext = recentMessages const conversationContext = recentMessages
.map(m => `${m.role === 'user' ? '用户' : '助手'}: ${m.content}`) .map(m => `${m.role === 'user' ? '用户' : '助手'}: ${m.content.slice(0, 200)}`)
.join('\n\n'); .join('\n\n');
// Build dynamic user message with intelligence context // Build dynamic user message with intelligence context
@@ -799,6 +809,9 @@ export const useStreamStore = create<StreamState>()(
}); });
set({ isStreaming: true, activeRunId: null }); set({ isStreaming: true, activeRunId: null });
// Prefetch suggestion context during streaming — saves ~0.5-1s post-stream
_activeSuggestionContextPrefetch = fetchSuggestionContext(agentId, content);
// Delta buffer — batches updates at ~60fps // Delta buffer — batches updates at ~60fps
const buffer = new DeltaBuffer(assistantId, _chat); const buffer = new DeltaBuffer(assistantId, _chat);
@@ -1072,10 +1085,20 @@ export const useStreamStore = create<StreamState>()(
.filter(m => !m.streaming) .filter(m => !m.streaming)
.map(m => ({ role: m.role, content: m.content })); .map(m => ({ role: m.role, content: m.content }));
generateLLMSuggestions(conversationMessages, set).catch(err => { // Path B: use prefetched context for agent stream — fixes zero-personalization
const prefetchPromise = _activeSuggestionContextPrefetch;
_activeSuggestionContextPrefetch = null;
const fireSuggestions = (ctx?: SuggestionContext) => {
generateLLMSuggestions(conversationMessages, set, ctx).catch(err => {
log.warn('Suggestion generation error:', err); log.warn('Suggestion generation error:', err);
set({ suggestionsLoading: false }); set({ suggestionsLoading: false });
}); });
};
if (prefetchPromise) {
prefetchPromise.then(fireSuggestions).catch(() => fireSuggestions());
} else {
fireSuggestions();
}
} }
} }
} else if (delta.stream === 'hand') { } else if (delta.stream === 'hand') {