Files
zclaw_openfang/crates/zclaw-saas/src/workers/generate_embedding.rs
iven c3593d3438
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
feat(knowledge): Phase A 知识库可见性隔离 + 结构化数据源 + 蒸馏Worker
- knowledge_items 增加 visibility(public/private) + account_id 字段
- 新建 structured_sources + structured_rows 表 (Excel JSONB 行级存储)
- 结构化数据源 CRUD API (5 路由: list/get/rows/delete/query)
- 安全查询: JSONB GIN 索引 + 可见性过滤 + 行数限制
- 蒸馏 Worker: 复用 Provider Key Pool 调 DeepSeek/Qwen API
- L0 质量过滤: 长度/隐私检测
- create_item 增加 is_admin 参数控制可见性默认值
- generate_embedding: extract_keywords_from_text 改为 pub 复用

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-12 18:36:05 +08:00

169 lines
5.7 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

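//! Knowledge item chunking + embedding generation worker.
//!
//! Triggered when a knowledge item is created or updated:
//! 1. Split the content into chunks by Markdown heading + fixed length
//! 2. Extract keywords (inherited from the item's `keywords` field + extracted from the content)
//! 3. Write the chunks to the `knowledge_chunks` table
//! 4. If an embedding provider is configured, generate vector embeddings (Phase 2)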
use async_trait::async_trait;
use sqlx::PgPool;
use serde::{Serialize, Deserialize};
use crate::error::SaasResult;
use super::Worker;
/// Job arguments for [`GenerateEmbeddingWorker`].
#[derive(Debug, Serialize, Deserialize)]
pub struct GenerateEmbeddingArgs {
/// ID of the `knowledge_items` row whose chunks should be (re)generated.
pub item_id: String,
}
/// Worker that splits a knowledge item into chunks and stores them, together with
/// per-chunk keywords, in the `knowledge_chunks` table.
///
/// Despite the name, no embedding vectors are produced yet; that step is deferred
/// (see the Phase 2 note at the end of `perform`).
pub struct GenerateEmbeddingWorker;
#[async_trait]
impl Worker for GenerateEmbeddingWorker {
    type Args = GenerateEmbeddingArgs;

    /// Returns the name identifying this worker: `"generate_embedding"`.
    fn name(&self) -> &str {
        "generate_embedding"
    }

    /// Rebuilds the `knowledge_chunks` rows for the knowledge item `args.item_id`.
    ///
    /// Everything happens inside one transaction:
    /// 1. the item row is loaded **and** locked (`SELECT ... FOR UPDATE`),
    /// 2. its content is split into chunks,
    /// 3. all existing chunks of the item are deleted,
    /// 4. one row per new chunk is inserted, carrying the item's keywords plus the
    ///    keywords extracted from that chunk's text.
    ///
    /// A missing item is not an error: the job logs a warning and returns `Ok(())`.
    /// An item whose content yields no chunks ends up with no chunk rows at all, so
    /// chunks left over from a previous, non-empty version are removed as well.
    ///
    /// # Errors
    ///
    /// Propagates any database error raised while opening the transaction, running
    /// the queries, or committing. On error the transaction is dropped uncommitted.
    async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> {
        let mut tx = db.begin().await?;

        // 1. Load and lock the item in a single statement. Reading the content under
        //    the same row lock that guards the chunk rewrite guarantees that the chunks
        //    written below are derived from the row version that was locked, and that
        //    concurrent workers for the same item are serialized.
        let item: Option<(String, String, Vec<String>)> = sqlx::query_as(
            "SELECT content, title, keywords FROM knowledge_items WHERE id = $1 FOR UPDATE"
        )
        .bind(&args.item_id)
        .fetch_optional(&mut *tx)
        .await?;

        let (content, title, keywords) = match item {
            Some(row) => row,
            None => {
                tx.rollback().await?;
                tracing::warn!("GenerateEmbedding: item {} not found, skipping", args.item_id);
                return Ok(());
            }
        };

        // 2. Split the content into chunks.
        //    NOTE(review): 512 / 64 are presumably the chunk size and the overlap —
        //    confirm against `chunk_content`.
        let chunks = crate::knowledge::service::chunk_content(&content, 512, 64);

        // 3. Always drop the previous chunks, even when there is nothing to insert;
        //    otherwise an item whose content became empty would keep stale chunks.
        sqlx::query("DELETE FROM knowledge_chunks WHERE item_id = $1")
            .bind(&args.item_id)
            .execute(&mut *tx)
            .await?;

        // 4. Insert the new chunks. The index is counted as `i32` directly, which
        //    avoids a truncating `usize as i32` cast.
        for (idx, chunk) in (0_i32..).zip(chunks.iter()) {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            // Item-level keywords first, then whatever the chunk text contributes.
            let mut chunk_keywords = keywords.clone();
            extract_keywords_from_text(chunk, &mut chunk_keywords);

            sqlx::query(
                "INSERT INTO knowledge_chunks (id, item_id, chunk_index, content, keywords, created_at)
VALUES ($1, $2, $3, $4, $5, NOW())"
            )
            .bind(&chunk_id)
            .bind(&args.item_id)
            .bind(idx)
            .bind(chunk)
            .bind(&chunk_keywords)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;

        if chunks.is_empty() {
            tracing::debug!("GenerateEmbedding: item {} has no content to chunk", args.item_id);
            return Ok(());
        }

        tracing::info!(
            "GenerateEmbedding: item '{}' → {} chunks (keywords: {})",
            title,
            chunks.len(),
            keywords.len(),
        );

        // NOTE (Phase 2 - deferred): Embedding vector generation is not yet implemented.
        // When pgvector is configured with an embedding provider, the vectors are meant
        // to be produced here by calling the configured embedding API and updating the
        // knowledge_chunks.embedding column.
        // See: docs/memory_pipeline_embedding_fix.md for the embedding configuration steps.
        Ok(())
    }
}
/// Appends supplementary keywords taken from the CJK text in `content` to `keywords`.
///
/// `content` is split into maximal runs of consecutive CJK characters (as decided by
/// `is_cjk`). For every run, its 2-, 3- and 4-character prefixes are candidate
/// phrases; a candidate is appended unless it is a stop word (`is_stop_word`) or is
/// already present in `keywords`. Runs shorter than two characters contribute nothing.
///
/// Entries already in `keywords` keep their position; new phrases are appended in
/// order of appearance. The list is finally truncated to 50 entries, so a longer
/// incoming list is shortened as well.
///
/// Public so that the `distill_knowledge` worker can reuse it.
pub fn extract_keywords_from_text(content: &str, keywords: &mut Vec<String>) {
    /// Upper bound on the total number of keywords kept.
    const MAX_KEYWORDS: usize = 50;
    /// Shortest phrase length, in characters.
    const MIN_PHRASE_CHARS: usize = 2;
    /// Longest phrase length, in characters.
    const MAX_PHRASE_CHARS: usize = 4;

    // Splitting on every non-CJK character yields exactly the maximal CJK runs
    // (plus empty pieces, which produce no candidates).
    for run in content.split(|c: char| !is_cjk(c)) {
        // Phrases are only ever appended and the list is cut to MAX_KEYWORDS at the
        // end, so nothing added beyond the cap could survive: stop early.
        if keywords.len() >= MAX_KEYWORDS {
            break;
        }

        // Byte offsets just past the 2nd, 3rd and 4th character of the run, i.e. the
        // end of each candidate prefix. Slicing at these offsets is always on a
        // character boundary.
        let prefix_ends = run
            .char_indices()
            .map(|(start, ch)| start + ch.len_utf8())
            .skip(MIN_PHRASE_CHARS - 1)
            .take(MAX_PHRASE_CHARS - MIN_PHRASE_CHARS + 1);

        for end in prefix_ends {
            let phrase = &run[..end];
            let already_known = keywords.iter().any(|known| known.as_str() == phrase);
            if !is_stop_word(phrase) && !already_known {
                keywords.push(phrase.to_owned());
            }
        }
    }

    // Enforce the cap (also trims an oversized incoming list).
    keywords.truncate(MAX_KEYWORDS);
}
/// Reports whether `c` is a CJK ideograph from one of the three supported blocks:
/// CJK Unified Ideographs, CJK Unified Ideographs Extension A, or CJK Compatibility
/// Ideographs.
fn is_cjk(c: char) -> bool {
    let code_point = u32::from(c);

    // U+4E00..=U+9FFF: CJK Unified Ideographs
    let unified = (0x4E00..=0x9FFF).contains(&code_point);
    // U+3400..=U+4DBF: CJK Unified Ideographs Extension A
    let extension_a = (0x3400..=0x4DBF).contains(&code_point);
    // U+F900..=U+FAFF: CJK Compatibility Ideographs
    let compatibility = (0xF900..=0xFAFF).contains(&code_point);

    unified || extension_a || compatibility
}
/// Reports whether `s` is an entry of the small built-in stop-word table.
///
/// The match is exact (whole string). The empty string counts as a stop word.
fn is_stop_word(s: &str) -> bool {
    // NOTE(review): the previous table listed the empty string many times over;
    // presumably those were single-character stop words lost in an encoding step —
    // confirm against version control. One entry is kept so the result for "" is
    // unchanged.
    const STOP_WORDS: &[&str] = &[
        "",
        "一个", "没有", "自己", "什么", "所以", "但是", "因为",
        "如果", "可以", "能够", "需要", "应该", "已经", "还是", "或者",
    ];

    STOP_WORDS.iter().any(|&word| word == s)
}