feat(saas): add GenerateEmbedding worker for knowledge chunking

- Markdown-aware content splitting (512 token chunks with 64 overlap)
- CJK keyword extraction from chunk content with stop-word filtering
- Full refresh strategy (delete old chunks → re-insert on update)
- Phase 2 placeholder for vector embedding API integration

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
iven
2026-04-02 00:23:38 +08:00
parent ef60f9a183
commit 830e9fa301
3 changed files with 157 additions and 1 deletions

View File

@@ -12,6 +12,7 @@ use zclaw_saas::workers::cleanup_rate_limit::CleanupRateLimitWorker;
use zclaw_saas::workers::record_usage::RecordUsageWorker;
use zclaw_saas::workers::update_last_used::UpdateLastUsedWorker;
use zclaw_saas::workers::aggregate_usage::AggregateUsageWorker;
use zclaw_saas::workers::generate_embedding::GenerateEmbeddingWorker;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -46,7 +47,8 @@ async fn main() -> anyhow::Result<()> {
dispatcher.register(RecordUsageWorker);
dispatcher.register(UpdateLastUsedWorker);
dispatcher.register(AggregateUsageWorker);
info!("Worker dispatcher initialized (6 workers registered)");
dispatcher.register(GenerateEmbeddingWorker);
info!("Worker dispatcher initialized (7 workers registered)");
// 优雅停机令牌 — 取消后所有 SSE 流和长连接立即终止
let shutdown_token = CancellationToken::new();

View File

@@ -0,0 +1,153 @@
//! 知识条目分块 + Embedding 生成 Worker
//!
//! 当知识条目创建/更新时触发:
//! 1. 按 Markdown 标题 + 固定长度分块
//! 2. 提取关键词(从 item 的 keywords 字段继承 + 内容提取)
//! 3. 写入 knowledge_chunks 表
//! 4. 如果配置了 embedding provider,生成向量 embedding(Phase 2,尚未实现)
use async_trait::async_trait;
use sqlx::PgPool;
use serde::{Serialize, Deserialize};
use crate::error::SaasResult;
use super::Worker;
/// Arguments for the `generate_embedding` worker job.
#[derive(Debug, Serialize, Deserialize)]
pub struct GenerateEmbeddingArgs {
/// ID of the knowledge item to (re)chunk; looked up as `knowledge_items.id`
/// and written to `knowledge_chunks.item_id`.
pub item_id: String,
}
/// Worker that splits a knowledge item's content into chunks and stores them,
/// together with per-chunk keywords, in the `knowledge_chunks` table.
///
/// Vector embedding generation is not implemented yet (marked as a Phase 2 TODO).
pub struct GenerateEmbeddingWorker;
#[async_trait]
impl Worker for GenerateEmbeddingWorker {
    type Args = GenerateEmbeddingArgs;

    /// Name under which this worker is identified.
    fn name(&self) -> &str {
        "generate_embedding"
    }

    /// Rebuilds the `knowledge_chunks` rows for the knowledge item `args.item_id`.
    ///
    /// Steps:
    /// 1. Load `content`, `title` and `keywords` from `knowledge_items`.
    ///    A missing item is logged at `warn` level and treated as success.
    /// 2. Split the content via `chunk_content(&content, 512, 64)`.
    /// 3. In a single transaction, delete the item's existing chunks and insert
    ///    the new ones (full refresh). Each chunk's keywords are the item's
    ///    keywords plus phrases extracted from the chunk text.
    ///
    /// # Errors
    ///
    /// Propagates any database error. Because the refresh runs inside one
    /// transaction, a failure leaves the previously stored chunks untouched
    /// instead of an empty or partially written set.
    async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> {
        // 1. Load the item.
        let item: Option<(String, String, Vec<String>)> = sqlx::query_as(
            "SELECT content, title, keywords FROM knowledge_items WHERE id = $1"
        )
        .bind(&args.item_id)
        .fetch_optional(db)
        .await?;

        let (content, title, keywords) = match item {
            Some(row) => row,
            None => {
                tracing::warn!("GenerateEmbedding: item {} not found, skipping", args.item_id);
                return Ok(());
            }
        };

        // 2. Chunk before touching the database so the transaction stays short.
        let chunks = crate::knowledge::service::chunk_content(&content, 512, 64);

        // 3. Full refresh, atomically: dropping `tx` on an early `?` return
        //    rolls back both the delete and any inserts already issued.
        let mut tx = db.begin().await?;

        sqlx::query("DELETE FROM knowledge_chunks WHERE item_id = $1")
            .bind(&args.item_id)
            .execute(&mut *tx)
            .await?;

        for (idx, chunk) in chunks.iter().enumerate() {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            // Start from the item's keywords, then append phrases extracted
            // from this chunk's own text. (The title is not used here.)
            let mut chunk_keywords = keywords.clone();
            extract_chunk_keywords(chunk, &mut chunk_keywords);

            sqlx::query(
                "INSERT INTO knowledge_chunks (id, item_id, chunk_index, content, keywords, created_at)
                 VALUES ($1, $2, $3, $4, $5, NOW())"
            )
            .bind(&chunk_id)
            .bind(&args.item_id)
            .bind(idx as i32)
            .bind(chunk)
            .bind(&chunk_keywords)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;

        if chunks.is_empty() {
            // Old chunks have still been removed: an item without content has no chunks.
            tracing::debug!("GenerateEmbedding: item {} has no content to chunk", args.item_id);
            return Ok(());
        }

        tracing::info!(
            "GenerateEmbedding: item '{}' → {} chunks (keywords: {})",
            title,
            chunks.len(),
            keywords.len(),
        );

        // Phase 2: when an embedding provider is configured, call the embedding
        // API here and update the chunks' embedding column.
        // TODO: let _ = generate_vectors(db, &args.item_id, &chunks).await;
        Ok(())
    }
}
/// Appends short CJK phrases found in `content` to `keywords`.
///
/// `content` is split into maximal runs of consecutive CJK characters (as
/// decided by `is_cjk`). For every run, its 2-, 3- and 4-character prefixes
/// are considered in that order; a prefix is appended when it is neither a
/// stop word (`is_stop_word`) nor already present in `keywords`.
///
/// No frequency counting is performed: only the leading characters of each
/// run are used.
///
/// On return `keywords` holds at most 50 entries. Entries already present on
/// entry come first, so a list longer than 50 is truncated to its first 50.
fn extract_chunk_keywords(content: &str, keywords: &mut Vec<String>) {
    const MAX_KEYWORDS: usize = 50;
    const MIN_PHRASE_CHARS: usize = 2;
    const MAX_PHRASE_CHARS: usize = 4;

    'runs: for run in content.split(|c: char| !is_cjk(c)) {
        // Byte offsets at which the 2-, 3- and 4-character prefixes of `run` end.
        // Runs shorter than two characters yield nothing.
        let prefix_ends = run
            .char_indices()
            .map(|(start, c)| start + c.len_utf8())
            .skip(MIN_PHRASE_CHARS - 1)
            .take(MAX_PHRASE_CHARS - MIN_PHRASE_CHARS + 1);

        for end in prefix_ends {
            // Anything pushed past the cap would be truncated away below,
            // so stop scanning as soon as the list is full.
            if keywords.len() >= MAX_KEYWORDS {
                break 'runs;
            }
            let phrase = &run[..end];
            if !is_stop_word(phrase) && !keywords.iter().any(|k| k.as_str() == phrase) {
                keywords.push(phrase.to_owned());
            }
        }
    }

    // Also caps a caller-supplied list that was already over the limit.
    keywords.truncate(MAX_KEYWORDS);
}
/// Returns `true` when `c` lies in one of the CJK ideograph blocks recognised here:
/// CJK Unified Ideographs, Extension A, or CJK Compatibility Ideographs.
fn is_cjk(c: char) -> bool {
    // Inclusive (first, last) code points of each accepted block.
    const BLOCKS: [(char, char); 3] = [
        ('\u{4E00}', '\u{9FFF}'), // CJK Unified Ideographs
        ('\u{3400}', '\u{4DBF}'), // CJK Unified Ideographs Extension A
        ('\u{F900}', '\u{FAFF}'), // CJK Compatibility Ideographs
    ];
    BLOCKS.iter().any(|&(first, last)| first <= c && c <= last)
}
/// Returns `true` when `s` exactly equals an entry of the built-in stop-word list.
fn is_stop_word(s: &str) -> bool {
    // NOTE(review): the empty-string entry mirrors the existing list as-is; it
    // looks like single-character stop words may have been lost — confirm
    // against the intended list.
    const STOP_WORDS: [&str; 16] = [
        "",
        "一个", "没有", "自己", "什么",
        "所以", "但是", "因为", "如果",
        "可以", "能够", "需要", "应该",
        "已经", "还是", "或者",
    ];
    STOP_WORDS.iter().any(|&word| word == s)
}

View File

@@ -241,6 +241,7 @@ pub mod cleanup_refresh_tokens;
pub mod update_last_used;
pub mod record_usage;
pub mod aggregate_usage;
pub mod generate_embedding;
// 便捷导出
pub use log_operation::LogOperationWorker;