fix(audit): 第五轮审计修复 — 反思LLM分析、语义路由、并行执行、错误中文化
- P2: 反思引擎接入 LLM 深度行为分析 (analyze_patterns_with_llm) - P3-M6: 语义路由 RuntimeLlmIntentDriver 真实 LLM 匹配 - P3-L1: V2 Pipeline execute_parallel 改用 buffer_unordered 真正并行 - P3-S10: Rust 用户可见错误提示统一中文化 累计修复 27 项,完成度 ~72% → ~78%
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use futures::stream::{self, StreamExt};
|
||||
use serde_json::{Value, json};
|
||||
|
||||
use crate::types_v2::{Stage, ConditionalBranch};
|
||||
@@ -269,7 +270,7 @@ impl StageEngine {
|
||||
|
||||
self.emit_event(StageEvent::Progress {
|
||||
stage_id: stage_id.to_string(),
|
||||
message: "Calling LLM...".to_string(),
|
||||
message: "正在调用 LLM...".to_string(),
|
||||
});
|
||||
|
||||
let prompt_str = resolved_prompt.as_str()
|
||||
@@ -302,7 +303,7 @@ impl StageEngine {
|
||||
stage_id: &str,
|
||||
each: &str,
|
||||
stage_template: &Stage,
|
||||
_max_workers: usize,
|
||||
max_workers: usize,
|
||||
context: &mut ExecutionContextV2,
|
||||
) -> Result<Value, StageError> {
|
||||
// Resolve the array to iterate over
|
||||
@@ -313,29 +314,58 @@ impl StageEngine {
|
||||
return Ok(Value::Array(vec![]));
|
||||
}
|
||||
|
||||
let workers = max_workers.max(1).min(total);
|
||||
let stage_template = stage_template.clone();
|
||||
|
||||
// Clone Arc drivers for concurrent tasks
|
||||
let llm_driver = self.llm_driver.clone();
|
||||
let skill_driver = self.skill_driver.clone();
|
||||
let hand_driver = self.hand_driver.clone();
|
||||
let event_callback = self.event_callback.clone();
|
||||
|
||||
self.emit_event(StageEvent::Progress {
|
||||
stage_id: stage_id.to_string(),
|
||||
message: format!("Processing {} items", total),
|
||||
message: format!("并行处理 {} 项 (workers={})", total, workers),
|
||||
});
|
||||
|
||||
// Sequential execution with progress tracking
|
||||
// Note: True parallel execution would require Send-safe drivers
|
||||
let mut outputs = Vec::with_capacity(total);
|
||||
// Parallel execution using buffer_unordered
|
||||
let results: Vec<(usize, Result<StageResult, StageError>)> = stream::iter(
|
||||
items.into_iter().enumerate().map(|(index, item)| {
|
||||
let child_ctx = context.child_context(item, index, total);
|
||||
let stage = stage_template.clone();
|
||||
let llm = llm_driver.clone();
|
||||
let skill = skill_driver.clone();
|
||||
let hand = hand_driver.clone();
|
||||
let cb = event_callback.clone();
|
||||
|
||||
for (index, item) in items.into_iter().enumerate() {
|
||||
let mut child_context = context.child_context(item.clone(), index, total);
|
||||
async move {
|
||||
let engine = StageEngine {
|
||||
llm_driver: llm,
|
||||
skill_driver: skill,
|
||||
hand_driver: hand,
|
||||
event_callback: cb,
|
||||
max_workers: workers,
|
||||
};
|
||||
let mut ctx = child_ctx;
|
||||
let result = engine.execute(&stage, &mut ctx).await;
|
||||
(index, result)
|
||||
}
|
||||
})
|
||||
)
|
||||
.buffer_unordered(workers)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
self.emit_event(StageEvent::ParallelProgress {
|
||||
stage_id: stage_id.to_string(),
|
||||
completed: index,
|
||||
total,
|
||||
});
|
||||
// Sort by original index to preserve order
|
||||
let mut ordered: Vec<_> = results.into_iter().collect();
|
||||
ordered.sort_by_key(|(idx, _)| *idx);
|
||||
|
||||
match self.execute(stage_template, &mut child_context).await {
|
||||
Ok(result) => outputs.push(result.output),
|
||||
Err(e) => outputs.push(json!({ "error": e.to_string(), "index": index })),
|
||||
let outputs: Vec<Value> = ordered.into_iter().map(|(index, result)| {
|
||||
match result {
|
||||
Ok(sr) => sr.output,
|
||||
Err(e) => json!({ "error": e.to_string(), "index": index }),
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
|
||||
Ok(Value::Array(outputs))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user