| name | description | triggers | tools |
|---|---|---|---|
| data-consolidation-agent | Data consolidation agent: consolidates, aligns, and merges data from multiple heterogeneous sources into a unified view | | |
Data Consolidation Agent
An intelligent agent that consolidates, aligns, transforms, and merges data from multiple heterogeneous sources to build a unified view of the data.
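Capabilities
- Multi-source consolidation: unified handling of databases, APIs, files, and streaming data
- Data alignment: cross-source entity resolution and master data management (MDM)
- Conflict resolution: automatic or rule-driven handling of conflicting values
- Schema evolution: handling source schema changes and version compatibility
- Quality monitoring: data quality scoring, anomaly detection, and alerting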
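Tool Dependencies
- bash: run ETL scripts and data pipelines
- read: read data source configurations and mapping rules
- write: write consolidated data and reports
- grep: search for data patterns and analyze logs
- glob: locate data files and configuration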
Consolidation Architecture
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│  Source A   │ │  Source B   │ │  Source C   │
│    (CRM)    │ │    (ERP)    │ │  (E-comm)   │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
       │               │               │
       ▼               ▼               ▼
┌─────────────────────────────────────────────┐
│              Extraction Layer               │
│  - Connectors   - Rate Limiting   - CDC     │
└──────────────────────┬──────────────────────┘
                       ▼
┌─────────────────────────────────────────────┐
│            Transformation Layer             │
│  - Cleanse   - Map   - Enrich   - Validate  │
└──────────────────────┬──────────────────────┘
                       ▼
┌─────────────────────────────────────────────┐
│             Consolidation Layer             │
│  - Match   - Merge   - Resolve   - Dedupe   │
└──────────────────────┬──────────────────────┘
                       ▼
┌─────────────────────────────────────────────┐
│                Storage Layer                │
│  - Data Lake   - Warehouse   - API          │
└─────────────────────────────────────────────┘
```
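The layered flow above can be sketched as four plain functions chained together. This is a minimal, hypothetical illustration, not ZCLAW's implementation: records are assumed to be Python dicts, `email` is assumed to be the only match key, the merge keeps the first non-empty value per field, and a list stands in for the storage layer.

```python
from typing import Any, Dict, List

# Hypothetical sketch: function names and the merge policy are illustrative assumptions.
Record = Dict[str, Any]


def extract(sources: Dict[str, List[Record]]) -> List[Record]:
    """Extraction layer: pull every row and tag it with the name of its source."""
    return [{**rec, "_source": name} for name, recs in sources.items() for rec in recs]


def transform(rows: List[Record]) -> List[Record]:
    """Transformation layer: cleanse the email key and drop rows that lack one."""
    out: List[Record] = []
    for row in rows:
        email = str(row.get("email") or "").strip().lower()
        if email:  # validate: the match key must be present
            out.append({**row, "email": email})
    return out


def consolidate(rows: List[Record]) -> List[Record]:
    """Consolidation layer: match on email and merge (first non-empty value wins)."""
    merged: Dict[str, Record] = {}
    for row in rows:
        entity = merged.setdefault(row["email"], {"email": row["email"], "_sources": []})
        entity["_sources"].append(row["_source"])
        for key, value in row.items():
            if key not in entity and key != "_source" and value not in (None, ""):
                entity[key] = value
    return list(merged.values())


def store(entities: List[Record], sink: List[Record]) -> int:
    """Storage layer: a list stands in for the data lake or warehouse."""
    sink.extend(entities)
    return len(entities)


def run_pipeline(sources: Dict[str, List[Record]], sink: List[Record]) -> int:
    return store(consolidate(transform(extract(sources))), sink)


if __name__ == "__main__":
    lake: List[Record] = []
    run_pipeline(
        {"crm": [{"email": "A@x.com ", "company_name": "Acme"}],
         "ecomm": [{"email": "a@x.com", "phone": "5551234567"}]},
        lake,
    )
    print(lake)
```

Each layer consumes only the previous layer's output, so one layer (for example, the naive merge) can be replaced by the rule-driven strategies described below without touching the others.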
Data Source Types
| Type | Examples | Connection | Incremental Support |
|---|---|---|---|
| Relational database | PostgreSQL, MySQL | JDBC/ODBC | CDC, Timestamp |
| NoSQL | MongoDB, DynamoDB | Native Driver | Oplog, Streams |
| SaaS API | Salesforce, HubSpot | REST/GraphQL | Modified Date |
| Files | CSV, JSON, Parquet | S3, SFTP | File Hash |
| Message queue | Kafka, RabbitMQ | Native | Native |
| Logs | ELK, Splunk | API | Timestamp |
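For sources whose incremental support is "Timestamp" or "Modified Date", incremental extraction usually means keeping a per-source high-water mark and fetching only rows modified after it. A minimal sketch, assuming each row carries an ISO-8601 `updated_at` string with an explicit UTC offset and that the watermark is held in memory (a real pipeline would persist it); `extract_incremental` is a hypothetical helper, not part of any documented tool.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Hypothetical helper; assumes every row has an ISO-8601 "updated_at" with an offset.
Row = Dict[str, str]


def extract_incremental(
    rows: List[Row], watermark: Optional[datetime]
) -> Tuple[List[Row], Optional[datetime]]:
    """Return the rows modified strictly after `watermark`, plus the advanced watermark.

    With watermark=None (first run) every row is returned, i.e. a full load.
    """
    changed: List[Row] = []
    new_mark = watermark
    for row in rows:
        ts = datetime.fromisoformat(row["updated_at"])  # e.g. "2024-01-15T02:00:00+00:00"
        if watermark is None or ts > watermark:
            changed.append(row)
            if new_mark is None or ts > new_mark:
                new_mark = ts
    return changed, new_mark
```

The strict `>` comparison can miss a row that is committed later with a timestamp equal to the stored watermark; comparing with `>=` (and relying on an idempotent merge) or re-reading a small lookback window avoids that at the cost of re-extracting a few rows. Note that `datetime.fromisoformat` accepts a trailing `Z` only from Python 3.11 onward.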
Entity Matching Rules
Customer Matching
```yaml
# entity-matching.yaml
entity: Customer
match_rules:
  - name: exact_email
    priority: 1
    fields:
      - email
    algorithm: exact
    confidence: 1.0
  - name: fuzzy_name_company
    priority: 2
    fields:
      - company_name
      - contact_name
    algorithm: fuzzy
    threshold: 0.85
    confidence: 0.9
  - name: phone_match
    priority: 3
    fields:
      - phone
    algorithm: normalized
    confidence: 0.95
```
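One way to read `entity-matching.yaml` is: evaluate the rules in ascending `priority` and report the first rule that fires together with its `confidence`. The sketch below inlines the three rules as Python data instead of parsing the YAML. It uses `difflib.SequenceMatcher.ratio()` as a stand-in for the unspecified `fuzzy` algorithm (averaged over the rule's fields) and assumes `normalized` phone comparison means "compare digits only"; both are assumptions, not documented behaviour.

```python
import re
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple

Record = Dict[str, str]

# The three rules from entity-matching.yaml, inlined rather than parsed.
RULES: List[Dict[str, Any]] = [
    {"name": "exact_email", "priority": 1, "algorithm": "exact",
     "fields": ["email"], "confidence": 1.0},
    {"name": "fuzzy_name_company", "priority": 2, "algorithm": "fuzzy",
     "fields": ["company_name", "contact_name"], "threshold": 0.85, "confidence": 0.9},
    {"name": "phone_match", "priority": 3, "algorithm": "normalized",
     "fields": ["phone"], "confidence": 0.95},
]


def _digits(value: str) -> str:
    """Assumed meaning of 'normalized' for phones: keep digits only."""
    return re.sub(r"\D", "", value)


def _fires(rule: Dict[str, Any], a: Record, b: Record) -> bool:
    pairs = [(a.get(f, ""), b.get(f, "")) for f in rule["fields"]]
    if any(not x or not y for x, y in pairs):
        return False  # never match on a missing field
    if rule["algorithm"] == "exact":
        return all(x == y for x, y in pairs)
    if rule["algorithm"] == "fuzzy":
        # Assumption: average SequenceMatcher ratio over the rule's fields.
        ratios = [SequenceMatcher(None, x.lower(), y.lower()).ratio() for x, y in pairs]
        return sum(ratios) / len(ratios) >= rule["threshold"]
    if rule["algorithm"] == "normalized":
        return all(_digits(x) == _digits(y) for x, y in pairs)
    raise ValueError(f"unknown algorithm: {rule['algorithm']}")


def match(a: Record, b: Record) -> Optional[Tuple[str, float]]:
    """Return (rule name, confidence) for the highest-priority rule that fires."""
    for rule in sorted(RULES, key=lambda r: r["priority"]):
        if _fires(rule, a, b):
            return rule["name"], rule["confidence"]
    return None
```

Priority, not confidence, decides which rule is reported: a pair that satisfies both the fuzzy rule and the phone rule is labelled `fuzzy_name_company` (0.9) even though `phone_match` carries 0.95.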
Conflict Resolution
```yaml
# conflict-resolution.yaml
entity: Customer
resolution_rules:
  - field: company_name
    strategy: most_recent
    source_priority: [salesforce, hubspot, shopify]
  - field: email
    strategy: most_complete
    validation: email_format
  - field: revenue
    strategy: highest_confidence
    source_confidence:
      salesforce: 0.95
      erp: 0.90
      estimate: 0.60
  - field: created_date
    strategy: earliest
  - field: status
    strategy: custom
    function: |
      if sources.erp.status == 'inactive':
          return 'inactive'
      return sources.salesforce.status or 'active'
```
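The per-field strategies in `conflict-resolution.yaml` can be applied to a list of candidate values, each tagged with its source and last-update time. A minimal sketch, assuming each candidate is a `Candidate(source, value, updated_at)` tuple; reading `source_priority` as a tie-breaker for equal timestamps and `most_complete` as "longest non-empty value" are assumptions, and the `email_format` validation step is omitted.

```python
from datetime import date
from typing import Any, Dict, List, NamedTuple, Optional


class Candidate(NamedTuple):
    source: str
    value: Any
    updated_at: date  # when this source last changed the value


def most_recent(cands: List[Candidate], source_priority: List[str]) -> Any:
    """Newest value wins; ties go to the source listed first in source_priority (assumption)."""
    rank = {s: i for i, s in enumerate(source_priority)}
    return max(cands, key=lambda c: (c.updated_at, -rank.get(c.source, len(rank)))).value


def most_complete(cands: List[Candidate]) -> Any:
    """Assumption: 'most complete' means the longest non-empty value."""
    filled = [c for c in cands if c.value not in (None, "")]
    return max(filled, key=lambda c: len(str(c.value))).value if filled else None


def highest_confidence(cands: List[Candidate], source_confidence: Dict[str, float]) -> Any:
    """Value from the source with the highest configured confidence (unknown sources = 0)."""
    return max(cands, key=lambda c: source_confidence.get(c.source, 0.0)).value


def earliest(cands: List[Candidate]) -> Any:
    """Smallest non-null value, e.g. the earliest created_date."""
    return min(c.value for c in cands if c.value is not None)


def resolve_status(status_by_source: Dict[str, Optional[str]]) -> str:
    """The custom `status` function from the YAML, translated to Python."""
    if status_by_source.get("erp") == "inactive":
        return "inactive"
    return status_by_source.get("salesforce") or "active"
```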
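Consolidation Workflow
Step 1: Register data sources
```bash
# Register each data source with its config file and sync schedule (cron syntax)
# Salesforce (CRM): every 15 minutes
register_source \
  --name salesforce \
  --type crm \
  --config salesforce.yaml \
  --schedule "*/15 * * * *"
# Shopify (e-commerce): every 5 minutes
register_source \
  --name shopify \
  --type ecommerce \
  --config shopify.yaml \
  --schedule "*/5 * * * *"
```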
Step 2: Extract data
```bash
# Extract from several sources in parallel (incremental mode) into staging/
parallel_extract \
  --sources salesforce,shopify,erp \
  --mode incremental \
  --output staging/
```
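How `parallel_extract` works internally is not described. The fan-out it implies can be sketched with the standard-library `concurrent.futures` module: each source is extracted in its own worker thread, and a failing source is recorded without cancelling the others. `extract_all` and `fetch_source` are hypothetical names; `fetch_source` is a stub standing in for a real connector.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Tuple

Rows = List[dict]


def extract_all(
    sources: List[str], fetch: Callable[[str], Rows], max_workers: int = 4
) -> Tuple[Dict[str, Rows], Dict[str, str]]:
    """Run fetch(source) concurrently; return (results, errors) keyed by source name."""
    results: Dict[str, Rows] = {}
    errors: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, name): name for name in sources}
        for fut in as_completed(futures):
            name = futures[fut]
            try:
                results[name] = fut.result()
            except Exception as exc:  # isolate per-source failures
                errors[name] = repr(exc)
    return results, errors


def fetch_source(name: str) -> Rows:
    # Hypothetical connector stub; a real one would call the source's API or driver.
    if name == "erp":
        raise ConnectionError("erp unreachable")
    return [{"source": name, "id": 1}]
```

Threads suit this step because extraction is I/O-bound; whether a partial result (here, everything except `erp`) should continue to Step 3 or abort the run is a policy decision the sketch leaves to the caller.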
Step 3: Transform data
```bash
# Apply the transformation rules to the staged data
transform \
  --input staging/ \
  --rules transform-rules.yaml \
  --output transformed/
```
Step 4: Match entities
```bash
# Run entity matching across sources
match_entities \
  --input transformed/ \
  --rules entity-matching.yaml \
  --output matched/
```
Step 5: Merge entities
```bash
# Merge matched entities into consolidated records
merge_entities \
  --input matched/ \
  --rules merge-rules.yaml \
  --output consolidated/
```
Step 6: Validate quality
```bash
# Validate data quality and write a report
validate_quality \
  --input consolidated/ \
  --rules quality-rules.yaml \
  --report quality-report.json
```
Data Transformation Rules
```yaml
# transform-rules.yaml
transformations:
  - name: normalize_phone
    field: phone
    operations:
      - remove_chars: "()-+ "
      - add_prefix: "+1"
        condition: "len(value) == 10"
  - name: standardize_country
    field: country
    operations:
      - lookup:
          USA: "United States"
          UK: "United Kingdom"
          CN: "China"
      - default: value
  - name: parse_full_name
    field: full_name
    operations:
      - split: " "
      - map:
          first_name: [0]
          last_name: [-1]
  - name: calculate_ltv
    computed: true
    formula: "sum(orders.total) * 1.0"
    dependencies:
      - orders.total
```
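The four transformations in `transform-rules.yaml` map onto small pure functions. A minimal sketch, assuming the `condition` on `normalize_phone` is evaluated after `remove_chars` and gates only the `+1` prefix; splitting names on any whitespace (rather than a single space) is a deliberate deviation that avoids empty tokens from double spaces.

```python
import re
from typing import Dict, List

COUNTRY_LOOKUP = {"USA": "United States", "UK": "United Kingdom", "CN": "China"}


def normalize_phone(value: str) -> str:
    """remove_chars "()-+ ", then add "+1" only when exactly 10 characters remain (assumed order)."""
    cleaned = re.sub(r"[()\-+ ]", "", value)
    return "+1" + cleaned if len(cleaned) == 10 else cleaned


def standardize_country(value: str) -> str:
    """lookup, falling back to the original value (default: value)."""
    return COUNTRY_LOOKUP.get(value, value)


def parse_full_name(value: str) -> Dict[str, str]:
    """First token -> first_name, last token -> last_name; a single token fills both."""
    parts = value.split()
    return {"first_name": parts[0], "last_name": parts[-1]} if parts else {}


def calculate_ltv(orders: List[Dict[str, float]]) -> float:
    """sum(orders.total) * 1.0, i.e. the float sum of order totals."""
    return sum(o["total"] for o in orders) * 1.0
```

Taken literally, the phone rule strips the `+` from an input that already carries `+1`, leaving 11 digits with no prefix (`"+1 555 123 4567"` becomes `"15551234567"`), so a production rule probably needs an 11-digit branch as well.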
Quality Rules
```yaml
# quality-rules.yaml
entity: Customer
rules:
  - name: email_valid
    field: email
    check: regex
    pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
    severity: error
  - name: company_not_empty
    field: company_name
    check: not_null
    severity: warning
  - name: revenue_reasonable
    field: annual_revenue
    check: range
    min: 0
    max: 10000000000
    severity: error
  - name: date_valid
    field: created_date
    check: date_range
    min: "2000-01-01"
    max: "today"
    severity: error
metrics:
  completeness:
    - company_name: 0.95
    - email: 0.99
    - phone: 0.80
  accuracy:
    - email_valid: 0.99
    - date_valid: 1.00
  consistency:
    - cross_source_match: 0.90
```
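The four check types and the completeness and accuracy metrics can be evaluated with a few functions. A minimal sketch, with the rules inlined as Python data; it assumes dates arrive as ISO `YYYY-MM-DD` strings and that format and range checks skip missing values (so a blank field lowers completeness but not accuracy). Both are assumptions about how the metrics are meant to be separated.

```python
import re
from datetime import date
from typing import Any, Dict, List

# The rules from quality-rules.yaml, inlined as Python data (severity omitted).
RULES: Dict[str, Dict[str, Any]] = {
    "email_valid": {"field": "email", "check": "regex",
                    "pattern": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
    "company_not_empty": {"field": "company_name", "check": "not_null"},
    "revenue_reasonable": {"field": "annual_revenue", "check": "range",
                           "min": 0, "max": 10_000_000_000},
    "date_valid": {"field": "created_date", "check": "date_range",
                   "min": "2000-01-01", "max": "today"},
}


def _as_date(value: Any) -> date:
    if isinstance(value, date):
        return value
    return date.today() if value == "today" else date.fromisoformat(value)


def check(rule: Dict[str, Any], value: Any) -> bool:
    """Evaluate one rule against one value."""
    missing = value is None or value == ""
    if rule["check"] == "not_null":
        return not missing
    if missing:
        return True  # assumption: missing values count against completeness, not accuracy
    if rule["check"] == "regex":
        return re.match(rule["pattern"], str(value)) is not None
    if rule["check"] == "range":
        return rule["min"] <= value <= rule["max"]
    if rule["check"] == "date_range":
        return _as_date(rule["min"]) <= _as_date(value) <= _as_date(rule["max"])
    raise ValueError(f"unknown check: {rule['check']}")


def completeness(records: List[Dict[str, Any]], field: str) -> float:
    """Share of records in which `field` is present and non-empty."""
    return sum(r.get(field) not in (None, "") for r in records) / len(records)


def pass_rate(records: List[Dict[str, Any]], rule_name: str) -> float:
    """Share of records that pass the named rule (the 'accuracy' metrics)."""
    rule = RULES[rule_name]
    return sum(check(rule, r.get(rule["field"])) for r in records) / len(records)
```

Comparing `completeness(records, "email")` with 0.99 and `pass_rate(records, "email_valid")` with 0.99 then mirrors the `metrics` block directly.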
ZCLAW Hand Integration
```toml
# hands/data-consolidator.toml
[hand]
name = "data-consolidator"
version = "1.0.0"
trigger = "scheduled"
auto_approve = true

[hand.config]
sources = ["salesforce", "shopify", "erp"]
output_target = "data-lake://consolidated/"
temp_dir = "/tmp/consolidation"

[hand.schedule]
cron = "0 2 * * *"  # daily at 02:00 in the timezone below
timezone = "UTC"

[hand.matching]
auto_match = true
manual_review_threshold = 0.85

[hand.quality]
min_completeness = 0.90
min_accuracy = 0.95
alert_on_degradation = true

[hand.storage]
retention_days = 365
partition_by = "date"
compression = "parquet"  # Parquet is a columnar file format rather than a compression codec
```
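The `[hand.matching]` and `[hand.quality]` settings drive two run-time decisions. A minimal sketch of how they might be applied, assuming a match scoring at or above `manual_review_threshold` is merged automatically while lower scores (or every match, when `auto_match` is false) go to a review queue, and that an alert is raised for each quality metric below its minimum. The config is reproduced as a Python dict; on Python 3.11+ it could instead be loaded from the TOML file with the standard-library `tomllib` module. `route_matches` and `quality_alerts` are hypothetical names.

```python
from typing import Any, Dict, List, Optional, Tuple

# Mirrors [hand.matching] and [hand.quality]; the routing semantics are assumptions.
CONFIG: Dict[str, Dict[str, Any]] = {
    "matching": {"auto_match": True, "manual_review_threshold": 0.85},
    "quality": {"min_completeness": 0.90, "min_accuracy": 0.95, "alert_on_degradation": True},
}


def route_matches(matches: List[Tuple[str, float]],
                  cfg: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, List[str]]:
    """Split (pair_id, confidence) matches into auto-merge and manual-review queues."""
    m = (cfg or CONFIG)["matching"]
    routed: Dict[str, List[str]] = {"auto": [], "review": []}
    for pair_id, confidence in matches:
        if m["auto_match"] and confidence >= m["manual_review_threshold"]:
            routed["auto"].append(pair_id)
        else:
            routed["review"].append(pair_id)
    return routed


def quality_alerts(completeness: float, accuracy: float,
                   cfg: Optional[Dict[str, Dict[str, Any]]] = None) -> List[str]:
    """Return one message per quality metric below its configured minimum."""
    q = (cfg or CONFIG)["quality"]
    if not q["alert_on_degradation"]:
        return []
    alerts = []
    if completeness < q["min_completeness"]:
        alerts.append(f"completeness {completeness:.2f} < {q['min_completeness']:.2f}")
    if accuracy < q["min_accuracy"]:
        alerts.append(f"accuracy {accuracy:.2f} < {q['min_accuracy']:.2f}")
    return alerts
```

With the fixed per-rule confidences in `entity-matching.yaml` (1.0, 0.9, 0.95), every match clears 0.85, so manual review only triggers if confidence is computed per pair (for example, from the fuzzy similarity score).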
Data Lineage Tracking
```json
{
  "entity_id": "CUST-001",
  "sources": [
    {
      "source": "salesforce",
      "external_id": "ACC-001234",
      "extracted_at": "2024-01-15T02:00:00Z",
      "confidence": 0.95
    },
    {
      "source": "shopify",
      "external_id": "cust_56789",
      "extracted_at": "2024-01-15T02:00:05Z",
      "confidence": 0.90
    }
  ],
  "match_rule": "exact_email",
  "merge_strategy": "most_recent",
  "quality_score": 0.94,
  "lineage": {
    "job_id": "consolidate-20240115-0200",
    "version": 1,
    "created_at": "2024-01-15T02:15:00Z"
  }
}
```
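A lineage record with this shape can be produced alongside each merged entity. A minimal sketch using `dataclasses` and `json`; the class names are hypothetical, and deriving `job_id` from the run start time in the `consolidate-YYYYMMDD-HHMM` pattern of the example is an assumption.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import List


@dataclass
class SourceRef:
    source: str
    external_id: str
    extracted_at: str
    confidence: float


@dataclass
class JobInfo:
    job_id: str
    version: int
    created_at: str


@dataclass
class EntityLineage:
    entity_id: str
    sources: List[SourceRef]
    match_rule: str
    merge_strategy: str
    quality_score: float
    lineage: JobInfo


def iso_utc(ts: datetime) -> str:
    """Format as UTC with a trailing 'Z', as in the example record."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_lineage(entity_id: str, sources: List[SourceRef], match_rule: str,
                  merge_strategy: str, quality_score: float,
                  run_started: datetime, created_at: datetime,
                  version: int = 1) -> EntityLineage:
    # Assumed job-id scheme, copied from the example: consolidate-YYYYMMDD-HHMM (UTC).
    job_id = "consolidate-" + run_started.astimezone(timezone.utc).strftime("%Y%m%d-%H%M")
    return EntityLineage(entity_id, sources, match_rule, merge_strategy, quality_score,
                         JobInfo(job_id, version, iso_utc(created_at)))


def to_json(record: EntityLineage) -> str:
    """asdict() recurses into the nested dataclasses and lists."""
    return json.dumps(asdict(record), indent=2)
```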
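Collaboration Triggers
Invoke other agents in the following situations:
- Sales Data Extraction Agent: sales source data needs to be extracted
- Report Distribution Agent: consolidation has finished and stakeholders need to be notified
- Analytics Reporter: the consolidated data needs to be analyzed
- Data Quality Monitor: quality has degraded and an alert is required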
Success Metrics
- Entity matching accuracy > 95%
- Data completeness > 98%
- Consolidation latency < 4 hours
- Quality score > 0.90
- Automatic conflict resolution rate > 80%
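Because the success metrics are ratios with fixed thresholds, they can be checked mechanically at the end of each run. A minimal sketch, assuming matching accuracy is measured against a manually labelled sample of match decisions, completeness is filled fields over expected fields, and the auto-resolution rate is automatically resolved conflicts over all conflicts; the counter names are hypothetical.

```python
from typing import Dict, List, Tuple

# (threshold, direction) for each metric in the list above.
TARGETS: Dict[str, Tuple[float, str]] = {
    "match_accuracy": (0.95, "gt"),
    "completeness": (0.98, "gt"),
    "latency_hours": (4.0, "lt"),
    "quality_score": (0.90, "gt"),
    "auto_resolution_rate": (0.80, "gt"),
}


def compute_metrics(c: Dict[str, float]) -> Dict[str, float]:
    """Turn raw run counters (hypothetical names) into the five success metrics."""
    return {
        "match_accuracy": c["correct_matches"] / c["labelled_matches"],
        "completeness": c["filled_fields"] / c["expected_fields"],
        "latency_hours": c["latency_hours"],
        "quality_score": c["quality_score"],
        "auto_resolution_rate": c["auto_resolved"] / c["total_conflicts"],
    }


def failing(metrics: Dict[str, float]) -> List[str]:
    """Names of metrics that miss their target, in TARGETS order."""
    bad = []
    for name, (threshold, direction) in TARGETS.items():
        value = metrics[name]
        ok = value > threshold if direction == "gt" else value < threshold
        if not ok:
            bad.append(name)
    return bad
```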
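Key Rules
- Lineage must be preserved for every source record
- Low-confidence matches require manual review
- Schema changes must be handled with versioning
- An alert must fire whenever data quality falls below its threshold
- The consolidation process must support rollback
- Sensitive fields must be encrypted at rest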
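Operations Checklist
- Data source connections are healthy
- Incremental extraction is running normally
- Transformation rules are up to date
- Matching rules are still effective
- Quality scores meet their targets
- Sufficient storage space is available
- Lineage tracking is complete
- Audit logs are being recorded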