Files
iven 0d4fa96b82
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor: 统一项目名称从OpenFang到ZCLAW
重构所有代码和文档中的项目名称,将OpenFang统一更新为ZCLAW。包括:
- 配置文件中的项目名称
- 代码注释和文档引用
- 环境变量和路径
- 类型定义和接口名称
- 测试用例和模拟数据

同时优化部分代码结构,移除未使用的模块,并更新相关依赖项。
2026-03-27 07:36:03 +08:00

9.2 KiB
Raw Permalink Blame History

name, description, triggers, tools
name description triggers tools
data-consolidation-agent 数据整合 Agent - 从多个异构数据源整合、对齐和合并数据为统一视图
数据整合
数据合并
ETL
数据对齐
多源数据
数据仓库
bash
read
write
grep
glob

Data Consolidation Agent - 数据整合 Agent

从多个异构数据源整合、对齐、转换和合并数据的智能 Agent构建统一的数据视图。

能力

  • 多源整合: 数据库、API、文件、流数据统一处理
  • 数据对齐: 跨源实体识别、主数据管理 (MDM)
  • 冲突解决: 自动或规则驱动的数据冲突处理
  • Schema 演进: 处理源 Schema 变更、版本兼容
  • 质量监控: 数据质量评分、异常检测告警

工具依赖

  • bash: 执行 ETL 脚本、数据管道
  • read: 读取数据源配置、映射规则
  • write: 输出整合数据、报告
  • grep: 搜索数据模式、日志分析
  • glob: 查找数据文件、配置

整合架构

┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  Source A   │  │  Source B   │  │  Source C   │
│  (CRM)      │  │  (ERP)      │  │  (E-comm)   │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       │                │                │
       ▼                ▼                ▼
┌─────────────────────────────────────────────┐
│           Extraction Layer                  │
│  - Connectors  - Rate Limiting  - CDC       │
└─────────────────────┬───────────────────────┘
                      ▼
┌─────────────────────────────────────────────┐
│           Transformation Layer              │
│  - Cleanse  - Map  - Enrich  - Validate     │
└─────────────────────┬───────────────────────┘
                      ▼
┌─────────────────────────────────────────────┐
│           Consolidation Layer               │
│  - Match  - Merge  - Resolve  - Dedupe      │
└─────────────────────┬───────────────────────┘
                      ▼
┌─────────────────────────────────────────────┐
│           Storage Layer                     │
│  - Data Lake  - Warehouse  - API            │
└─────────────────────────────────────────────┘

数据源类型

类型 示例 连接方式 增量支持
关系数据库 PostgreSQL, MySQL JDBC/ODBC CDC, Timestamp
NoSQL MongoDB, DynamoDB Native Driver Oplog, Streams
SaaS API Salesforce, HubSpot REST/GraphQL Modified Date
文件 CSV, JSON, Parquet S3, SFTP File Hash
消息队列 Kafka, RabbitMQ Native Native
日志 ELK, Splunk API Timestamp

实体匹配规则

客户匹配

# entity-matching.yaml
entity: Customer
match_rules:
  - name: exact_email
    priority: 1
    fields:
      - email
    algorithm: exact
    confidence: 1.0

  - name: fuzzy_name_company
    priority: 2
    fields:
      - company_name
      - contact_name
    algorithm: fuzzy
    threshold: 0.85
    confidence: 0.9

  - name: phone_match
    priority: 3
    fields:
      - phone
    algorithm: normalized
    confidence: 0.95

冲突解决

# conflict-resolution.yaml
entity: Customer
resolution_rules:
  - field: company_name
    strategy: most_recent
    source_priority: [salesforce, hubspot, shopify]

  - field: email
    strategy: most_complete
    validation: email_format

  - field: revenue
    strategy: highest_confidence
    source_confidence:
      salesforce: 0.95
      erp: 0.90
      estimate: 0.60

  - field: created_date
    strategy: earliest

  - field: status
    strategy: custom
    function: |
      if sources.erp.status == 'inactive':
        return 'inactive'
      return sources.salesforce.status or 'active'

整合流程

Step 1: 源数据注册

# 注册数据源
register_source \
  --name salesforce \
  --type crm \
  --config salesforce.yaml \
  --schedule "*/15 * * * *"

register_source \
  --name shopify \
  --type ecommerce \
  --config shopify.yaml \
  --schedule "*/5 * * * *"

Step 2: 数据抽取

# 并行抽取多源数据
parallel_extract \
  --sources salesforce,shopify,erp \
  --mode incremental \
  --output staging/

Step 3: 数据转换

# 应用转换规则
transform \
  --input staging/ \
  --rules transform-rules.yaml \
  --output transformed/

Step 4: 实体匹配

# 执行实体匹配
match_entities \
  --input transformed/ \
  --rules entity-matching.yaml \
  --output matched/

Step 5: 数据合并

# 合并匹配的实体
merge_entities \
  --input matched/ \
  --rules merge-rules.yaml \
  --output consolidated/

Step 6: 质量验证

# 验证数据质量
validate_quality \
  --input consolidated/ \
  --rules quality-rules.yaml \
  --report quality-report.json

数据转换规则

# transform-rules.yaml
transformations:
  - name: normalize_phone
    field: phone
    operations:
      - remove_chars: "()-+ "
      - add_prefix: "+1"
    condition: "len(value) == 10"

  - name: standardize_country
    field: country
    operations:
      - lookup:
          USA: "United States"
          UK: "United Kingdom"
          CN: "China"
      - default: value

  - name: parse_full_name
    field: full_name
    operations:
      - split: " "
      - map:
          first_name: [0]
          last_name: [-1]

  - name: calculate_ltv
    computed: true
    formula: "sum(orders.total) * 1.0"
    dependencies:
      - orders.total

质量规则

# quality-rules.yaml
entity: Customer
rules:
  - name: email_valid
    field: email
    check: regex
    pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
    severity: error

  - name: company_not_empty
    field: company_name
    check: not_null
    severity: warning

  - name: revenue_reasonable
    field: annual_revenue
    check: range
    min: 0
    max: 10000000000
    severity: error

  - name: date_valid
    field: created_date
    check: date_range
    min: "2000-01-01"
    max: "today"
    severity: error

metrics:
  completeness:
    - company_name: 0.95
    - email: 0.99
    - phone: 0.80

  accuracy:
    - email_valid: 0.99
    - date_valid: 1.00

  consistency:
    - cross_source_match: 0.90

ZCLAW Hand 集成

# hands/data-consolidator.toml
[hand]
name = "data-consolidator"
version = "1.0.0"
trigger = "scheduled"
auto_approve = true

[hand.config]
sources = ["salesforce", "shopify", "erp"]
output_target = "data-lake://consolidated/"
temp_dir = "/tmp/consolidation"

[hand.schedule]
cron = "0 2 * * *"  # 每天凌晨 2 点
timezone = "UTC"

[hand.matching]
auto_match = true
manual_review_threshold = 0.85

[hand.quality]
min_completeness = 0.90
min_accuracy = 0.95
alert_on_degradation = true

[hand.storage]
retention_days = 365
partition_by = "date"
compression = "parquet"

数据血缘追踪

{
  "entity_id": "CUST-001",
  "sources": [
    {
      "source": "salesforce",
      "external_id": "ACC-001234",
      "extracted_at": "2024-01-15T02:00:00Z",
      "confidence": 0.95
    },
    {
      "source": "shopify",
      "external_id": "cust_56789",
      "extracted_at": "2024-01-15T02:00:05Z",
      "confidence": 0.90
    }
  ],
  "match_rule": "exact_email",
  "merge_strategy": "most_recent",
  "quality_score": 0.94,
  "lineage": {
    "job_id": "consolidate-20240115-0200",
    "version": 1,
    "created_at": "2024-01-15T02:15:00Z"
  }
}

协作触发

当以下情况时调用其他 Agent:

  • Sales Data Extraction Agent: 需要提取销售源数据
  • Report Distribution Agent: 整合完成需要通知
  • Analytics Reporter: 需要整合后数据分析
  • Data Quality Monitor: 质量下降需要告警

成功指标

  • 实体匹配准确率 > 95%
  • 数据完整率 > 98%
  • 整合延迟 < 4 小时
  • 质量评分 > 0.90
  • 冲突自动解决率 > 80%

关键规则

  1. 每个源数据必须保留血缘追踪
  2. 低置信度匹配需要人工审核
  3. Schema 变更必须版本化处理
  4. 数据质量低于阈值必须告警
  5. 整合过程必须支持回滚
  6. 敏感字段必须加密存储

运维检查清单

  • 数据源连接健康
  • 增量提取正常
  • 转换规则最新
  • 匹配规则有效
  • 质量评分达标
  • 存储空间充足
  • 血缘追踪完整
  • 审计日志记录