Files
zclaw_openfang/skills/data-consolidation-agent/SKILL.md
iven 0d4fa96b82
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor: 统一项目名称从OpenFang到ZCLAW
重构所有代码和文档中的项目名称,将OpenFang统一更新为ZCLAW。包括:
- 配置文件中的项目名称
- 代码注释和文档引用
- 环境变量和路径
- 类型定义和接口名称
- 测试用例和模拟数据

同时优化部分代码结构,移除未使用的模块,并更新相关依赖项。
2026-03-27 07:36:03 +08:00

385 lines
9.2 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
name: data-consolidation-agent
description: "数据整合 Agent - 从多个异构数据源整合、对齐和合并数据为统一视图"
triggers:
- "数据整合"
- "数据合并"
- "ETL"
- "数据对齐"
- "多源数据"
- "数据仓库"
tools:
- bash
- read
- write
- grep
- glob
---
# Data Consolidation Agent - 数据整合 Agent
从多个异构数据源整合、对齐、转换和合并数据的智能 Agent构建统一的数据视图。
## 能力
- **多源整合**: 数据库、API、文件、流数据统一处理
- **数据对齐**: 跨源实体识别、主数据管理 (MDM)
- **冲突解决**: 自动或规则驱动的数据冲突处理
- **Schema 演进**: 处理源 Schema 变更、版本兼容
- **质量监控**: 数据质量评分、异常检测告警
## 工具依赖
- bash: 执行 ETL 脚本、数据管道
- read: 读取数据源配置、映射规则
- write: 输出整合数据、报告
- grep: 搜索数据模式、日志分析
- glob: 查找数据文件、配置
## 整合架构
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Source A │ │ Source B │ │ Source C │
│ (CRM) │ │ (ERP) │ │ (E-comm) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ Extraction Layer │
│ - Connectors - Rate Limiting - CDC │
└─────────────────────┬───────────────────────┘
┌─────────────────────────────────────────────┐
│ Transformation Layer │
│ - Cleanse - Map - Enrich - Validate │
└─────────────────────┬───────────────────────┘
┌─────────────────────────────────────────────┐
│ Consolidation Layer │
│ - Match - Merge - Resolve - Dedupe │
└─────────────────────┬───────────────────────┘
┌─────────────────────────────────────────────┐
│ Storage Layer │
│ - Data Lake - Warehouse - API │
└─────────────────────────────────────────────┘
```
## 数据源类型
| 类型 | 示例 | 连接方式 | 增量支持 |
|------|------|----------|----------|
| 关系数据库 | PostgreSQL, MySQL | JDBC/ODBC | CDC, Timestamp |
| NoSQL | MongoDB, DynamoDB | Native Driver | Oplog, Streams |
| SaaS API | Salesforce, HubSpot | REST/GraphQL | Modified Date |
| 文件 | CSV, JSON, Parquet | S3, SFTP | File Hash |
| 消息队列 | Kafka, RabbitMQ | Native | Native |
| 日志 | ELK, Splunk | API | Timestamp |
## 实体匹配规则
### 客户匹配
```yaml
# entity-matching.yaml
entity: Customer
match_rules:
- name: exact_email
priority: 1
fields:
- email
algorithm: exact
confidence: 1.0
- name: fuzzy_name_company
priority: 2
fields:
- company_name
- contact_name
algorithm: fuzzy
threshold: 0.85
confidence: 0.9
- name: phone_match
priority: 3
fields:
- phone
algorithm: normalized
confidence: 0.95
```
### 冲突解决
```yaml
# conflict-resolution.yaml
entity: Customer
resolution_rules:
- field: company_name
strategy: most_recent
source_priority: [salesforce, hubspot, shopify]
- field: email
strategy: most_complete
validation: email_format
- field: revenue
strategy: highest_confidence
source_confidence:
salesforce: 0.95
erp: 0.90
estimate: 0.60
- field: created_date
strategy: earliest
- field: status
strategy: custom
function: |
if sources.erp.status == 'inactive':
return 'inactive'
return sources.salesforce.status or 'active'
```
## 整合流程
### Step 1: 源数据注册
```bash
# 注册数据源
register_source \
--name salesforce \
--type crm \
--config salesforce.yaml \
--schedule "*/15 * * * *"
register_source \
--name shopify \
--type ecommerce \
--config shopify.yaml \
--schedule "*/5 * * * *"
```
### Step 2: 数据抽取
```bash
# 并行抽取多源数据
parallel_extract \
--sources salesforce,shopify,erp \
--mode incremental \
--output staging/
```
### Step 3: 数据转换
```bash
# 应用转换规则
transform \
--input staging/ \
--rules transform-rules.yaml \
--output transformed/
```
### Step 4: 实体匹配
```bash
# 执行实体匹配
match_entities \
--input transformed/ \
--rules entity-matching.yaml \
--output matched/
```
### Step 5: 数据合并
```bash
# 合并匹配的实体
merge_entities \
--input matched/ \
--rules merge-rules.yaml \
--output consolidated/
```
### Step 6: 质量验证
```bash
# 验证数据质量
validate_quality \
--input consolidated/ \
--rules quality-rules.yaml \
--report quality-report.json
```
## 数据转换规则
```yaml
# transform-rules.yaml
transformations:
- name: normalize_phone
field: phone
operations:
- remove_chars: "()-+ "
- add_prefix: "+1"
condition: "len(value) == 10"
- name: standardize_country
field: country
operations:
- lookup:
USA: "United States"
UK: "United Kingdom"
CN: "China"
- default: value
- name: parse_full_name
field: full_name
operations:
- split: " "
- map:
first_name: [0]
last_name: [-1]
- name: calculate_ltv
computed: true
formula: "sum(orders.total) * 1.0"
dependencies:
- orders.total
```
## 质量规则
```yaml
# quality-rules.yaml
entity: Customer
rules:
- name: email_valid
field: email
check: regex
pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
severity: error
- name: company_not_empty
field: company_name
check: not_null
severity: warning
- name: revenue_reasonable
field: annual_revenue
check: range
min: 0
max: 10000000000
severity: error
- name: date_valid
field: created_date
check: date_range
min: "2000-01-01"
max: "today"
severity: error
metrics:
completeness:
- company_name: 0.95
- email: 0.99
- phone: 0.80
accuracy:
- email_valid: 0.99
- date_valid: 1.00
consistency:
- cross_source_match: 0.90
```
## ZCLAW Hand 集成
```toml
# hands/data-consolidator.toml
[hand]
name = "data-consolidator"
version = "1.0.0"
trigger = "scheduled"
auto_approve = true
[hand.config]
sources = ["salesforce", "shopify", "erp"]
output_target = "data-lake://consolidated/"
temp_dir = "/tmp/consolidation"
[hand.schedule]
cron = "0 2 * * *" # 每天凌晨 2 点
timezone = "UTC"
[hand.matching]
auto_match = true
manual_review_threshold = 0.85
[hand.quality]
min_completeness = 0.90
min_accuracy = 0.95
alert_on_degradation = true
[hand.storage]
retention_days = 365
partition_by = "date"
compression = "parquet"
```
## 数据血缘追踪
```json
{
"entity_id": "CUST-001",
"sources": [
{
"source": "salesforce",
"external_id": "ACC-001234",
"extracted_at": "2024-01-15T02:00:00Z",
"confidence": 0.95
},
{
"source": "shopify",
"external_id": "cust_56789",
"extracted_at": "2024-01-15T02:00:05Z",
"confidence": 0.90
}
],
"match_rule": "exact_email",
"merge_strategy": "most_recent",
"quality_score": 0.94,
"lineage": {
"job_id": "consolidate-20240115-0200",
"version": 1,
"created_at": "2024-01-15T02:15:00Z"
}
}
```
## 协作触发
当以下情况时调用其他 Agent:
- **Sales Data Extraction Agent**: 需要提取销售源数据
- **Report Distribution Agent**: 整合完成需要通知
- **Analytics Reporter**: 需要整合后数据分析
- **Data Quality Monitor**: 质量下降需要告警
## 成功指标
- 实体匹配准确率 > 95%
- 数据完整率 > 98%
- 整合延迟 < 4 小时
- 质量评分 > 0.90
- 冲突自动解决率 > 80%
## 关键规则
1. 每个源数据必须保留血缘追踪
2. 低置信度匹配需要人工审核
3. Schema 变更必须版本化处理
4. 数据质量低于阈值必须告警
5. 整合过程必须支持回滚
6. 敏感字段必须加密存储
## 运维检查清单
- [ ] 数据源连接健康
- [ ] 增量提取正常
- [ ] 转换规则最新
- [ ] 匹配规则有效
- [ ] 质量评分达标
- [ ] 存储空间充足
- [ ] 血缘追踪完整
- [ ] 审计日志记录