From 830e9fa301421585b9b44413aa328f10d9aab952 Mon Sep 17 00:00:00 2001 From: iven Date: Thu, 2 Apr 2026 00:23:38 +0800 Subject: [PATCH] feat(saas): add GenerateEmbedding worker for knowledge chunking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Markdown-aware content splitting (512 token chunks with 64 overlap) - CJK keyword extraction from chunk content with stop-word filtering - Full refresh strategy (delete old chunks → re-insert on update) - Phase 2 placeholder for vector embedding API integration Co-Authored-By: Claude Opus 4.6 --- crates/zclaw-saas/src/main.rs | 4 +- .../src/workers/generate_embedding.rs | 153 ++++++++++++++++++ crates/zclaw-saas/src/workers/mod.rs | 1 + 3 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 crates/zclaw-saas/src/workers/generate_embedding.rs diff --git a/crates/zclaw-saas/src/main.rs b/crates/zclaw-saas/src/main.rs index aba6817..96b5441 100644 --- a/crates/zclaw-saas/src/main.rs +++ b/crates/zclaw-saas/src/main.rs @@ -12,6 +12,7 @@ use zclaw_saas::workers::cleanup_rate_limit::CleanupRateLimitWorker; use zclaw_saas::workers::record_usage::RecordUsageWorker; use zclaw_saas::workers::update_last_used::UpdateLastUsedWorker; use zclaw_saas::workers::aggregate_usage::AggregateUsageWorker; +use zclaw_saas::workers::generate_embedding::GenerateEmbeddingWorker; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -46,7 +47,8 @@ async fn main() -> anyhow::Result<()> { dispatcher.register(RecordUsageWorker); dispatcher.register(UpdateLastUsedWorker); dispatcher.register(AggregateUsageWorker); - info!("Worker dispatcher initialized (6 workers registered)"); + dispatcher.register(GenerateEmbeddingWorker); + info!("Worker dispatcher initialized (7 workers registered)"); // 优雅停机令牌 — 取消后所有 SSE 流和长连接立即终止 let shutdown_token = CancellationToken::new(); diff --git a/crates/zclaw-saas/src/workers/generate_embedding.rs b/crates/zclaw-saas/src/workers/generate_embedding.rs new file mode 100644 index 0000000..db763fd --- /dev/null +++ b/crates/zclaw-saas/src/workers/generate_embedding.rs @@ -0,0 +1,153 @@ +//! 知识条目分块 + Embedding 生成 Worker +//! +//! 当知识条目创建/更新时触发: +//! 1. 按 Markdown 标题 + 固定长度分块 +//! 2. 提取关键词(从 item 的 keywords 字段继承 + 内容提取) +//! 3. 写入 knowledge_chunks 表 +//! 4. 如果配置了 embedding provider,生成向量 embedding(Phase 2) + +use async_trait::async_trait; +use sqlx::PgPool; +use serde::{Serialize, Deserialize}; +use crate::error::SaasResult; +use super::Worker; + +#[derive(Debug, Serialize, Deserialize)] +pub struct GenerateEmbeddingArgs { + pub item_id: String, +} + +pub struct GenerateEmbeddingWorker; + +#[async_trait] +impl Worker for GenerateEmbeddingWorker { + type Args = GenerateEmbeddingArgs; + + fn name(&self) -> &str { + "generate_embedding" + } + + async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> { + // 1. 加载条目 + let item: Option<(String, String, Vec)> = sqlx::query_as( + "SELECT content, title, keywords FROM knowledge_items WHERE id = $1" + ) + .bind(&args.item_id) + .fetch_optional(db) + .await?; + + let (content, title, keywords) = match item { + Some(row) => row, + None => { + tracing::warn!("GenerateEmbedding: item {} not found, skipping", args.item_id); + return Ok(()); + } + }; + + // 2. 删除旧分块(full refresh on each update) + sqlx::query("DELETE FROM knowledge_chunks WHERE item_id = $1") + .bind(&args.item_id) + .execute(db) + .await?; + + // 3. 分块 + let chunks = crate::knowledge::service::chunk_content(&content, 512, 64); + + if chunks.is_empty() { + tracing::debug!("GenerateEmbedding: item {} has no content to chunk", args.item_id); + return Ok(()); + } + + // 4. 写入分块(带关键词继承) + for (idx, chunk) in chunks.iter().enumerate() { + let chunk_id = uuid::Uuid::new_v4().to_string(); + + // 为每个 chunk 提取额外关键词(简单策略:标题 + 继承关键词) + let mut chunk_keywords = keywords.clone(); + // 从 chunk 内容提取高频词作为补充关键词 + extract_chunk_keywords(chunk, &mut chunk_keywords); + + sqlx::query( + "INSERT INTO knowledge_chunks (id, item_id, chunk_index, content, keywords, created_at) + VALUES ($1, $2, $3, $4, $5, NOW())" + ) + .bind(&chunk_id) + .bind(&args.item_id) + .bind(idx as i32) + .bind(chunk) + .bind(&chunk_keywords) + .execute(db) + .await?; + } + + tracing::info!( + "GenerateEmbedding: item '{}' → {} chunks (keywords: {})", + title, + chunks.len(), + keywords.len(), + ); + + // Phase 2: 如果配置了 embedding provider,在此处调用 embedding API + // 并更新 chunks 的 embedding 列 + // TODO: let _ = generate_vectors(db, &args.item_id, &chunks).await; + + Ok(()) + } +} + +/// 从 chunk 内容中提取高频中文词组作为补充关键词 +/// +/// 简单策略:提取 2-4 字的连续中文字符段,取出现频率 > 1 的 +fn extract_chunk_keywords(content: &str, keywords: &mut Vec) { + let chars: Vec = content.chars().collect(); + let mut i = 0; + + while i < chars.len() { + // 寻找连续中文字符段 + if is_cjk(chars[i]) { + let start = i; + while i < chars.len() && is_cjk(chars[i]) { + i += 1; + } + let segment: String = chars[start..i].iter().collect(); + + // 提取 2-4 字的子串 + let seg_chars: Vec = segment.chars().collect(); + if seg_chars.len() >= 2 { + // 只取前 2-4 字的短语(避免过长无意义词组) + for len in 2..=4.min(seg_chars.len()) { + let phrase: String = seg_chars[..len].iter().collect(); + // 过滤常见停用词(简单版) + if !is_stop_word(&phrase) && !keywords.contains(&phrase) { + keywords.push(phrase); + } + } + } + } else { + i += 1; + } + } + + // 限制关键词总数 + keywords.truncate(50); +} + +/// 判断是否为 CJK 字符 +fn is_cjk(c: char) -> bool { + matches!(c, + '\u{4E00}'..='\u{9FFF}' | // CJK Unified Ideographs + '\u{3400}'..='\u{4DBF}' | // CJK Unified Ideographs Extension A + '\u{F900}'..='\u{FAFF}' // CJK Compatibility Ideographs + ) +} + +/// 简单停用词表 +fn is_stop_word(s: &str) -> bool { + matches!(s, + "的" | "了" | "是" | "在" | "我" | "有" | "和" | "就" | "不" | "人" | + "都" | "一" | "一个" | "上" | "也" | "很" | "到" | "说" | "要" | "去" | + "你" | "会" | "着" | "没有" | "看" | "好" | "自己" | "这" | "他" | "她" | + "它" | "们" | "那" | "些" | "什么" | "为" | "所以" | "但是" | "因为" | + "如果" | "可以" | "能够" | "需要" | "应该" | "已经" | "还是" | "或者" + ) +} diff --git a/crates/zclaw-saas/src/workers/mod.rs b/crates/zclaw-saas/src/workers/mod.rs index 036702c..75846e0 100644 --- a/crates/zclaw-saas/src/workers/mod.rs +++ b/crates/zclaw-saas/src/workers/mod.rs @@ -241,6 +241,7 @@ pub mod cleanup_refresh_tokens; pub mod update_last_used; pub mod record_usage; pub mod aggregate_usage; +pub mod generate_embedding; // 便捷导出 pub use log_operation::LogOperationWorker;