Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Batch fix covering multiple modules:
- P2-01: HandRegistry Semaphore-based max_concurrent enforcement
- P2-03: Populate toolCount/metricCount from Hand trait methods
- P2-06: heartbeat_update_config minimum interval validation
- P2-07: ReflectionResult used_fallback marker for rule-based fallback
- P2-08/09: identity_propose_change parameter naming consistency
- P2-10: ClassroomMetadata is_placeholder flag for LLM failure
- P2-11: classroomStore userDidCloseDuringGeneration intent tracking
- P2-12: workflowStore pipeline_create sends actionType
- P2-13/14: PipelineInfo step_count + PipelineStepInfo for proper step mapping
- P2-15: Pipe transform support in context.resolve (8 transforms)
- P2-16: Mustache {{...}} → \${...} auto-normalization
- P2-17: SaaSLogin password placeholder 6→8
- P2-19: serialize_skill_md + update_skill preserve tools field
- P2-22: ToolOutputGuard sensitive patterns from warn→block
- P2-23: Mutex::unwrap() → unwrap_or_else in relay/service.rs
- P3-01/03/07/08/09: Various P3 fixes
- DEFECT_LIST.md: comprehensive status sync (43/51 fixed, 8 remaining)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
313 lines
13 KiB
Rust
313 lines
13 KiB
Rust
//! Hand execution and run tracking
|
|
//!
|
|
//! # Approval Architecture
|
|
//!
|
|
//! Hands with `needs_approval: true` go through a two-phase flow:
|
|
//! 1. **Entry point** (Tauri command `hand_execute`): checks `needs_approval` flag and
|
|
//! `autonomy_level`. If approval is required, creates a `PendingApproval` and returns
|
|
//! immediately — the hand is NOT executed yet.
|
|
//! 2. **Approval** (Tauri command `hand_approve`): user approves → `respond_to_approval()`
|
|
//! spawns `hands.execute()` directly (bypassing this `execute_hand()` method).
|
|
//!
|
|
//! This method (`execute_hand`) is the **direct execution path** used when approval is
|
|
//! NOT required, or when the user has opted into autonomous mode. For defense-in-depth,
|
|
//! we log a warning if a `needs_approval` hand reaches this path — it means the approval
|
|
//! gate was bypassed (e.g., by the scheduler or trigger manager, which intentionally bypass
|
|
//! approval for automated triggers).
|
|
|
|
use std::sync::Arc;
|
|
use zclaw_types::{Result, HandRun, HandRunId, HandRunStatus, HandRunFilter, TriggerSource};
|
|
use zclaw_hands::{HandContext, HandResult};
|
|
|
|
use super::Kernel;
|
|
|
|
impl Kernel {
|
|
/// Get the hands registry
|
|
pub fn hands(&self) -> &Arc<zclaw_hands::HandRegistry> {
|
|
&self.hands
|
|
}
|
|
|
|
/// List all registered hands
|
|
pub async fn list_hands(&self) -> Vec<zclaw_hands::HandConfig> {
|
|
self.hands.list().await
|
|
}
|
|
|
|
/// Execute a hand with the given input, tracking the run.
|
|
///
|
|
/// **Note:** For hands with `needs_approval: true`, the Tauri command layer should
|
|
/// route through the approval flow instead of calling this method directly. Automated
|
|
/// triggers (scheduler, trigger manager) intentionally bypass approval.
|
|
pub async fn execute_hand(
|
|
&self,
|
|
hand_id: &str,
|
|
input: serde_json::Value,
|
|
) -> Result<(HandResult, HandRunId)> {
|
|
// Defense-in-depth audit: log if a needs_approval hand reaches the direct path
|
|
let configs = self.hands.list().await;
|
|
if let Some(config) = configs.iter().find(|c| c.id == hand_id) {
|
|
if config.needs_approval {
|
|
tracing::warn!(
|
|
"[Kernel] Hand '{}' has needs_approval=true but reached direct execution path. \
|
|
Caller should route through approval flow instead.",
|
|
hand_id
|
|
);
|
|
}
|
|
}
|
|
let run_id = HandRunId::new();
|
|
let now = chrono::Utc::now().to_rfc3339();
|
|
|
|
// Create the initial HandRun record
|
|
let mut run = HandRun {
|
|
id: run_id,
|
|
hand_name: hand_id.to_string(),
|
|
trigger_source: TriggerSource::Manual,
|
|
params: input.clone(),
|
|
status: HandRunStatus::Pending,
|
|
result: None,
|
|
error: None,
|
|
duration_ms: None,
|
|
created_at: now.clone(),
|
|
started_at: None,
|
|
completed_at: None,
|
|
};
|
|
self.memory.save_hand_run(&run).await?;
|
|
|
|
// Transition to Running
|
|
run.status = HandRunStatus::Running;
|
|
run.started_at = Some(chrono::Utc::now().to_rfc3339());
|
|
self.memory.update_hand_run(&run).await?;
|
|
|
|
// Register cancellation flag
|
|
let cancel_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
|
self.running_hand_runs.insert(run_id, cancel_flag.clone());
|
|
|
|
// Execute the hand (with optional timeout from HandConfig)
|
|
let context = HandContext::default();
|
|
let start = std::time::Instant::now();
|
|
|
|
// Determine timeout: prefer HandConfig.timeout_secs, fallback to context default (300s)
|
|
let timeout_secs = self.hands.get_config(hand_id)
|
|
.await
|
|
.map(|c| if c.timeout_secs > 0 { c.timeout_secs } else { context.timeout_secs })
|
|
.unwrap_or(context.timeout_secs);
|
|
|
|
let hand_result = tokio::time::timeout(
|
|
std::time::Duration::from_secs(timeout_secs),
|
|
self.hands.execute(hand_id, &context, input),
|
|
).await;
|
|
|
|
let duration = start.elapsed();
|
|
|
|
// Handle timeout
|
|
let hand_result = match hand_result {
|
|
Ok(result) => result,
|
|
Err(_) => {
|
|
// Timeout elapsed
|
|
cancel_flag.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
let mut run_update = run.clone();
|
|
run_update.status = HandRunStatus::Failed;
|
|
run_update.error = Some(format!("Hand execution timed out after {}s", timeout_secs));
|
|
run_update.completed_at = Some(chrono::Utc::now().to_rfc3339());
|
|
run_update.duration_ms = Some(duration.as_millis() as u64);
|
|
self.memory.update_hand_run(&run_update).await?;
|
|
self.running_hand_runs.remove(&run_id);
|
|
return Err(zclaw_types::ZclawError::Timeout(format!("Hand '{}' timed out after {}s", hand_id, timeout_secs)));
|
|
}
|
|
};
|
|
|
|
// Check if cancelled during execution
|
|
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
|
|
let mut run_update = run.clone();
|
|
run_update.status = HandRunStatus::Cancelled;
|
|
run_update.completed_at = Some(chrono::Utc::now().to_rfc3339());
|
|
run_update.duration_ms = Some(duration.as_millis() as u64);
|
|
self.memory.update_hand_run(&run_update).await?;
|
|
self.running_hand_runs.remove(&run_id);
|
|
return Err(zclaw_types::ZclawError::Internal("Hand execution cancelled".to_string()));
|
|
}
|
|
|
|
// Remove from running map
|
|
self.running_hand_runs.remove(&run_id);
|
|
|
|
// Update HandRun with result
|
|
let completed_at = chrono::Utc::now().to_rfc3339();
|
|
match &hand_result {
|
|
Ok(res) => {
|
|
run.status = HandRunStatus::Completed;
|
|
run.result = Some(res.output.clone());
|
|
run.error = res.error.clone();
|
|
}
|
|
Err(e) => {
|
|
run.status = HandRunStatus::Failed;
|
|
run.error = Some(e.to_string());
|
|
}
|
|
}
|
|
run.duration_ms = Some(duration.as_millis() as u64);
|
|
run.completed_at = Some(completed_at);
|
|
self.memory.update_hand_run(&run).await?;
|
|
|
|
hand_result.map(|res| (res, run_id))
|
|
}
|
|
|
|
/// Execute a hand with a specific trigger source (for scheduled/event triggers).
|
|
///
|
|
/// Automated trigger sources (Scheduler, Event, System) bypass the approval gate
|
|
/// by design — the user explicitly configured these automated triggers.
|
|
/// Manual trigger sources should go through the approval flow at the Tauri command layer.
|
|
pub async fn execute_hand_with_source(
|
|
&self,
|
|
hand_id: &str,
|
|
input: serde_json::Value,
|
|
trigger_source: TriggerSource,
|
|
) -> Result<(HandResult, HandRunId)> {
|
|
// Audit: warn if a Manual trigger bypasses approval
|
|
if trigger_source == TriggerSource::Manual {
|
|
let configs = self.hands.list().await;
|
|
if let Some(config) = configs.iter().find(|c| c.id == hand_id) {
|
|
if config.needs_approval {
|
|
tracing::warn!(
|
|
"[Kernel] Hand '{}' (Manual trigger) has needs_approval=true but bypassed approval. \
|
|
This should go through the approval flow.",
|
|
hand_id
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
let run_id = HandRunId::new();
|
|
let now = chrono::Utc::now().to_rfc3339();
|
|
|
|
let mut run = HandRun {
|
|
id: run_id,
|
|
hand_name: hand_id.to_string(),
|
|
trigger_source,
|
|
params: input.clone(),
|
|
status: HandRunStatus::Pending,
|
|
result: None,
|
|
error: None,
|
|
duration_ms: None,
|
|
created_at: now,
|
|
started_at: None,
|
|
completed_at: None,
|
|
};
|
|
self.memory.save_hand_run(&run).await?;
|
|
|
|
run.status = HandRunStatus::Running;
|
|
run.started_at = Some(chrono::Utc::now().to_rfc3339());
|
|
self.memory.update_hand_run(&run).await?;
|
|
|
|
let cancel_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
|
self.running_hand_runs.insert(run_id, cancel_flag.clone());
|
|
|
|
let context = HandContext::default();
|
|
let start = std::time::Instant::now();
|
|
|
|
// P2-02: Apply timeout to execute_hand_with_source (same as execute_hand)
|
|
let timeout_secs = self.hands.get_config(hand_id)
|
|
.await
|
|
.map(|c| if c.timeout_secs > 0 { c.timeout_secs } else { context.timeout_secs })
|
|
.unwrap_or(context.timeout_secs);
|
|
|
|
let hand_result = tokio::time::timeout(
|
|
std::time::Duration::from_secs(timeout_secs),
|
|
self.hands.execute(hand_id, &context, input),
|
|
).await;
|
|
let duration = start.elapsed();
|
|
|
|
// Check if cancelled during execution
|
|
if cancel_flag.load(std::sync::atomic::Ordering::Relaxed) {
|
|
run.status = HandRunStatus::Cancelled;
|
|
run.completed_at = Some(chrono::Utc::now().to_rfc3339());
|
|
run.duration_ms = Some(duration.as_millis() as u64);
|
|
self.memory.update_hand_run(&run).await?;
|
|
self.running_hand_runs.remove(&run_id);
|
|
return Err(zclaw_types::ZclawError::Internal("Hand execution cancelled".to_string()));
|
|
}
|
|
|
|
self.running_hand_runs.remove(&run_id);
|
|
|
|
let completed_at = chrono::Utc::now().to_rfc3339();
|
|
// Handle timeout result
|
|
let hand_result = match hand_result {
|
|
Ok(result) => result,
|
|
Err(_) => {
|
|
// Timeout elapsed
|
|
cancel_flag.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
run.status = HandRunStatus::Failed;
|
|
run.error = Some(format!("Hand execution timed out after {}s", timeout_secs));
|
|
run.duration_ms = Some(duration.as_millis() as u64);
|
|
run.completed_at = Some(completed_at);
|
|
self.memory.update_hand_run(&run).await?;
|
|
return Err(zclaw_types::ZclawError::Internal(
|
|
format!("Hand '{}' timed out after {}s", hand_id, timeout_secs)
|
|
));
|
|
}
|
|
};
|
|
|
|
match &hand_result {
|
|
Ok(res) => {
|
|
run.status = HandRunStatus::Completed;
|
|
run.result = Some(res.output.clone());
|
|
run.error = res.error.clone();
|
|
}
|
|
Err(e) => {
|
|
run.status = HandRunStatus::Failed;
|
|
run.error = Some(e.to_string());
|
|
}
|
|
}
|
|
run.duration_ms = Some(duration.as_millis() as u64);
|
|
run.completed_at = Some(completed_at);
|
|
self.memory.update_hand_run(&run).await?;
|
|
|
|
hand_result.map(|res| (res, run_id))
|
|
}
|
|
|
|
// ============================================================
|
|
// Hand Run Tracking
|
|
// ============================================================
|
|
|
|
/// Get a hand run by ID
|
|
pub async fn get_hand_run(&self, id: &HandRunId) -> Result<Option<HandRun>> {
|
|
self.memory.get_hand_run(id).await
|
|
}
|
|
|
|
/// List hand runs with filter
|
|
pub async fn list_hand_runs(&self, filter: &HandRunFilter) -> Result<Vec<HandRun>> {
|
|
self.memory.list_hand_runs(filter).await
|
|
}
|
|
|
|
/// Count hand runs matching filter
|
|
pub async fn count_hand_runs(&self, filter: &HandRunFilter) -> Result<u32> {
|
|
self.memory.count_hand_runs(filter).await
|
|
}
|
|
|
|
/// Cancel a running hand execution
|
|
pub async fn cancel_hand_run(&self, id: &HandRunId) -> Result<()> {
|
|
if let Some((_, flag)) = self.running_hand_runs.remove(id) {
|
|
flag.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
// Note: the actual status update happens in execute_hand_with_source
|
|
// when it detects the cancel flag
|
|
Ok(())
|
|
} else {
|
|
// Not currently running — check if exists at all
|
|
let run = self.memory.get_hand_run(id).await?;
|
|
match run {
|
|
Some(r) if r.status == HandRunStatus::Pending => {
|
|
let mut updated = r;
|
|
updated.status = HandRunStatus::Cancelled;
|
|
updated.completed_at = Some(chrono::Utc::now().to_rfc3339());
|
|
self.memory.update_hand_run(&updated).await?;
|
|
Ok(())
|
|
}
|
|
Some(r) => Err(zclaw_types::ZclawError::InvalidInput(
|
|
format!("Cannot cancel hand run {} with status {}", id, r.status)
|
|
)),
|
|
None => Err(zclaw_types::ZclawError::NotFound(
|
|
format!("Hand run {} not found", id)
|
|
)),
|
|
}
|
|
}
|
|
}
|
|
}
|