fix(audit): 修复深度审计 P2 问题 — 自主授权后端守卫、反思历史累积、心跳持久化
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
- M5-补: hand_execute/skill_execute 接收 autonomy_level 参数,后端三层守卫
(supervised 全部审批 / assisted 尊重 needs_approval / autonomous 跳过)
- M3: hand_approve/hand_cancel 移除 _hand_name 下划线,添加审计日志
- M4-补: 反思历史累积存储到 reflection:history:{agent_id} 数组(最多20条)
get_history 优先读持久化历史,保留 latest key 向后兼容
- 心跳历史: VikingStorage 持久化 HeartbeatResult 数组,tick() 也存历史
heartbeat_init 恢复历史,重启后不丢失
- L2: 确认 gatewayStore 仅注释引用,无需修改
- 身份回滚: 确认 IdentityChangeProposal.tsx 已实现 HistoryItem + restoreSnapshot
- 更新 DEEP_AUDIT_REPORT.md 完成度 72% (核心 92%, 真实可用 80%)
This commit is contained in:
@@ -169,12 +169,28 @@ impl HeartbeatEngine {
|
||||
// Execute heartbeat tick
|
||||
let result = execute_tick(&agent_id, &config, &alert_sender).await;
|
||||
|
||||
// Store history
|
||||
// Store history in-memory
|
||||
let mut hist = history.lock().await;
|
||||
hist.push(result);
|
||||
if hist.len() > 100 {
|
||||
*hist = hist.split_off(50);
|
||||
}
|
||||
|
||||
// Persist history to VikingStorage (fire-and-forget)
|
||||
let history_to_persist: Vec<HeartbeatResult> = hist.clone();
|
||||
let aid = agent_id.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Ok(storage) = crate::viking_commands::get_storage().await {
|
||||
let key = format!("heartbeat:history:{}", aid);
|
||||
if let Ok(json) = serde_json::to_string(&history_to_persist) {
|
||||
if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(
|
||||
&*storage, &key, &json,
|
||||
).await {
|
||||
tracing::warn!("[heartbeat] Failed to persist history: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -191,9 +207,34 @@ impl HeartbeatEngine {
|
||||
*self.running.lock().await
|
||||
}
|
||||
|
||||
/// Execute a single tick manually
|
||||
/// Execute a single tick manually and persist the result to history
|
||||
pub async fn tick(&self) -> HeartbeatResult {
|
||||
execute_tick(&self.agent_id, &self.config, &self.alert_sender).await
|
||||
let result = execute_tick(&self.agent_id, &self.config, &self.alert_sender).await;
|
||||
|
||||
// Store in history (same as the periodic loop)
|
||||
let mut hist = self.history.lock().await;
|
||||
hist.push(result.clone());
|
||||
if hist.len() > 100 {
|
||||
*hist = hist.split_off(50);
|
||||
}
|
||||
|
||||
// Persist to VikingStorage
|
||||
let history_to_persist: Vec<HeartbeatResult> = hist.clone();
|
||||
let aid = self.agent_id.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Ok(storage) = crate::viking_commands::get_storage().await {
|
||||
let key = format!("heartbeat:history:{}", aid);
|
||||
if let Ok(json) = serde_json::to_string(&history_to_persist) {
|
||||
if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(
|
||||
&*storage, &key, &json,
|
||||
).await {
|
||||
tracing::warn!("[heartbeat] Failed to persist history: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Subscribe to alerts
|
||||
@@ -208,6 +249,37 @@ impl HeartbeatEngine {
|
||||
hist.iter().rev().take(limit).cloned().collect()
|
||||
}
|
||||
|
||||
/// Restore heartbeat history from VikingStorage metadata (called during init)
|
||||
async fn restore_history(&self) {
|
||||
let key = format!("heartbeat:history:{}", self.agent_id);
|
||||
match crate::viking_commands::get_storage().await {
|
||||
Ok(storage) => {
|
||||
match zclaw_growth::VikingStorage::get_metadata_json(&*storage, &key).await {
|
||||
Ok(Some(json)) => {
|
||||
if let Ok(persisted) = serde_json::from_str::<Vec<HeartbeatResult>>(&json) {
|
||||
let count = persisted.len();
|
||||
let mut hist = self.history.lock().await;
|
||||
*hist = persisted;
|
||||
tracing::info!(
|
||||
"[heartbeat] Restored {} history entries for {}",
|
||||
count, self.agent_id
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
tracing::debug!("[heartbeat] No persisted history for {}", self.agent_id);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("[heartbeat] Failed to restore history: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("[heartbeat] Storage unavailable during init: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update configuration
|
||||
pub async fn update_config(&self, updates: HeartbeatConfig) {
|
||||
let mut config = self.config.lock().await;
|
||||
@@ -648,6 +720,9 @@ pub async fn heartbeat_init(
|
||||
// Restore last interaction time from VikingStorage metadata
|
||||
restore_last_interaction(&agent_id).await;
|
||||
|
||||
// Restore heartbeat history from VikingStorage metadata
|
||||
engine.restore_history().await;
|
||||
|
||||
let mut engines = state.lock().await;
|
||||
engines.insert(agent_id, engine);
|
||||
Ok(())
|
||||
|
||||
@@ -229,7 +229,7 @@ impl ReflectionEngine {
|
||||
self.history = self.history.split_off(10);
|
||||
}
|
||||
|
||||
// 8. Persist result and state to VikingStorage (fire-and-forget)
|
||||
// 8. Persist result, state, and history to VikingStorage (fire-and-forget)
|
||||
let state_to_persist = self.state.clone();
|
||||
let result_to_persist = result.clone();
|
||||
let agent_id_owned = agent_id.to_string();
|
||||
@@ -245,7 +245,7 @@ impl ReflectionEngine {
|
||||
}
|
||||
}
|
||||
|
||||
// Persist result as JSON string
|
||||
// Persist latest result as JSON string
|
||||
let result_key = format!("reflection:latest:{}", agent_id_owned);
|
||||
if let Ok(result_json) = serde_json::to_string(&result_to_persist) {
|
||||
if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(
|
||||
@@ -254,6 +254,28 @@ impl ReflectionEngine {
|
||||
tracing::warn!("[reflection] Failed to persist result: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Persist full history array (append new result)
|
||||
let history_key = format!("reflection:history:{}", agent_id_owned);
|
||||
let mut history: Vec<ReflectionResult> =
|
||||
match zclaw_growth::VikingStorage::get_metadata_json(
|
||||
&*storage, &history_key,
|
||||
).await {
|
||||
Ok(Some(json)) => serde_json::from_str(&json).unwrap_or_default(),
|
||||
_ => Vec::new(),
|
||||
};
|
||||
history.push(result_to_persist);
|
||||
// Keep last 20 entries
|
||||
if history.len() > 20 {
|
||||
history = history.split_off(history.len() - 20);
|
||||
}
|
||||
if let Ok(history_json) = serde_json::to_string(&history) {
|
||||
if let Err(e) = zclaw_growth::VikingStorage::store_metadata_json(
|
||||
&*storage, &history_key, &history_json,
|
||||
).await {
|
||||
tracing::warn!("[reflection] Failed to persist history: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -661,13 +683,56 @@ pub async fn reflection_reflect(
|
||||
}
|
||||
|
||||
/// Get reflection history
|
||||
///
|
||||
/// Returns in-memory history first. If empty and an agent_id is provided,
|
||||
/// falls back to the persisted history array from VikingStorage metadata,
|
||||
/// then to the single latest result for backward compatibility.
|
||||
#[tauri::command]
|
||||
pub async fn reflection_get_history(
|
||||
limit: Option<usize>,
|
||||
agent_id: Option<String>,
|
||||
state: tauri::State<'_, ReflectionEngineState>,
|
||||
) -> Result<Vec<ReflectionResult>, String> {
|
||||
let limit = limit.unwrap_or(10);
|
||||
let engine = state.lock().await;
|
||||
Ok(engine.get_history(limit.unwrap_or(10)).into_iter().cloned().collect())
|
||||
let mut results: Vec<ReflectionResult> = engine.get_history(limit)
|
||||
.into_iter()
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
// If no in-memory results and we have an agent_id, load persisted history
|
||||
if results.is_empty() {
|
||||
if let Some(ref aid) = agent_id {
|
||||
if let Ok(storage) = crate::viking_commands::get_storage().await {
|
||||
let history_key = format!("reflection:history:{}", aid);
|
||||
match zclaw_growth::VikingStorage::get_metadata_json(&*storage, &history_key).await {
|
||||
Ok(Some(json)) => {
|
||||
if let Ok(mut persisted) = serde_json::from_str::<Vec<ReflectionResult>>(&json) {
|
||||
persisted.reverse();
|
||||
persisted.truncate(limit);
|
||||
results = persisted;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// Fallback: try loading single latest result (pre-history format)
|
||||
let latest_key = format!("reflection:latest:{}", aid);
|
||||
if let Ok(Some(json)) = zclaw_growth::VikingStorage::get_metadata_json(
|
||||
&*storage, &latest_key,
|
||||
).await {
|
||||
if let Ok(persisted) = serde_json::from_str::<ReflectionResult>(&json) {
|
||||
results.push(persisted);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("[reflection] Failed to load persisted history: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Get reflection state
|
||||
|
||||
@@ -668,10 +668,16 @@ pub async fn skill_execute(
|
||||
id: String,
|
||||
context: SkillContext,
|
||||
input: serde_json::Value,
|
||||
autonomy_level: Option<String>,
|
||||
) -> Result<SkillResult, String> {
|
||||
// Validate skill ID
|
||||
let id = validate_id(&id, "skill_id")?;
|
||||
|
||||
// Autonomy guard: supervised mode blocks skill execution entirely
|
||||
if autonomy_level.as_deref() == Some("supervised") {
|
||||
return Err("技能执行在监督模式下需要用户审批".to_string());
|
||||
}
|
||||
|
||||
let kernel_lock = state.lock().await;
|
||||
|
||||
let kernel = kernel_lock.as_ref()
|
||||
@@ -808,28 +814,48 @@ pub async fn hand_execute(
|
||||
state: State<'_, KernelState>,
|
||||
id: String,
|
||||
input: serde_json::Value,
|
||||
autonomy_level: Option<String>,
|
||||
) -> Result<HandResult, String> {
|
||||
let kernel_lock = state.lock().await;
|
||||
|
||||
let kernel = kernel_lock.as_ref()
|
||||
.ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?;
|
||||
|
||||
// Check if hand requires approval before execution
|
||||
let hands = kernel.list_hands().await;
|
||||
if let Some(hand_config) = hands.iter().find(|h| h.id == id) {
|
||||
if hand_config.needs_approval {
|
||||
let approval = kernel.create_approval(id.clone(), input).await;
|
||||
return Ok(HandResult {
|
||||
success: false,
|
||||
output: serde_json::json!({
|
||||
"status": "pending_approval",
|
||||
"approval_id": approval.id,
|
||||
"hand_id": approval.hand_id,
|
||||
"message": "This hand requires approval before execution"
|
||||
}),
|
||||
error: None,
|
||||
duration_ms: None,
|
||||
});
|
||||
// Autonomy guard: supervised mode requires approval for ALL hands
|
||||
if autonomy_level.as_deref() == Some("supervised") {
|
||||
let approval = kernel.create_approval(id.clone(), input).await;
|
||||
return Ok(HandResult {
|
||||
success: false,
|
||||
output: serde_json::json!({
|
||||
"status": "pending_approval",
|
||||
"approval_id": approval.id,
|
||||
"hand_id": approval.hand_id,
|
||||
"message": "监督模式下所有 Hand 执行需要用户审批"
|
||||
}),
|
||||
error: None,
|
||||
duration_ms: None,
|
||||
});
|
||||
}
|
||||
|
||||
// Check if hand requires approval (assisted mode or no autonomy level specified).
|
||||
// In autonomous mode, the user has opted in to bypass per-hand approval gates.
|
||||
if autonomy_level.as_deref() != Some("autonomous") {
|
||||
let hands = kernel.list_hands().await;
|
||||
if let Some(hand_config) = hands.iter().find(|h| h.id == id) {
|
||||
if hand_config.needs_approval {
|
||||
let approval = kernel.create_approval(id.clone(), input).await;
|
||||
return Ok(HandResult {
|
||||
success: false,
|
||||
output: serde_json::json!({
|
||||
"status": "pending_approval",
|
||||
"approval_id": approval.id,
|
||||
"hand_id": approval.hand_id,
|
||||
"message": "This hand requires approval before execution"
|
||||
}),
|
||||
error: None,
|
||||
duration_ms: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1127,7 +1153,7 @@ pub async fn approval_respond(
|
||||
#[tauri::command]
|
||||
pub async fn hand_approve(
|
||||
state: State<'_, KernelState>,
|
||||
_hand_name: String,
|
||||
hand_name: String,
|
||||
run_id: String,
|
||||
approved: bool,
|
||||
reason: Option<String>,
|
||||
@@ -1136,28 +1162,41 @@ pub async fn hand_approve(
|
||||
let kernel = kernel_lock.as_ref()
|
||||
.ok_or_else(|| "Kernel not initialized".to_string())?;
|
||||
|
||||
tracing::info!(
|
||||
"[hand_approve] hand={}, run_id={}, approved={}, reason={:?}",
|
||||
hand_name, run_id, approved, reason
|
||||
);
|
||||
|
||||
// run_id maps to approval id
|
||||
kernel.respond_to_approval(&run_id, approved, reason).await
|
||||
.map_err(|e| format!("Failed to approve hand: {}", e))?;
|
||||
|
||||
Ok(serde_json::json!({ "status": if approved { "approved" } else { "rejected" } }))
|
||||
Ok(serde_json::json!({
|
||||
"status": if approved { "approved" } else { "rejected" },
|
||||
"hand_name": hand_name,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Cancel a hand execution
|
||||
#[tauri::command]
|
||||
pub async fn hand_cancel(
|
||||
state: State<'_, KernelState>,
|
||||
_hand_name: String,
|
||||
hand_name: String,
|
||||
run_id: String,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
let kernel_lock = state.lock().await;
|
||||
let kernel = kernel_lock.as_ref()
|
||||
.ok_or_else(|| "Kernel not initialized".to_string())?;
|
||||
|
||||
tracing::info!(
|
||||
"[hand_cancel] hand={}, run_id={}",
|
||||
hand_name, run_id
|
||||
);
|
||||
|
||||
kernel.cancel_approval(&run_id).await
|
||||
.map_err(|e| format!("Failed to cancel hand: {}", e))?;
|
||||
|
||||
Ok(serde_json::json!({ "status": "cancelled" }))
|
||||
Ok(serde_json::json!({ "status": "cancelled", "hand_name": hand_name }))
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
|
||||
Reference in New Issue
Block a user