fix(kernel): BREAK-02 记忆提取链路闭合 + BREAK-03 审批 HandRun 跟踪

BREAK-02 记忆提取链路闭合:
- Kernel 新增 viking: Arc<VikingAdapter> 共享存储后端
- VikingAdapter 在 boot() 中初始化, 全生命周期共享
- create_middleware_chain() 注册 MemoryMiddleware (priority 150)
- CompactionMiddleware 的 growth 参数从 None 改为 GrowthIntegration
- zclaw-runtime 重新导出 VikingAdapter

BREAK-03 审批后 HandRun 跟踪:
- respond_to_approval() 添加完整 HandRun 生命周期跟踪
- Pending → Running → Completed/Failed 状态转换
- 支持 duration_ms 计时和 cancellation 注册
- 与 execute_hand() 保持一致的跟踪粒度
This commit is contained in:
iven
2026-03-29 23:45:52 +08:00
parent 09df242cf8
commit 7e90cea117
2 changed files with 69 additions and 3 deletions

View File

@@ -130,6 +130,8 @@ pub struct Kernel {
pending_approvals: Arc<Mutex<Vec<ApprovalEntry>>>,
/// Running hand runs that can be cancelled (run_id -> cancelled flag)
running_hand_runs: Arc<dashmap::DashMap<HandRunId, Arc<std::sync::atomic::AtomicBool>>>,
/// Shared memory storage backend for Growth system
viking: Arc<zclaw_runtime::VikingAdapter>,
/// A2A router for inter-agent messaging (gated by multi-agent feature)
#[cfg(feature = "multi-agent")]
a2a_router: Arc<A2aRouter>,
@@ -190,6 +192,9 @@ impl Kernel {
// Initialize trigger manager
let trigger_manager = crate::trigger_manager::TriggerManager::new(hands.clone());
// Initialize Growth system — shared VikingAdapter for memory storage
let viking = Arc::new(zclaw_runtime::VikingAdapter::in_memory());
// Restore persisted agents
let persisted = memory.list_agents().await?;
for agent in persisted {
@@ -217,6 +222,7 @@ impl Kernel {
trigger_manager,
pending_approvals: Arc::new(Mutex::new(Vec::new())),
running_hand_runs: Arc::new(dashmap::DashMap::new()),
viking,
#[cfg(feature = "multi-agent")]
a2a_router,
#[cfg(feature = "multi-agent")]
@@ -239,19 +245,30 @@ impl Kernel {
fn create_middleware_chain(&self) -> Option<zclaw_runtime::middleware::MiddlewareChain> {
let mut chain = zclaw_runtime::middleware::MiddlewareChain::new();
// Growth integration — shared VikingAdapter for memory middleware & compaction
let growth = zclaw_runtime::GrowthIntegration::new(self.viking.clone());
// Compaction middleware — only register when threshold > 0
let threshold = self.config.compaction_threshold();
if threshold > 0 {
use std::sync::Arc;
let growth_for_compaction = zclaw_runtime::GrowthIntegration::new(self.viking.clone());
let mw = zclaw_runtime::middleware::compaction::CompactionMiddleware::new(
threshold,
zclaw_runtime::CompactionConfig::default(),
Some(self.driver.clone()),
None, // growth not wired in kernel yet
Some(growth_for_compaction),
);
chain.register(Arc::new(mw));
}
// Memory middleware — auto-extract memories after conversations
{
use std::sync::Arc;
let mw = zclaw_runtime::middleware::memory::MemoryMiddleware::new(growth);
chain.register(Arc::new(mw));
}
// Loop guard middleware
{
use std::sync::Arc;
@@ -965,13 +982,62 @@ impl Kernel {
let input = entry.input.clone();
drop(approvals); // Release lock before async hand execution
// Execute the hand in background
// Execute the hand in background with HandRun tracking
let hands = self.hands.clone();
let approvals = self.pending_approvals.clone();
let memory = self.memory.clone();
let running_hand_runs = self.running_hand_runs.clone();
let id_owned = id.to_string();
tokio::spawn(async move {
// Create HandRun record for tracking
let run_id = HandRunId::new();
let now = chrono::Utc::now().to_rfc3339();
let mut run = HandRun {
id: run_id,
hand_name: hand_id.clone(),
trigger_source: TriggerSource::Manual,
params: input.clone(),
status: HandRunStatus::Pending,
result: None,
error: None,
duration_ms: None,
created_at: now.clone(),
started_at: None,
completed_at: None,
};
let _ = memory.save_hand_run(&run).await;
run.status = HandRunStatus::Running;
run.started_at = Some(chrono::Utc::now().to_rfc3339());
let _ = memory.update_hand_run(&run).await;
// Register cancellation flag
let cancel_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
running_hand_runs.insert(run.id, cancel_flag.clone());
let context = HandContext::default();
let start = std::time::Instant::now();
let result = hands.execute(&hand_id, &context, input).await;
let duration = start.elapsed();
// Remove from running map
running_hand_runs.remove(&run.id);
// Update HandRun with result
let completed_at = chrono::Utc::now().to_rfc3339();
match &result {
Ok(res) => {
run.status = HandRunStatus::Completed;
run.result = Some(res.output.clone());
run.error = res.error.clone();
}
Err(e) => {
run.status = HandRunStatus::Failed;
run.error = Some(e.to_string());
}
}
run.duration_ms = Some(duration.as_millis() as u64);
run.completed_at = Some(completed_at);
let _ = memory.update_hand_run(&run).await;
// Update approval status based on execution result
let mut approvals = approvals.lock().await;
@@ -980,7 +1046,6 @@ impl Kernel {
Ok(_) => entry.status = "completed".to_string(),
Err(e) => {
entry.status = "failed".to_string();
// Store error in input metadata
if let Some(obj) = entry.input.as_object_mut() {
obj.insert("error".to_string(), Value::String(format!("{}", e)));
}