Files
zclaw_openfang/crates/zclaw-saas/src/workers/generate_embedding.rs
iven 305984c982
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(saas): P2 code quality fixes + config PATCH/PUT alignment
P2 code quality (SEC2-P2-01~10):
- P2-04: Replace vague TODO with detailed Phase 2 design note in generate_embedding.rs
- P2-05: Add NOTE(fire-and-forget) annotations to 4 long-running tokio::spawn in main.rs
- P2-07: Add DESIGN NOTE to scheduler explaining sequential execution rationale
- P2-08: Add compile-time table name whitelist + runtime char validation in db.rs
- P2-02: Verified N/A (only zclaw-pipeline uses serde_yaml_bw, no inconsistency)
- P2-06: Verified N/A (bind loop correctly matches 6-column placeholders)
- P2-03: Remains OPEN (requires upstream sqlx release)

Config HTTP method alignment (B3-4):
- Fix admin-v2 config.ts: request.patch -> request.put to match backend .put() route
- Fix backend handler doc comment: PATCH -> PUT
- Add @reserved annotations to 6 config handlers without frontend callers
2026-04-03 21:32:17 +08:00

171 lines
5.7 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! 知识条目分块 + Embedding 生成 Worker
//!
//! 当知识条目创建/更新时触发:
//! 1. 按 Markdown 标题 + 固定长度分块
//! 2. 提取关键词(从 item 的 keywords 字段继承 + 内容提取)
//! 3. 写入 knowledge_chunks 表
//! 4. 如果配置了 embedding provider生成向量 embeddingPhase 2
use async_trait::async_trait;
use sqlx::PgPool;
use serde::{Serialize, Deserialize};
use crate::error::SaasResult;
use super::Worker;
#[derive(Debug, Serialize, Deserialize)]
pub struct GenerateEmbeddingArgs {
pub item_id: String,
}
pub struct GenerateEmbeddingWorker;
#[async_trait]
impl Worker for GenerateEmbeddingWorker {
type Args = GenerateEmbeddingArgs;
fn name(&self) -> &str {
"generate_embedding"
}
async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> {
// 1. 加载条目
let item: Option<(String, String, Vec<String>)> = sqlx::query_as(
"SELECT content, title, keywords FROM knowledge_items WHERE id = $1"
)
.bind(&args.item_id)
.fetch_optional(db)
.await?;
let (content, title, keywords) = match item {
Some(row) => row,
None => {
tracing::warn!("GenerateEmbedding: item {} not found, skipping", args.item_id);
return Ok(());
}
};
// 2. 分块
let chunks = crate::knowledge::service::chunk_content(&content, 512, 64);
if chunks.is_empty() {
tracing::debug!("GenerateEmbedding: item {} has no content to chunk", args.item_id);
return Ok(());
}
// 3. 在事务中删除旧分块 + 插入新分块(防止并发竞争条件)
let mut tx = db.begin().await?;
// 锁定条目行防止并发 worker 同时处理同一条目
let locked: Option<(String,)> = sqlx::query_as(
"SELECT id FROM knowledge_items WHERE id = $1 FOR UPDATE"
)
.bind(&args.item_id)
.fetch_optional(&mut *tx)
.await?;
if locked.is_none() {
tx.rollback().await?;
tracing::warn!("GenerateEmbedding: item {} was deleted during processing", args.item_id);
return Ok(());
}
sqlx::query("DELETE FROM knowledge_chunks WHERE item_id = $1")
.bind(&args.item_id)
.execute(&mut *tx)
.await?;
for (idx, chunk) in chunks.iter().enumerate() {
let chunk_id = uuid::Uuid::new_v4().to_string();
let mut chunk_keywords = keywords.clone();
extract_chunk_keywords(chunk, &mut chunk_keywords);
sqlx::query(
"INSERT INTO knowledge_chunks (id, item_id, chunk_index, content, keywords, created_at)
VALUES ($1, $2, $3, $4, $5, NOW())"
)
.bind(&chunk_id)
.bind(&args.item_id)
.bind(idx as i32)
.bind(chunk)
.bind(&chunk_keywords)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
tracing::info!(
"GenerateEmbedding: item '{}' → {} chunks (keywords: {})",
title,
chunks.len(),
keywords.len(),
);
// NOTE (Phase 2 - deferred): Embedding vector generation is not yet implemented.
// When pgvector is configured with an embedding provider, uncomment and implement:
// generate_vectors(db, &args.item_id, &chunks).await
// This will call the configured embedding API and update knowledge_chunks.embedding column.
// See: docs/memory_pipeline_embedding_fix.md for the embedding configuration steps.
Ok(())
}
}
/// 从 chunk 内容中提取高频中文词组作为补充关键词
///
/// 简单策略:提取 2-4 字的连续中文字符段,取出现频率 > 1 的
fn extract_chunk_keywords(content: &str, keywords: &mut Vec<String>) {
let chars: Vec<char> = content.chars().collect();
let mut i = 0;
while i < chars.len() {
// 寻找连续中文字符段
if is_cjk(chars[i]) {
let start = i;
while i < chars.len() && is_cjk(chars[i]) {
i += 1;
}
let segment: String = chars[start..i].iter().collect();
// 提取 2-4 字的子串
let seg_chars: Vec<char> = segment.chars().collect();
if seg_chars.len() >= 2 {
// 只取前 2-4 字的短语(避免过长无意义词组)
for len in 2..=4.min(seg_chars.len()) {
let phrase: String = seg_chars[..len].iter().collect();
// 过滤常见停用词(简单版)
if !is_stop_word(&phrase) && !keywords.contains(&phrase) {
keywords.push(phrase);
}
}
}
} else {
i += 1;
}
}
// 限制关键词总数
keywords.truncate(50);
}
/// 判断是否为 CJK 字符
fn is_cjk(c: char) -> bool {
matches!(c,
'\u{4E00}'..='\u{9FFF}' | // CJK Unified Ideographs
'\u{3400}'..='\u{4DBF}' | // CJK Unified Ideographs Extension A
'\u{F900}'..='\u{FAFF}' // CJK Compatibility Ideographs
)
}
/// 简单停用词表
fn is_stop_word(s: &str) -> bool {
matches!(s,
"" | "" | "" | "" | "" | "" | "" | "" | "" | "" |
"" | "" | "一个" | "" | "" | "" | "" | "" | "" | "" |
"" | "" | "" | "没有" | "" | "" | "自己" | "" | "" | "" |
"" | "" | "" | "" | "什么" | "" | "所以" | "但是" | "因为" |
"如果" | "可以" | "能够" | "需要" | "应该" | "已经" | "还是" | "或者"
)
}