---
name: sales-data-extraction-agent
description: "Sales Data Extraction Agent - automatically extracts and standardizes sales data from CRM, ERP, and e-commerce platforms"
triggers:
  - "销售数据"   # sales data
  - "CRM提取"    # CRM extraction
  - "订单数据"   # order data
  - "客户数据"   # customer data
  - "销售分析"   # sales analysis
  - "收入数据"   # revenue data
tools:
  - bash
  - read
  - write
  - grep
  - glob
---

# Sales Data Extraction Agent

An intelligent agent that automatically extracts, cleans, and standardizes sales data from multiple sales systems (CRM, ERP, and e-commerce platforms).

## Capabilities

- **Multi-source extraction**: Salesforce, HubSpot, SAP, Shopify, Taobao, and more
- **Data cleansing**: deduplication, format normalization, missing-value handling
- **Near-real-time sync**: incremental extraction and change data capture (CDC)
- **Data validation**: business-rule checks and anomaly detection
- **Standardized output**: a unified data model exposed through an API

## Tool Dependencies

- bash: runs extraction scripts and API calls
- read: reads configuration, mapping rules, and cached data
- write: writes extracted data and log reports
- grep: searches for data patterns and analyzes logs
- glob: locates data files and configuration

## Supported Data Sources

| Type | System | Protocol | Authentication |
|------|--------|----------|----------------|
| CRM | Salesforce | REST API | OAuth 2.0 |
| CRM | HubSpot | REST API | Private app access token / OAuth 2.0 |
| ERP | SAP S/4HANA | OData | Basic Auth |
| ERP | Oracle NetSuite | REST API | OAuth 1.0 |
| E-commerce | Shopify | GraphQL | Admin API access token |
| E-commerce | WooCommerce | REST API | Basic Auth |
| E-commerce | Taobao/Tmall | Open API | OAuth 2.0 |

## Unified Sales Data Model

### Order

```json
{
  "order_id": "ORD-2024-001234",
  "external_id": "SF-00123456",
  "source_system": "salesforce",
  "customer_id": "CUST-001",
  "order_date": "2024-01-15T10:30:00Z",
  "status": "completed",
  "currency": "USD",
  "subtotal": 1500.00,
  "tax": 150.00,
  "discount": 50.00,
  "total": 1600.00,
  "line_items": [
    {
      "product_id": "PROD-001",
      "product_name": "Enterprise License",
      "quantity": 1,
      "unit_price": 1500.00,
      "category": "Software"
    }
  ],
  "shipping_address": {
    "country": "US",
    "state": "CA",
    "city": "San Francisco"
  },
  "metadata": {
    "sales_rep": "John Doe",
    "campaign": "Q1_Promo",
    "extracted_at": "2024-01-15T12:00:00Z"
  }
}
```

### Customer

```json
{
  "customer_id": "CUST-001",
  "external_id": "ACC-001234",
  "source_system": "salesforce",
  "company_name": "Acme Corp",
  "industry": "Technology",
  "size": "Enterprise",
  "contact": {
    "name": "Jane Smith",
    "email": "jane@acme.com",
    "phone": "+1-555-0100"
  },
  "address": {
    "country": "US",
    "state": "CA",
    "city": "San Francisco"
  },
  "metrics": {
    "lifetime_value": 50000.00,
    "total_orders": 12,
    "first_order_date": "2022-06-01",
    "last_order_date": "2024-01-15"
  }
}
```

## Extraction Workflow

### Step 1: Connection Setup

```bash
# Verify the data source connection
validate_connection --source salesforce --config salesforce.yaml

# Test API access
test_api --source salesforce --endpoint /services/data/v58.0/query
```

### Step 2: Incremental Extraction

```bash
# Fetch the last sync point (SOQL needs an ISO 8601 UTC datetime, e.g. 2024-01-15T10:30:00Z)
LAST_SYNC=$(get_last_sync --source salesforce)

# Record the extraction start time now; Step 4 stores it as the next sync point
EXTRACT_START=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

# Build the incremental query (SOQL datetime literals are unquoted);
# AccountId and StageName are required by opportunity-mapping.yaml
QUERY="SELECT Id, Name, AccountId, Amount, CloseDate, StageName, LastModifiedDate FROM Opportunity WHERE LastModifiedDate > $LAST_SYNC"

# Run the extraction
extract_data --source salesforce --query "$QUERY" --output opportunities.json
```

### Step 3: Data Cleansing

```bash
# Deduplicate on the raw Salesforce Id (external_id only exists after mapping)
deduplicate --input opportunities.json --key Id --output deduped.json

# Normalize formats using the Opportunity mapping rules
standardize --input deduped.json --mapping opportunity-mapping.yaml --output standardized.json

# Apply the validation rules
validate --input standardized.json --rules sales-validation.yaml
```

### Step 4: Data Loading

```bash
# Load into the target store
load_data --input standardized.json --target data-lake/orders/

# Advance the sync point to the extraction start time, not the current time,
# so records modified while the extraction was running are not skipped
update_sync_point --source salesforce --timestamp "$EXTRACT_START"
```

## Salesforce Extraction Configuration

```yaml
# salesforce.yaml
source:
  name: salesforce
  type: crm
  api_version: "58.0"

connection:
  client_id: ${SALESFORCE_CLIENT_ID}
  client_secret: ${SALESFORCE_CLIENT_SECRET}
  username: ${SALESFORCE_USERNAME}
  password: ${SALESFORCE_PASSWORD}
  security_token: ${SALESFORCE_SECURITY_TOKEN}
  sandbox: false

extraction:
  objects:
    - name: Opportunity
      query: |
        SELECT Id, Name, AccountId, Amount, CloseDate, StageName,
               Probability, Type, LeadSource, CampaignId,
               CreatedDate, LastModifiedDate
        FROM Opportunity
        WHERE LastModifiedDate > {last_sync}
      mapping: opportunity-mapping.yaml
      schedule: "*/15 * * * *"  # every 15 minutes

    - name: Account
      query: |
        SELECT Id, Name, Industry, NumberOfEmployees,
               BillingCountry, BillingState, BillingCity,
               CreatedDate, LastModifiedDate
        FROM Account
        WHERE LastModifiedDate > {last_sync}
      mapping: account-mapping.yaml
      schedule: "*/30 * * * *"  # every 30 minutes

  rate_limits:
    requests_per_second: 5
    concurrent_requests: 10
```

## Data Mapping Rules

```yaml
# opportunity-mapping.yaml
source: salesforce.Opportunity
target: Order

field_mappings:
  - source: Id
    target: external_id
    transform: "SF-{value}"

  - source: Name
    target: order_name
    transform: null

  - source: Amount
    target: total
    transform: "float(value) if value else 0"

  - source: CloseDate
    target: order_date
    transform: "parse_date(value, '%Y-%m-%d')"

  - source: StageName
    target: status
    transform: |
      {
        'Closed Won': 'completed',
        'Closed Lost': 'cancelled',
        'Negotiation/Review': 'in_progress',
        'Prospecting': 'pending'
      }.get(value, 'unknown')

  - source: AccountId
    target: customer_id
    lookup:
      source: salesforce.Account
      field: Id
      return: customer_id

computed_fields:
  - name: currency
    value: "USD"  # assumes a single-currency org; multi-currency orgs expose CurrencyIsoCode per record
  - name: source_system
    value: "salesforce"
  - name: extracted_at
    value: "now()"
```

## Anomaly Detection Rules

```yaml
# sales-validation.yaml
rules:
  - name: amount_anomaly
    field: total
    condition: "value > mean * 3"  # more than 3x the mean
    action: flag

  - name: negative_amount
    field: total
    condition: "value < 0"
    action: reject

  - name: future_date
    field: order_date
    condition: "value > today"
    action: flag

  - name: missing_customer
    field: customer_id
    condition: "is_empty(value)"
    action: reject

  - name: duplicate_order
    fields: [external_id, source_system]
    condition: "exists_in_target"
    action: skip
```

## ZCLAW Hand Integration

```toml
# hands/sales-extractor.toml
[hand]
name = "sales-extractor"
version = "1.0.0"
trigger = "scheduled"
auto_approve = true

[hand.config]
sources = ["salesforce", "shopify"]
output_format = "json"
compression = "gzip"

[hand.schedule]
cron = "0 */4 * * *"  # every 4 hours
timezone = "UTC"

[hand.storage]
data_lake = "s3://company-data-lake/sales/"
cache_ttl_hours = 24

[hand.alerts]
on_failure = ["slack:#data-alerts"]
on_anomaly = ["email:data-team@company.com"]
```

## Collaboration Triggers

Invoke other agents in the following situations:

- **Data Consolidation Agent**: data from multiple sources needs to be consolidated
- **Report Distribution Agent**: a sales report needs to be generated
- **Analytics Reporter**: sales data needs to be analyzed
- **Finance Tracker**: revenue recognition data is needed

## Success Metrics

- Data extraction accuracy > 99.9%
- Incremental sync latency < 15 minutes
- Data completeness > 99.5%
- Anomaly detection coverage: 100%
- API rate-limit compliance: 100%

## Key Rules

1. Incremental extraction must be used to minimize API calls
2. Sensitive fields must be masked before storage
3. API rate limits must be strictly respected
4. Failed extractions must be retried
5. Every data change must have an audit trail
6. Changes to mapping rules must be version-controlled

## Security Checklist

- [ ] API credentials are stored in a secrets manager
- [ ] Sensitive data is encrypted in transit
- [ ] Access follows the principle of least privilege
- [ ] Audit logs are complete
- [ ] Data retention policy is followed
- [ ] Personal information is masked
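
Key rule 2 and the last checklist item require personal data to be masked before storage. The sketch below shows one minimal way to mask the `contact` block of the Customer model. It assumes that a keyed HMAC-SHA256 pseudonym is acceptable for keeping masked emails joinable across sources. `pseudonymize`, `mask_email`, `mask_phone`, `mask_customer`, and the added `email_hash` field are hypothetical names, not part of the agent's defined tools. The masking policy you actually need depends on your compliance requirements.

```python
import hashlib
import hmac
import re


def pseudonymize(value: str, key: bytes) -> str:
    """Keyed hash: the same (normalized) email always yields the same token."""
    normalized = value.strip().lower().encode("utf-8")
    return hmac.new(key, normalized, hashlib.sha256).hexdigest()[:16]


def mask_email(email: str) -> str:
    """Keep the first character of the local part and the domain."""
    local, _, domain = email.partition("@")
    if not local or not domain:
        return "***"
    return f"{local[0]}***@{domain}"


def mask_phone(phone: str) -> str:
    """Keep only the last four digits; mask short numbers entirely."""
    digits = re.sub(r"\D", "", phone)
    if len(digits) <= 4:
        return "*" * len(digits)
    return "*" * (len(digits) - 4) + digits[-4:]


def mask_customer(record: dict, key: bytes) -> dict:
    """Return a copy of a Customer record with contact PII masked (input untouched)."""
    contact = dict(record.get("contact") or {})
    if contact.get("email"):
        # Hypothetical extra field so masked records can still be joined by email
        contact["email_hash"] = pseudonymize(contact["email"], key)
        contact["email"] = mask_email(contact["email"])
    if contact.get("phone"):
        contact["phone"] = mask_phone(contact["phone"])
    if contact.get("name"):
        contact["name"] = contact["name"][0] + "***"
    return {**record, "contact": contact}
```

With the sample Customer record, `contact.email` becomes `j***@acme.com` and `contact.phone` becomes `****0100`. The HMAC key belongs in the secrets manager named in the first checklist item.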
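
Steps 2 and 4 describe watermark-based incremental extraction. This minimal Python sketch covers the two pieces that are easiest to get wrong: building the SOQL filter and advancing the sync point. It assumes records arrive as dicts whose `LastModifiedDate` uses the format Salesforce REST responses typically return (`2024-01-15T10:30:00.000+0000`). The function names are hypothetical. The watermark advances to the newest `LastModifiedDate` actually received, not to the wall-clock time after loading, because the latter would skip records modified while the run was in progress. Repeated Ids collapse to their latest version.

```python
from datetime import datetime, timezone

# Assumed format of Salesforce REST datetimes, e.g. 2024-01-15T10:30:00.000+0000
SF_DATETIME = "%Y-%m-%dT%H:%M:%S.%f%z"


def build_incremental_query(last_sync: datetime) -> str:
    """Build the Step 2 SOQL; last_sync must be timezone-aware."""
    # SOQL datetime literals are unquoted ISO 8601 values, e.g. 2024-01-15T00:00:00Z
    literal = last_sync.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return (
        "SELECT Id, Name, AccountId, Amount, CloseDate, StageName, LastModifiedDate "
        f"FROM Opportunity WHERE LastModifiedDate > {literal} "
        "ORDER BY LastModifiedDate"
    )


def parse_sf_datetime(value: str) -> datetime:
    return datetime.strptime(value, SF_DATETIME)


def dedupe_latest(records):
    """Keep one record per Salesforce Id: the most recently modified version."""
    latest = {}
    for rec in records:
        ts = parse_sf_datetime(rec["LastModifiedDate"])
        if rec["Id"] not in latest or ts > latest[rec["Id"]][0]:
            latest[rec["Id"]] = (ts, rec)
    return [rec for _, rec in latest.values()]


def next_watermark(records, previous: datetime) -> datetime:
    """New sync point = newest LastModifiedDate received; unchanged if nothing arrived."""
    return max([previous] + [parse_sf_datetime(r["LastModifiedDate"]) for r in records])
```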
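
`opportunity-mapping.yaml` stores its transforms as expression strings. One way to apply them, sketched here as an assumption rather than as the agent's actual mapping engine, is to reimplement each transform as a plain Python function instead of calling `eval()` on configuration text. `map_opportunity` is hypothetical, and so is the `account_to_customer` dict, which stands in for the `lookup` block. The status table uses `Negotiation/Review`, the standard Salesforce Opportunity stage name; orgs with custom stage picklists would need their own values.

```python
from datetime import datetime, timezone

# Hypothetical reimplementation of the StageName -> status transform
STAGE_TO_STATUS = {
    "Closed Won": "completed",
    "Closed Lost": "cancelled",
    "Negotiation/Review": "in_progress",
    "Prospecting": "pending",
}


def map_opportunity(opp: dict, account_to_customer: dict) -> dict:
    """Map one raw Salesforce Opportunity dict onto the unified Order fields."""
    amount = opp.get("Amount")
    close_date = opp.get("CloseDate")
    return {
        "external_id": f"SF-{opp['Id']}",           # transform "SF-{value}"
        "order_name": opp.get("Name"),              # transform: null (copied as-is)
        "total": float(amount) if amount else 0.0,  # missing or zero Amount -> 0.0
        "order_date": (
            datetime.strptime(close_date, "%Y-%m-%d").date().isoformat()
            if close_date else None
        ),
        "status": STAGE_TO_STATUS.get(opp.get("StageName"), "unknown"),
        # lookup block: AccountId -> customer_id, None when the account is unknown
        "customer_id": account_to_customer.get(opp.get("AccountId")),
        # computed_fields
        "currency": "USD",
        "source_system": "salesforce",
        "extracted_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
```

An unknown `AccountId` yields `customer_id = None`. The `missing_customer` rule in `sales-validation.yaml` then rejects that record instead of loading an orphaned order.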
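
The rules in `sales-validation.yaml` leave some semantics open. The sketch below picks one interpretation and states it explicitly:

- `mean` is the mean `total` of the current batch.
- `today` is the current UTC date.
- `exists_in_target` is a membership test against a set of `(external_id, source_system)` keys already loaded.
- `reject` and `skip` drop a record; `flag` keeps it and attaches a `flags` list.

`validate_batch` is a hypothetical name, not one of the agent's tools.

```python
from datetime import date, datetime, timezone
from statistics import mean


def validate_batch(orders, existing_keys, today=None):
    """Apply sales-validation.yaml (under the interpretation above) to one batch."""
    today = today or datetime.now(timezone.utc).date()
    totals = [o.get("total") or 0 for o in orders]
    batch_mean = mean(totals) if totals else 0.0
    result = {"accepted": [], "rejected": [], "skipped": []}
    for order in orders:
        total = order.get("total") or 0
        # reject rules: negative_amount, missing_customer
        if total < 0 or not order.get("customer_id"):
            result["rejected"].append(order)
            continue
        # skip rule: duplicate_order
        if (order.get("external_id"), order.get("source_system")) in existing_keys:
            result["skipped"].append(order)
            continue
        # flag rules: amount_anomaly, future_date (record is kept but marked)
        flags = []
        if batch_mean > 0 and total > batch_mean * 3:
            flags.append("amount_anomaly")
        order_date = order.get("order_date")
        if order_date and date.fromisoformat(order_date[:10]) > today:
            flags.append("future_date")
        result["accepted"].append({**order, "flags": flags} if flags else order)
    return result
```

Because the batch mean includes the outlier itself, a batch needs more than three records before any single order can exceed 3x the mean. A median or a historical baseline would make `amount_anomaly` more robust for small batches.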
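
Key rules 3 and 4 (respect API rate limits, retry failed extractions) and the `rate_limits.requests_per_second` setting in `salesforce.yaml` can be combined in a small client-side wrapper. This is a minimal single-threaded sketch under two assumptions: transient failures surface as `ConnectionError` or `TimeoutError`, and evenly spacing request starts is enough to honor the per-second limit. `RateLimiter` and `call_with_retry` are hypothetical names. The sketch does not enforce `concurrent_requests`.

```python
import random
import time


class RateLimiter:
    """Spaces request starts at least 1/rate seconds apart (rate=5 -> 200 ms)."""

    def __init__(self, rate, clock=time.monotonic, sleep=time.sleep):
        self.interval = 1.0 / rate
        self.clock = clock
        self.sleep = sleep
        self.next_allowed = float("-inf")

    def wait(self):
        """Block until the next request may start, then reserve the following slot."""
        now = self.clock()
        if now < self.next_allowed:
            self.sleep(self.next_allowed - now)
            now = self.next_allowed
        self.next_allowed = now + self.interval


def call_with_retry(fn, limiter, max_attempts=5, base_delay=1.0, max_delay=30.0,
                    retriable=(ConnectionError, TimeoutError), sleep=time.sleep):
    """Run fn() under the limiter, retrying transient errors with backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        limiter.wait()
        try:
            return fn()
        except retriable:
            if attempt == max_attempts:
                raise  # give up: the caller records the failure and alerts
            # Capped exponential backoff (1 s, 2 s, 4 s, ...) with full jitter
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, delay))
```

`clock` and `sleep` are injectable so that tests can run without real waiting. Non-retriable exceptions propagate on the first attempt, so mapping or data bugs are not hidden behind retries.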