Files
zclaw_openfang/desktop/src-tauri/src/kernel_commands/hand.rs
iven 0a04b260a4
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor(desktop): ChatStore structured split + IDB persistence + stream cancel
Split monolithic chatStore.ts (908 lines) into 4 focused stores:
- chatStore.ts: facade layer, owns messages[], backward-compatible selectors
- conversationStore.ts: conversation CRUD, agent switching, IndexedDB persistence
- streamStore.ts: streaming orchestration, chat mode, suggestions
- messageStore.ts: token tracking

Key fixes from 3-round deep audit:
- C1: Fix Rust serde camelCase vs TS snake_case mismatch (toolStart/toolEnd/iterationStart)
- C2: Fix IDB async rehydration race with persist.hasHydrated() subscribe
- C3: Add sessionKey to partialize to survive page refresh
- H3: Fix IDB migration retry on failure (don't set migrated=true in catch)
- M3: Fix ToolCallStep deduplication (toolStart creates, toolEnd updates)
- M-NEW-2: Clear sessionKey on cancelStream

Also adds:
- Rust backend stream cancellation via AtomicBool + cancel_stream command
- IndexedDB storage adapter with one-time localStorage migration
- HMR cleanup for cross-store subscriptions
2026-04-03 00:24:16 +08:00

440 lines
15 KiB
Rust

//! Hand commands: list, execute, approve, cancel, get, run_status, run_list, run_cancel
//!
//! Hands are autonomous capabilities registered in the Kernel's HandRegistry.
//! Hand execution can require approval depending on autonomy level and config.
use serde::{Deserialize, Serialize};
use serde_json;
use tauri::{AppHandle, Emitter, State};
use super::KernelState;
// ============================================================================
// Hands Commands - Autonomous Capabilities
// ============================================================================
/// Hand information returned to the frontend.
///
/// Field names are serialized in camelCase (for example `requirements_met`
/// is emitted as `requirementsMet`). Values are normally produced by the
/// `From<zclaw_hands::HandConfig>` conversion defined in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HandInfoResponse {
/// Hand identifier, copied from the hand's configuration.
pub id: String,
/// Hand name, copied from the hand's configuration.
pub name: String,
/// Hand description, copied from the hand's configuration.
pub description: String,
/// Availability label. The `From<HandConfig>` conversion produces
/// `"unavailable"`, `"needs_approval"` or `"idle"`.
pub status: String,
/// Set by the `From<HandConfig>` conversion to "enabled and no declared
/// dependencies".
pub requirements_met: bool,
/// Whether the hand's configuration asks for approval before execution.
pub needs_approval: bool,
/// Dependency list, copied from the hand's configuration.
pub dependencies: Vec<String>,
/// Tag list, copied from the hand's configuration.
pub tags: Vec<String>,
/// Whether the hand is enabled in its configuration.
pub enabled: bool,
/// Category derived from the tags; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub category: Option<String>,
/// Icon name derived from the tags; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub icon: Option<String>,
/// Falls back to 0 when absent during deserialization; the
/// `From<HandConfig>` conversion always sets 0.
#[serde(default)]
pub tool_count: u32,
/// Falls back to 0 when absent during deserialization; the
/// `From<HandConfig>` conversion always sets 0.
#[serde(default)]
pub metric_count: u32,
}
impl From<zclaw_hands::HandConfig> for HandInfoResponse {
    /// Converts a registry `HandConfig` into the shape sent to the frontend.
    ///
    /// Derived fields:
    /// - `status`: `"unavailable"` when the hand is disabled, otherwise
    ///   `"needs_approval"` or `"idle"` depending on `needs_approval`.
    /// - `category`: the first tag (in tag order) that is a known category name.
    /// - `icon`: picked by tag priority (browser, research, media, data,
    ///   communication), falling back to `"zap"`.
    /// - `requirements_met`: the hand is enabled and declares no dependencies.
    /// - `tool_count` / `metric_count`: always 0 in this conversion.
    fn from(config: zclaw_hands::HandConfig) -> Self {
        // Tags that double as a category label.
        const CATEGORY_TAGS: [&str; 6] = [
            "research",
            "automation",
            "browser",
            "data",
            "media",
            "communication",
        ];
        // (tag, icon) pairs in priority order: the first tag present on the hand wins.
        const ICON_BY_TAG: [(&str, &str); 5] = [
            ("browser", "globe"),
            ("research", "search"),
            ("media", "video"),
            ("data", "database"),
            ("communication", "message-circle"),
        ];

        let status = match (config.enabled, config.needs_approval) {
            (false, _) => "unavailable",
            (true, true) => "needs_approval",
            (true, false) => "idle",
        }
        .to_string();

        let category = config
            .tags
            .iter()
            .find(|tag| CATEGORY_TAGS.contains(&tag.as_str()))
            .cloned();

        let icon_name = ICON_BY_TAG
            .iter()
            .find(|&&(tag, _)| config.tags.iter().any(|t| t == tag))
            .map_or("zap", |&(_, name)| name);

        // Computed before `config.dependencies` is moved into the response.
        let requirements_met = config.enabled && config.dependencies.is_empty();

        Self {
            id: config.id,
            name: config.name,
            description: config.description,
            status,
            requirements_met,
            needs_approval: config.needs_approval,
            dependencies: config.dependencies,
            tags: config.tags,
            enabled: config.enabled,
            category,
            icon: Some(icon_name.to_string()),
            tool_count: 0,
            metric_count: 0,
        }
    }
}
/// Hand execution result returned to the frontend.
///
/// Field names are serialized in camelCase (`duration_ms` is emitted as
/// `durationMs`). The commands in this file also use this type to report a
/// pending approval: `success` is `false` and `output.status` is
/// `"pending_approval"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HandResult {
/// Whether the hand reported success.
pub success: bool,
/// Arbitrary JSON payload produced by the hand (or the pending-approval notice).
pub output: serde_json::Value,
/// Error message, if any.
pub error: Option<String>,
/// Execution duration, if reported (presumably milliseconds, judging by the name).
pub duration_ms: Option<u64>,
}
impl From<zclaw_hands::HandResult> for HandResult {
    /// Copies the four fields exposed to the frontend out of the hand crate's
    /// result type, unchanged.
    fn from(result: zclaw_hands::HandResult) -> Self {
        let zclaw_hands::HandResult {
            success,
            output,
            error,
            duration_ms,
            ..
        } = result;

        Self {
            success,
            output,
            error,
            duration_ms,
        }
    }
}
/// Lists every hand known to the kernel.
///
/// Each configuration returned by the kernel's `list_hands` is converted into
/// a [`HandInfoResponse`]. Hands are registered during kernel initialization.
///
/// # Errors
/// Returns an error string when the kernel has not been initialized.
// @connected
#[tauri::command]
pub async fn hand_list(
    state: State<'_, KernelState>,
) -> Result<Vec<HandInfoResponse>, String> {
    let guard = state.lock().await;
    let Some(kernel) = guard.as_ref() else {
        return Err("Kernel not initialized. Call kernel_init first.".to_string());
    };

    let responses: Vec<HandInfoResponse> = kernel
        .list_hands()
        .await
        .into_iter()
        .map(HandInfoResponse::from)
        .collect();
    Ok(responses)
}
/// Runs a hand, or queues it for approval when the autonomy rules require it.
///
/// Gate rules, keyed on `autonomy_level`:
/// - `"supervised"`: every hand gets a pending approval.
/// - `"autonomous"`: the per-hand approval gate is bypassed.
/// - anything else (including `None`): a pending approval is created only
///   when the hand's configuration has `needs_approval` set.
///
/// When gated, the returned [`HandResult`] has `success = false` and an
/// `output` object with `status = "pending_approval"`, the approval id, the
/// hand id and a message. Otherwise the hand is executed and its result is
/// returned; the run id produced by the kernel is not surfaced.
///
/// # Errors
/// Returns an error string when the kernel is not initialized or when the
/// kernel fails to execute the hand.
// @connected
#[tauri::command]
pub async fn hand_execute(
    state: State<'_, KernelState>,
    id: String,
    input: serde_json::Value,
    autonomy_level: Option<String>,
) -> Result<HandResult, String> {
    let guard = state.lock().await;
    let kernel = match guard.as_ref() {
        Some(k) => k,
        None => return Err("Kernel not initialized. Call kernel_init first.".to_string()),
    };

    // `Some(message)` means the execution must wait for approval.
    let gate_message = match autonomy_level.as_deref() {
        // Supervised mode: approval is required for ALL hands.
        Some("supervised") => Some("监督模式下所有 Hand 执行需要用户审批"),
        // Autonomous mode: the user opted in to skipping per-hand gates.
        Some("autonomous") => None,
        // Assisted mode or unspecified: honour the hand's own setting.
        _ => {
            let hands = kernel.list_hands().await;
            let needs_gate = hands
                .iter()
                .find(|hand| hand.id == id)
                .is_some_and(|hand| hand.needs_approval);
            needs_gate.then_some("This hand requires approval before execution")
        }
    };

    if let Some(message) = gate_message {
        let approval = kernel.create_approval(id.clone(), input).await;
        return Ok(HandResult {
            success: false,
            output: serde_json::json!({
                "status": "pending_approval",
                "approval_id": approval.id,
                "hand_id": approval.hand_id,
                "message": message
            }),
            error: None,
            duration_ms: None,
        });
    }

    // Direct execution; the kernel also hands back a run id, unused here.
    let outcome = kernel.execute_hand(&id, input).await;
    match outcome {
        Ok((result, _run_id)) => Ok(HandResult::from(result)),
        Err(e) => Err(format!("Failed to execute hand: {}", e)),
    }
}
/// Approve or reject a pending hand execution.
///
/// The approval identified by `run_id` must currently be `"pending"` and must
/// belong to `hand_name`; otherwise an error is returned before the kernel is
/// asked to change anything. The decision is then forwarded to the kernel's
/// `respond_to_approval`, which internally spawns the hand execution when
/// approved.
///
/// On approval, a background task polls the approval entry every 500 ms for
/// up to 5 minutes and then emits a `hand-execution-complete` event with
/// `approvalId`, `handId`, `success` and `error`. Emitting is best-effort:
/// a failure is logged and does not affect this command's return value.
///
/// # Returns
/// A JSON object with `status` (`"approved"` or `"rejected"`) and `hand_name`.
///
/// # Errors
/// Returns an error string when the kernel is not initialized, when no
/// pending approval matches `run_id`, when the approval belongs to another
/// hand, or when the kernel rejects the response.
// @connected
#[tauri::command]
pub async fn hand_approve(
    app: AppHandle,
    state: State<'_, KernelState>,
    hand_name: String,
    run_id: String,
    approved: bool,
    reason: Option<String>,
) -> Result<serde_json::Value, String> {
    let kernel_lock = state.lock().await;
    let kernel = kernel_lock.as_ref()
        .ok_or_else(|| "Kernel not initialized".to_string())?;
    tracing::info!(
        "[hand_approve] hand={}, run_id={}, approved={}, reason={:?}",
        hand_name, run_id, approved, reason
    );
    // Verify the approval belongs to the specified hand before responding.
    // This prevents cross-hand approval attacks where a run_id from one hand
    // is used to approve a different hand's pending execution.
    let approvals = kernel.list_approvals().await;
    let entry = approvals.iter().find(|a| a.id == run_id && a.status == "pending")
        .ok_or_else(|| format!("Approval not found or already resolved: {}", run_id))?;
    if entry.hand_id != hand_name {
        return Err(format!(
            "Approval run_id {} belongs to hand '{}', not '{}' as requested",
            run_id, entry.hand_id, hand_name
        ));
    }
    kernel.respond_to_approval(&run_id, approved, reason).await
        .map_err(|e| format!("Failed to approve hand: {}", e))?;
    // When approved, monitor the Hand execution and emit events to the frontend
    if approved {
        let approval_id = run_id.clone();
        let hand_id = hand_name.clone();
        let kernel_state: KernelState = (*state).clone();
        tokio::spawn(async move {
            // Poll the approval status until it transitions from "approved" to
            // "completed" or "failed" (set by the kernel's spawned task).
            // Timeout after 5 minutes to avoid hanging forever.
            let timeout = tokio::time::Duration::from_secs(300);
            let poll_interval = tokio::time::Duration::from_millis(500);
            let result = tokio::time::timeout(timeout, async {
                loop {
                    tokio::time::sleep(poll_interval).await;
                    // The lock is re-acquired on every iteration and released
                    // when the guard goes out of scope at the end of it.
                    let kernel_lock = kernel_state.lock().await;
                    if let Some(kernel) = kernel_lock.as_ref() {
                        // Use get_approval to check any status (not just "pending")
                        if let Some(entry) = kernel.get_approval(&approval_id).await {
                            match entry.status.as_str() {
                                "completed" => {
                                    tracing::info!("[hand_approve] Hand '{}' execution completed for approval {}", hand_id, approval_id);
                                    return (true, None::<String>);
                                }
                                "failed" => {
                                    // The failure message is read from the `error` key of the
                                    // entry's `input`. NOTE(review): confirm the kernel stores it there.
                                    let error_msg = entry.input.get("error")
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("Unknown error")
                                        .to_string();
                                    tracing::warn!("[hand_approve] Hand '{}' execution failed for approval {}: {}", hand_id, approval_id, error_msg);
                                    return (false, Some(error_msg));
                                }
                                _ => {} // still running (status is "approved")
                            }
                        } else {
                            // Entry disappeared entirely — kernel was likely restarted
                            return (false, Some("Approval entry disappeared".to_string()));
                        }
                    } else {
                        return (false, Some("Kernel not available".to_string()));
                    }
                }
            }).await;
            let (success, error) = match result {
                Ok(outcome) => outcome,
                Err(_) => {
                    // Log the timeout like the other terminal outcomes.
                    tracing::warn!(
                        "[hand_approve] Hand '{}' execution timed out for approval {}",
                        hand_id, approval_id
                    );
                    (false, Some("Hand execution timed out (5 minutes)".to_string()))
                }
            };
            // Emitting stays best-effort, but a failure is logged rather than
            // silently dropped: otherwise the frontend would wait for an
            // event that never arrives with nothing in the logs to explain it.
            if let Err(e) = app.emit("hand-execution-complete", serde_json::json!({
                "approvalId": approval_id,
                "handId": hand_id,
                "success": success,
                "error": error,
            })) {
                tracing::warn!(
                    "[hand_approve] Failed to emit hand-execution-complete for approval {}: {}",
                    approval_id, e
                );
            }
        });
    }
    Ok(serde_json::json!({
        "status": if approved { "approved" } else { "rejected" },
        "hand_name": hand_name,
    }))
}
/// Cancels a pending approval for a hand.
///
/// The approval identified by `run_id` must be in the `"pending"` state and
/// must belong to `hand_name`; only then is the kernel's `cancel_approval`
/// invoked.
///
/// # Returns
/// A JSON object with `status = "cancelled"` and `hand_name`.
///
/// # Errors
/// Returns an error string when the kernel is not initialized, when no
/// pending approval matches `run_id`, when the approval belongs to another
/// hand, or when the kernel fails to cancel it.
// @connected
#[tauri::command]
pub async fn hand_cancel(
    state: State<'_, KernelState>,
    hand_name: String,
    run_id: String,
) -> Result<serde_json::Value, String> {
    let guard = state.lock().await;
    let Some(kernel) = guard.as_ref() else {
        return Err("Kernel not initialized".to_string());
    };

    tracing::info!("[hand_cancel] hand={}, run_id={}", hand_name, run_id);

    // Ownership check: the approval must belong to the hand named by the caller.
    let approvals = kernel.list_approvals().await;
    let pending = approvals
        .iter()
        .find(|a| a.id == run_id && a.status == "pending");
    let Some(entry) = pending else {
        return Err(format!("Approval not found or already resolved: {}", run_id));
    };
    if entry.hand_id != hand_name {
        return Err(format!(
            "Approval run_id {} belongs to hand '{}', not '{}' as requested",
            run_id, entry.hand_id, hand_name
        ));
    }

    let cancelled = kernel.cancel_approval(&run_id).await;
    if let Err(e) = cancelled {
        return Err(format!("Failed to cancel hand: {}", e));
    }

    Ok(serde_json::json!({ "status": "cancelled", "hand_name": hand_name }))
}
// ============================================================
// Hand Stub Commands (not yet fully implemented)
// ============================================================
/// Get detailed info for a single hand
// @connected
#[tauri::command]
pub async fn hand_get(
state: State<'_, KernelState>,
name: String,
) -> Result<serde_json::Value, String> {
let kernel_lock = state.lock().await;
let kernel = kernel_lock.as_ref()
.ok_or_else(|| "Kernel not initialized".to_string())?;
let hands = kernel.list_hands().await;
let found = hands.iter().find(|h| h.id == name)
.ok_or_else(|| format!("Hand '{}' not found", name))?;
Ok(serde_json::to_value(found)
.map_err(|e| format!("Serialization error: {}", e))?)
}
/// Get status of a specific hand run
// @connected
#[tauri::command]
pub async fn hand_run_status(
state: State<'_, KernelState>,
run_id: String,
) -> Result<serde_json::Value, String> {
let kernel_lock = state.lock().await;
let kernel = kernel_lock.as_ref()
.ok_or_else(|| "Kernel not initialized".to_string())?;
let parsed_id: zclaw_types::HandRunId = run_id.parse()
.map_err(|e| format!("Invalid run ID: {}", e))?;
let run = kernel.get_hand_run(&parsed_id).await
.map_err(|e| format!("Failed to get hand run: {}", e))?;
match run {
Some(r) => Ok(serde_json::to_value(r)
.map_err(|e| format!("Serialization error: {}", e))?),
None => Ok(serde_json::json!({
"status": "not_found",
"run_id": run_id,
"message": "Hand run not found"
})),
}
}
/// Lists hand runs matching an optional filter, together with the match count.
///
/// All four filter arguments are optional and are passed to the kernel in a
/// `zclaw_types::HandRunFilter`; `status` is parsed into the filter's status
/// type first.
///
/// # Returns
/// A JSON object with `runs`, `total`, `limit` and `offset`. When the caller
/// omitted them, `limit` is reported as 20 and `offset` as 0. NOTE(review):
/// these are only the echoed values; confirm the kernel applies the same
/// defaults when the filter fields are `None`.
///
/// # Errors
/// Returns an error string when the kernel is not initialized, when `status`
/// does not parse, or when listing or counting fails.
// @connected
#[tauri::command]
pub async fn hand_run_list(
    state: State<'_, KernelState>,
    hand_name: Option<String>,
    status: Option<String>,
    limit: Option<u32>,
    offset: Option<u32>,
) -> Result<serde_json::Value, String> {
    let guard = state.lock().await;
    let Some(kernel) = guard.as_ref() else {
        return Err("Kernel not initialized".to_string());
    };

    // Values echoed back to the caller in the response.
    let reported_limit = limit.unwrap_or(20);
    let reported_offset = offset.unwrap_or(0);

    let filter = zclaw_types::HandRunFilter {
        hand_name,
        status: status.map(|s| s.parse()).transpose()
            .map_err(|e| format!("Invalid status filter: {}", e))?,
        limit,
        offset,
    };

    let runs = kernel
        .list_hand_runs(&filter)
        .await
        .map_err(|e| format!("Failed to list hand runs: {}", e))?;
    let total = kernel
        .count_hand_runs(&filter)
        .await
        .map_err(|e| format!("Failed to count hand runs: {}", e))?;

    let page = serde_json::json!({
        "runs": runs,
        "total": total,
        "limit": reported_limit,
        "offset": reported_offset,
    });
    Ok(page)
}
/// Cancels a hand run by its run id.
///
/// `run_id` is parsed into a `zclaw_types::HandRunId` and passed to the
/// kernel's `cancel_hand_run`.
///
/// # Returns
/// A JSON object with `status = "cancelled"` and the original `run_id`.
///
/// # Errors
/// Returns an error string when the kernel is not initialized, when `run_id`
/// does not parse, or when the kernel fails to cancel the run.
// @reserved: no frontend integration yet
#[tauri::command]
pub async fn hand_run_cancel(
    state: State<'_, KernelState>,
    run_id: String,
) -> Result<serde_json::Value, String> {
    let guard = state.lock().await;
    let kernel = guard.as_ref().ok_or("Kernel not initialized")?;

    let parsed_id = run_id
        .parse::<zclaw_types::HandRunId>()
        .map_err(|e| format!("Invalid run ID: {}", e))?;

    let cancelled = kernel.cancel_hand_run(&parsed_id).await;
    if let Err(e) = cancelled {
        return Err(format!("Failed to cancel hand run: {}", e));
    }

    Ok(serde_json::json!({
        "status": "cancelled",
        "run_id": run_id
    }))
}