Files
zclaw_openfang/wiki/pipeline.md
iven c10e50d58e docs(wiki): Phase D完成 — 6模块页重构(routing/chat/butler/hands-skills/pipeline/data-model)
- routing.md: 移除Store/lib列表+5节模板 (330→131行)
- chat.md: 添加集成契约+不变量 (180→134行)
- butler.md: 移除重复→引用memory/hands-skills (215→150行)
- hands-skills.md: 5节模板+契约+不变量 (281→170行)
- pipeline.md: 添加契约+重组 (157→154行)
- data-model.md: 添加契约+双库架构图 (181→153行)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 21:53:17 +08:00

155 lines
7.2 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: Pipeline DSL
updated: 2026-04-22
status: active
tags: [module, pipeline, dsl]
---
# Pipeline DSL
> 从 [[index]] 导航。关联模块: [[hands-skills]] [[chat]]
## 1. 设计决策
**WHY DAG 执行器**: 工作流步骤之间存在数据依赖DAG (有向无环图) 通过拓扑排序自动推导执行顺序,支持无依赖节点的并行执行,比线性管道更灵活、更高效。
**WHY YAML 模板**: 声明式定义 + 可版本控制。非技术用户可直接编辑 YAML 文件调整步骤和参数无需重新编译。模板可随项目仓库同步、diff、review。
**WHY 18 模板覆盖 8 行业**: 管家模式面向行业垂直场景。每个行业目录包含该领域典型工作流(如汕头设计的供应链采集、医疗的政策合规),用户可直接使用或定制。
**WHY v2 解析器**: v1 仅支持线性步骤序列v2 引入 DAG 依赖声明 (`depends_on` 字段)支持复杂分支和并行。v1 解析器保留用于向后兼容。
**WHY ActionRegistry**: Pipeline 步骤与具体执行逻辑解耦。`action_type` 字符串映射到注册的处理函数,新增步骤类型只需注册新 action不改动执行器核心。
## 2. 关键文件 + 数据流
### 核心文件
| 文件 | 职责 |
|------|------|
| `crates/zclaw-pipeline/src/executor.rs` | DAG 执行器 — 拓扑排序 + 并行执行 |
| `crates/zclaw-pipeline/src/parser_v2.rs` | YAML v2 解析器 (11 tests) |
| `crates/zclaw-pipeline/src/parser.rs` | YAML v1 解析器 (兼容) |
| `crates/zclaw-pipeline/src/state.rs` | 运行状态管理 |
| `crates/zclaw-pipeline/src/intent.rs` | Pipeline 意图匹配 |
| `crates/zclaw-pipeline/src/trigger.rs` | 定时/事件触发器 |
| `desktop/src/lib/pipeline-client.ts` | 前端 Pipeline 客户端 |
| `desktop/src/store/workflowBuilderStore.ts` | 工作流编辑器状态 |
| `desktop/src/components/pipeline/` | Pipeline UI 组件 |
| `desktop/src-tauri/src/pipeline_commands/` | 12 个 Tauri 命令 (4 文件) |
| `pipelines/` | 18 个 YAML 模板 (8 目录) |
### 架构流程
```
用户选择模板 (Workflow 面板)
→ pipeline-client.ts: invoke('pipeline_load_template')
→ parser_v2: 解析 YAML → PipelineDefinition (steps + depends_on)
→ executor: 构建 DAG → 拓扑排序 → 检测循环依赖
→ 并行执行无依赖节点:
→ ActionRegistry.resolve(action_type) → 具体 Handler
→ Handler 执行 → step_result (output + status)
→ 有依赖节点等待前驱完成 → 执行
→ 全部完成 → PipelineRun.status = Completed
→ 前端轮询 progress 或 Tauri Event 推送状态
```
### 集成契约
| 方向 | 接口 | 说明 |
|------|------|------|
| Called by ← UI | `pipelineStore.ts` / `workflowBuilderStore.ts` | 工作流面板交互: 列出模板、创建/运行/取消 Pipeline |
| Calls → runtime | Tauri invoke (discovery 12 命令) | pipeline_commands/ 转发到 DAG executor |
| Calls → skills/hands | `ActionRegistry.resolve(action_type)` | Pipeline 步骤可能调用 Skill 或 Hand 执行具体动作 |
| Called by ← chat | `intent_router.rs` | 聊天消息意图匹配到 Pipeline 模板 |
| Calls → memory | 记忆检索 (via runtime) | Pipeline 执行时可检索历史记忆增强步骤上下文 |
## 3. 代码逻辑
### 运行状态
```rust
enum RunStatus { Pending, Running, Completed, Failed, Cancelled }
```
### DAG 执行流程详解
1. **解析阶段**: `parser_v2.rs` 将 YAML 反序列化为 `PipelineDefinition`,包含 `steps: Vec<StepDef>` 和每个 step 的 `depends_on: Vec<String>`
2. **构建阶段**: `executor.rs` 将 steps 映射为 DAG 节点,建立邻接表 (step_id → [依赖的 step_ids])
3. **排序阶段**: Kahn 算法拓扑排序,检测循环依赖 — 若排序后节点数 < 总节点数说明存在环返回错误
4. **执行阶段**: 按拓扑序逐层执行同层无依赖节点并行每步通过 `ActionRegistry` 解析 `action_type` 到具体 Handler
5. **完成阶段**: 全部步骤成功 `Completed`任一步骤失败 整体 `Failed`用户可随时 `Cancel`
### Tauri 命令分布
| 文件 | 命令数 | 命令 |
|------|--------|------|
| discovery.rs | 8 | list/get/run/progress/cancel/result/runs/refresh |
| crud.rs | 3 | create/update/delete |
| intent_router.rs | 1 | route_intent |
| presentation.rs | 2 | analyze_presentation/pipeline_templates |
前端 invoke 匹配: 8 个调用对应 8 discovery 命令另有 2 @reserved (`orchestration_execute`/`orchestration_validate`无前端 UI)。
### 模板分布 (18 YAML, 8 目录)
```
pipelines/
├── _templates/ (2) — article-summary, competitor-analysis
├── design-shantou/ (4) — 汕头玩具/服装: 通信/竞品/供应链/趋势
├── education/ (4) — 课堂/教案/研究→测验/学生分析
├── healthcare/ (3) — 数据报告/会议纪要/政策合规
├── legal/ (1) — 合同审查
├── marketing/ (1) — 营销活动
├── productivity/ (1) — 会议摘要
└── research/ (1) — 文献综述
```
### 不变量
> **DAG 节点必须有明确的依赖关系,循环依赖会在 topological sort 阶段被检测并报错。**
> **模板 YAML 结构不重复 — 每个行业目录的模板聚焦该领域特有场景。**
### 测试链路
| 功能 | 测试文件 | 测试数 |
|------|---------|--------|
| YAML 解析 v2 | parser_v2.rs | 11 |
| DAG 执行 | executor.rs | 2 |
| 意图匹配 | intent.rs | 5 |
| 状态管理 | state.rs | 6 |
| 触发器 | trigger.rs | 5 |
| 类型 | types.rs + types_v2.rs | 4 |
| 解析 v1 | parser.rs | 5 |
| 引擎上下文/阶段 | engine/ | 8 |
| 演示 | presentation/ | 13 |
| **合计** | 13 文件 | **59** |
## 4. 活跃问题 + 注意事项
| 优先级 | 问题 | 说明 |
|--------|------|------|
| P2 | Pipeline+Skill E2E 通过率 37.5% | Tauri IPC 可用但参数格式问题非核心链路 |
| P3 | Deepseek 中继任务卡 processing | Provider Key 禁用后已有任务不自动清理 |
| | pipeline_create 反序列化 | BUG-L2 已修复 (04-17 回归)需持续关注 |
**注意事项**: Pipeline 步骤中的 `action_type` 必须在 `ActionRegistry` 中注册未注册的 action 会导致步骤 Failed模板 YAML `depends_on` 字段引用的 step_id 必须存在否则解析阶段报错前端 `workflowBuilderStore.ts` 负责编辑器状态 CRUD 操作通过 `pipeline-client.ts` 调用 Tauri 命令不直接操作文件系统
## 5. 变更日志
> 最近 5 条与 Pipeline 相关的变更。完整日志见 [[log]]。
| 日期 | 变更 |
|------|------|
| 2026-04-21 | Phase 0+1 修复: Skill 工具调用桥接 complete_with_tools() + Hand 字段映射 runId |
| 2026-04-17 | E2E 回归: pipeline_create 反序列化 BUG-L2 修复 |
| 2026-04-16 | 3 P0 修复 + 5 E2E Bug 修复Pipeline Tauri 命令数校正 |
| 2026-04-09 | Pipeline+Hands 双交付DAG 执行器稳定化 |
| 2026-04-01 | 17 YAML 模板 + DAG 执行器初始版本 |
## 关联模块
- [[hands-skills]] Pipeline 步骤可能调用 Hand/Skill 执行具体动作
- [[chat]] Pipeline 可通过聊天意图匹配触发 (`intent_router.rs`)
- [[memory]] Pipeline 执行时可检索记忆增强上下文