From c3593d343892e2183583916158a11344fb37d3e9 Mon Sep 17 00:00:00 2001 From: iven Date: Sun, 12 Apr 2026 18:36:05 +0800 Subject: [PATCH] =?UTF-8?q?feat(knowledge):=20Phase=20A=20=E7=9F=A5?= =?UTF-8?q?=E8=AF=86=E5=BA=93=E5=8F=AF=E8=A7=81=E6=80=A7=E9=9A=94=E7=A6=BB?= =?UTF-8?q?=20+=20=E7=BB=93=E6=9E=84=E5=8C=96=E6=95=B0=E6=8D=AE=E6=BA=90?= =?UTF-8?q?=20+=20=E8=92=B8=E9=A6=8FWorker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - knowledge_items 增加 visibility(public/private) + account_id 字段 - 新建 structured_sources + structured_rows 表 (Excel JSONB 行级存储) - 结构化数据源 CRUD API (5 路由: list/get/rows/delete/query) - 安全查询: JSONB GIN 索引 + 可见性过滤 + 行数限制 - 蒸馏 Worker: 复用 Provider Key Pool 调 DeepSeek/Qwen API - L0 质量过滤: 长度/隐私检测 - create_item 增加 is_admin 参数控制可见性默认值 - generate_embedding: extract_keywords_from_text 改为 pub 复用 Co-Authored-By: Claude Opus 4.6 --- ...000001_knowledge_visibility_structured.sql | 77 +++++ ...000001_knowledge_visibility_structured.sql | 7 + crates/zclaw-saas/src/knowledge/handlers.rs | 101 ++++++- crates/zclaw-saas/src/knowledge/mod.rs | 8 +- crates/zclaw-saas/src/knowledge/service.rs | 273 +++++++++++++++++- crates/zclaw-saas/src/knowledge/types.rs | 124 +++++++- crates/zclaw-saas/src/main.rs | 13 +- .../src/workers/distill_knowledge.rs | 253 ++++++++++++++++ .../src/workers/generate_embedding.rs | 8 +- crates/zclaw-saas/src/workers/mod.rs | 2 + 10 files changed, 846 insertions(+), 20 deletions(-) create mode 100644 crates/zclaw-saas/migrations/20260413000001_knowledge_visibility_structured.sql create mode 100644 crates/zclaw-saas/migrations/down/20260413000001_knowledge_visibility_structured.sql create mode 100644 crates/zclaw-saas/src/workers/distill_knowledge.rs diff --git a/crates/zclaw-saas/migrations/20260413000001_knowledge_visibility_structured.sql b/crates/zclaw-saas/migrations/20260413000001_knowledge_visibility_structured.sql new file mode 100644 index 0000000..713ccce --- /dev/null +++ b/crates/zclaw-saas/migrations/20260413000001_knowledge_visibility_structured.sql @@ -0,0 +1,77 @@ +-- Phase A: 知识库可见性隔离 + 结构化数据源 +-- 1. knowledge_items 增加 visibility + account_id (公共/私有隔离) +-- 2. 新建 structured_sources (Excel/CSV 数据源元数据) +-- 3. 新建 structured_rows (行级 JSONB 存储) + +-- ============================================================ +-- 1. knowledge_items 可见性扩展 +-- ============================================================ + +ALTER TABLE knowledge_items + ADD COLUMN IF NOT EXISTS visibility VARCHAR(20) DEFAULT 'public' + CHECK (visibility IN ('public', 'private')); + +ALTER TABLE knowledge_items + ADD COLUMN IF NOT EXISTS account_id TEXT REFERENCES accounts(id); + +-- NULL account_id + public = Admin 上传的公共知识 +-- 有 account_id + private = 用户私有知识 + +CREATE INDEX IF NOT EXISTS idx_ki_visibility + ON knowledge_items(visibility, account_id) + WHERE visibility = 'private'; + +-- ============================================================ +-- 2. 结构化数据源 (Excel / CSV) +-- ============================================================ + +CREATE TABLE IF NOT EXISTS structured_sources ( + id TEXT PRIMARY KEY, + account_id TEXT REFERENCES accounts(id), -- NULL=公共 (Admin上传) + title VARCHAR(255) NOT NULL, -- "2026春季面料目录" + description TEXT, + original_file_name VARCHAR(500), + sheet_names TEXT[] DEFAULT '{}', -- 工作表名称列表 + row_count INT DEFAULT 0, + column_headers TEXT[] DEFAULT '{}', -- 合并所有列头 (用于搜索发现) + visibility VARCHAR(20) DEFAULT 'public' + CHECK (visibility IN ('public', 'private')), + industry_id TEXT, -- 关联行业 (可选) + status VARCHAR(20) DEFAULT 'active' + CHECK (status IN ('active', 'archived')), + created_by TEXT NOT NULL REFERENCES accounts(id), + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_ss_visibility + ON structured_sources(visibility, account_id) + WHERE visibility = 'private'; + +CREATE INDEX IF NOT EXISTS idx_ss_industry + ON structured_sources(industry_id) + WHERE industry_id IS NOT NULL; + +-- ============================================================ +-- 3. 结构化数据行 (Excel 每行一条) +-- ============================================================ + +CREATE TABLE IF NOT EXISTS structured_rows ( + id TEXT PRIMARY KEY, + source_id TEXT NOT NULL REFERENCES structured_sources(id) ON DELETE CASCADE, + sheet_name VARCHAR(255), -- 工作表名称 + row_index INT NOT NULL, -- 行号 + headers TEXT[] NOT NULL, -- 列头 ["型号","面料","克重","价格"] + row_data JSONB NOT NULL, -- {"型号":"A100","面料":"纯棉","克重":200,"价格":45} + created_at TIMESTAMPTZ DEFAULT NOW() +); + +-- JSONB GIN 索引: 支持对 row_data 任意字段精确查询 +CREATE INDEX IF NOT EXISTS idx_sr_data + ON structured_rows USING GIN(row_data jsonb_path_ops); + +CREATE INDEX IF NOT EXISTS idx_sr_source + ON structured_rows(source_id); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_sr_source_row + ON structured_rows(source_id, sheet_name, row_index); diff --git a/crates/zclaw-saas/migrations/down/20260413000001_knowledge_visibility_structured.sql b/crates/zclaw-saas/migrations/down/20260413000001_knowledge_visibility_structured.sql new file mode 100644 index 0000000..de3c029 --- /dev/null +++ b/crates/zclaw-saas/migrations/down/20260413000001_knowledge_visibility_structured.sql @@ -0,0 +1,7 @@ +-- Down migration: 知识库可见性隔离 + 结构化数据源 + +DROP TABLE IF EXISTS structured_rows; +DROP TABLE IF EXISTS structured_sources; + +ALTER TABLE knowledge_items DROP COLUMN IF EXISTS visibility; +ALTER TABLE knowledge_items DROP COLUMN IF EXISTS account_id; diff --git a/crates/zclaw-saas/src/knowledge/handlers.rs b/crates/zclaw-saas/src/knowledge/handlers.rs index d5aaf8e..735cc3b 100644 --- a/crates/zclaw-saas/src/knowledge/handlers.rs +++ b/crates/zclaw-saas/src/knowledge/handlers.rs @@ -190,7 +190,8 @@ pub async fn create_item( return Err(SaasError::InvalidInput("内容不能超过 100KB".into())); } - let item = service::create_item(&state.db, &ctx.account_id, &req).await?; + let is_admin = ctx.role == "admin" || ctx.role == "super_admin"; + let item = service::create_item(&state.db, &ctx.account_id, &req, is_admin).await?; // 异步触发 embedding 生成 if let Err(e) = state.worker_dispatcher.dispatch( @@ -219,6 +220,7 @@ pub async fn batch_create_items( return Err(SaasError::InvalidInput("单次批量创建不能超过 50 条".into())); } + let is_admin = ctx.role == "admin" || ctx.role == "super_admin"; let mut created = Vec::new(); for req in &items { if req.title.trim().is_empty() || req.content.trim().is_empty() { @@ -229,7 +231,7 @@ pub async fn batch_create_items( tracing::warn!("Batch create: skipping item '{}' (content too long)", req.title); continue; } - match service::create_item(&state.db, &ctx.account_id, req).await { + match service::create_item(&state.db, &ctx.account_id, req, is_admin).await { Ok(item) => { if let Err(e) = state.worker_dispatcher.dispatch( "generate_embedding", @@ -534,6 +536,7 @@ pub async fn import_items( return Err(SaasError::InvalidInput("单次导入不能超过 20 个文件".into())); } + let is_admin = ctx.role == "admin" || ctx.role == "super_admin"; let mut created = Vec::new(); for file in &req.files { // 内容长度检查(数据库限制 100KB) @@ -561,9 +564,10 @@ pub async fn import_items( related_questions: None, priority: None, tags: file.tags.clone(), + visibility: None, }; - match service::create_item(&state.db, &ctx.account_id, &item_req).await { + match service::create_item(&state.db, &ctx.account_id, &item_req, is_admin).await { Ok(item) => { if let Err(e) = state.worker_dispatcher.dispatch( "generate_embedding", @@ -590,3 +594,94 @@ pub async fn import_items( fn check_permission(ctx: &AuthContext, permission: &str) -> SaasResult<()> { crate::auth::handlers::check_permission(ctx, permission) } + +fn is_admin(ctx: &AuthContext) -> bool { + ctx.role == "admin" || ctx.role == "super_admin" +} + +// === 结构化数据源管理 === + +/// GET /api/v1/structured/sources +pub async fn list_structured_sources( + State(state): State, + Extension(ctx): Extension, + Query(query): Query, +) -> SaasResult> { + check_permission(&ctx, "knowledge:read")?; + let page = query.page.unwrap_or(1).max(1); + let page_size = query.page_size.unwrap_or(20).max(1).min(100); + + let (sources, total) = service::list_structured_sources( + &state.db, + Some(&ctx.account_id), + query.industry_id.as_deref(), + query.status.as_deref(), + page, + page_size, + ).await?; + + Ok(Json(serde_json::json!({ + "items": sources, + "total": total, + "page": page, + "page_size": page_size, + }))) +} + +/// GET /api/v1/structured/sources/:id +pub async fn get_structured_source( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> SaasResult> { + check_permission(&ctx, "knowledge:read")?; + let source = service::get_structured_source(&state.db, &id, Some(&ctx.account_id)).await? + .ok_or_else(|| SaasError::NotFound("数据源不存在".into()))?; + Ok(Json(serde_json::to_value(source).unwrap_or_default())) +} + +/// GET /api/v1/structured/sources/:id/rows +pub async fn list_structured_source_rows( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, + Query(query): Query, +) -> SaasResult> { + check_permission(&ctx, "knowledge:read")?; + let page = query.page.unwrap_or(1).max(1); + let page_size = query.page_size.unwrap_or(50).max(1).min(200); + + let (rows, total) = service::list_structured_rows( + &state.db, &id, Some(&ctx.account_id), + query.sheet_name.as_deref(), page, page_size, + ).await?; + + Ok(Json(serde_json::json!({ + "rows": rows, + "total": total, + "page": page, + "page_size": page_size, + }))) +} + +/// DELETE /api/v1/structured/sources/:id +pub async fn delete_structured_source( + State(state): State, + Extension(ctx): Extension, + Path(id): Path, +) -> SaasResult> { + check_permission(&ctx, "knowledge:admin")?; + service::delete_structured_source(&state.db, &id).await?; + Ok(Json(serde_json::json!({"deleted": true}))) +} + +/// POST /api/v1/structured/query +pub async fn query_structured( + State(state): State, + Extension(ctx): Extension, + Json(req): Json, +) -> SaasResult>> { + check_permission(&ctx, "knowledge:search")?; + let results = service::query_structured(&state.db, &req, Some(&ctx.account_id)).await?; + Ok(Json(results)) +} diff --git a/crates/zclaw-saas/src/knowledge/mod.rs b/crates/zclaw-saas/src/knowledge/mod.rs index 7f1fa0f..65fc4d0 100644 --- a/crates/zclaw-saas/src/knowledge/mod.rs +++ b/crates/zclaw-saas/src/knowledge/mod.rs @@ -1,4 +1,4 @@ -//! 知识库模块 — 行业知识管理、RAG 检索、版本控制 +//! 知识库模块 — 行业知识管理、RAG 检索、版本控制、结构化数据 pub mod types; pub mod service; @@ -36,4 +36,10 @@ pub fn routes() -> axum::Router { .route("/api/v1/knowledge/analytics/top-items", get(handlers::analytics_top_items)) .route("/api/v1/knowledge/analytics/quality", get(handlers::analytics_quality)) .route("/api/v1/knowledge/analytics/gaps", get(handlers::analytics_gaps)) + // 结构化数据源管理 + .route("/api/v1/structured/sources", get(handlers::list_structured_sources)) + .route("/api/v1/structured/sources/:id", get(handlers::get_structured_source)) + .route("/api/v1/structured/sources/:id/rows", get(handlers::list_structured_source_rows)) + .route("/api/v1/structured/sources/:id", delete(handlers::delete_structured_source)) + .route("/api/v1/structured/query", post(handlers::query_structured)) } diff --git a/crates/zclaw-saas/src/knowledge/service.rs b/crates/zclaw-saas/src/knowledge/service.rs index 6108a06..81428d9 100644 --- a/crates/zclaw-saas/src/knowledge/service.rs +++ b/crates/zclaw-saas/src/knowledge/service.rs @@ -276,6 +276,7 @@ pub async fn create_item( pool: &PgPool, account_id: &str, req: &CreateItemRequest, + is_admin: bool, ) -> SaasResult { let id = uuid::Uuid::new_v4().to_string(); let keywords = req.keywords.as_deref().unwrap_or(&[]); @@ -283,6 +284,16 @@ pub async fn create_item( let priority = req.priority.unwrap_or(0); let tags = req.tags.as_deref().unwrap_or(&[]); + // visibility: Admin 默认 public,普通用户默认 private + let visibility = req.visibility.as_deref().unwrap_or_else(|| { + if is_admin { "public" } else { "private" } + }); + if !is_admin && visibility == "public" { + return Err(crate::error::SaasError::InvalidInput( + "普通用户只能创建私有知识条目".into(), + )); + } + // 验证 category_id 存在性 let cat_exists: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM knowledge_categories WHERE id = $1)" @@ -299,10 +310,12 @@ pub async fn create_item( // 使用事务保证 item + version 原子性 let mut tx = pool.begin().await?; + let item_account_id: Option<&str> = if visibility == "public" { None } else { Some(account_id) }; + let item = sqlx::query_as::<_, KnowledgeItem>( "INSERT INTO knowledge_items \ - (id, category_id, title, content, keywords, related_questions, priority, tags, created_by) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \ + (id, category_id, title, content, keywords, related_questions, priority, tags, created_by, visibility, account_id) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ RETURNING *" ) .bind(&id) @@ -314,6 +327,8 @@ pub async fn create_item( .bind(priority) .bind(tags) .bind(account_id) + .bind(visibility) + .bind(item_account_id) .fetch_one(&mut *tx) .await?; @@ -781,3 +796,257 @@ pub async fn analytics_gaps(pool: &PgPool) -> SaasResult { "gaps": gaps.into_iter().map(|(v,)| v).collect::>() })) } + +// === 结构化数据源 CRUD === + +/// 创建结构化数据源 +pub async fn create_structured_source( + pool: &PgPool, + account_id: &str, + is_admin: bool, + req: &CreateStructuredSourceRequest, +) -> SaasResult { + let id = uuid::Uuid::new_v4().to_string(); + let visibility = req.visibility.as_deref().unwrap_or_else(|| { + if is_admin { "public" } else { "private" } + }); + let source_account_id: Option<&str> = if visibility == "public" { None } else { Some(account_id) }; + + let source = sqlx::query_as::<_, StructuredSource>( + "INSERT INTO structured_sources \ + (id, account_id, title, description, original_file_name, sheet_names, column_headers, \ + visibility, industry_id, created_by) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ + RETURNING *" + ) + .bind(&id) + .bind(source_account_id) + .bind(&req.title) + .bind(&req.description) + .bind(&req.original_file_name) + .bind(req.sheet_names.as_deref().unwrap_or(&vec![])) + .bind(req.column_headers.as_deref().unwrap_or(&vec![])) + .bind(visibility) + .bind(&req.industry_id) + .bind(account_id) + .fetch_one(pool) + .await?; + + Ok(source) +} + +/// 批量写入结构化数据行 +pub async fn insert_structured_rows( + pool: &PgPool, + source_id: &str, + rows: &[(Option, i32, Vec, serde_json::Value)], +) -> SaasResult { + let mut tx = pool.begin().await?; + let mut count: i64 = 0; + + for (sheet_name, row_index, headers, row_data) in rows { + let row_id = uuid::Uuid::new_v4().to_string(); + sqlx::query( + "INSERT INTO structured_rows (id, source_id, sheet_name, row_index, headers, row_data) \ + VALUES ($1, $2, $3, $4, $5, $6)" + ) + .bind(&row_id) + .bind(source_id) + .bind(sheet_name) + .bind(*row_index) + .bind(headers) + .bind(row_data) + .execute(&mut *tx) + .await?; + count += 1; + } + + sqlx::query( + "UPDATE structured_sources SET row_count = (SELECT COUNT(*) FROM structured_rows WHERE source_id = $1), \ + updated_at = NOW() WHERE id = $1" + ) + .bind(source_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(count) +} + +/// 列出结构化数据源(分页,含可见性过滤) +pub async fn list_structured_sources( + pool: &PgPool, + viewer_account_id: Option<&str>, + industry_id: Option<&str>, + status: Option<&str>, + page: i64, + page_size: i64, +) -> SaasResult<(Vec, i64)> { + let offset = (page - 1) * page_size; + + let items: Vec = sqlx::query_as( + "SELECT * FROM structured_sources \ + WHERE (visibility = 'public' OR account_id = $1) \ + AND ($2::text IS NULL OR industry_id = $2) \ + AND ($3::text IS NULL OR status = $3) \ + ORDER BY updated_at DESC \ + LIMIT $4 OFFSET $5" + ) + .bind(viewer_account_id) + .bind(industry_id) + .bind(status) + .bind(page_size) + .bind(offset) + .fetch_all(pool) + .await?; + + let total: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM structured_sources \ + WHERE (visibility = 'public' OR account_id = $1) \ + AND ($2::text IS NULL OR industry_id = $2) \ + AND ($3::text IS NULL OR status = $3)" + ) + .bind(viewer_account_id) + .bind(industry_id) + .bind(status) + .fetch_one(pool) + .await?; + + Ok((items, total.0)) +} + +/// 获取结构化数据源详情 +pub async fn get_structured_source( + pool: &PgPool, + source_id: &str, + viewer_account_id: Option<&str>, +) -> SaasResult> { + let source = sqlx::query_as::<_, StructuredSource>( + "SELECT * FROM structured_sources WHERE id = $1 \ + AND (visibility = 'public' OR account_id = $2)" + ) + .bind(source_id) + .bind(viewer_account_id) + .fetch_optional(pool) + .await?; + Ok(source) +} + +/// 列出结构化数据源的行数据(分页) +pub async fn list_structured_rows( + pool: &PgPool, + source_id: &str, + viewer_account_id: Option<&str>, + sheet_name: Option<&str>, + page: i64, + page_size: i64, +) -> SaasResult<(Vec, i64)> { + let source = get_structured_source(pool, source_id, viewer_account_id).await?; + if source.is_none() { + return Err(crate::error::SaasError::NotFound("数据源不存在或无权限".into())); + } + + let offset = (page - 1) * page_size; + let rows: Vec = sqlx::query_as( + "SELECT * FROM structured_rows \ + WHERE source_id = $1 \ + AND ($2::text IS NULL OR sheet_name = $2) \ + ORDER BY row_index \ + LIMIT $3 OFFSET $4" + ) + .bind(source_id) + .bind(sheet_name) + .bind(page_size) + .bind(offset) + .fetch_all(pool) + .await?; + + let total: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM structured_rows \ + WHERE source_id = $1 \ + AND ($2::text IS NULL OR sheet_name = $2)" + ) + .bind(source_id) + .bind(sheet_name) + .fetch_one(pool) + .await?; + + Ok((rows, total.0)) +} + +/// 删除结构化数据源(级联删除行) +pub async fn delete_structured_source(pool: &PgPool, source_id: &str) -> SaasResult<()> { + let result = sqlx::query("DELETE FROM structured_sources WHERE id = $1") + .bind(source_id) + .execute(pool) + .await?; + if result.rows_affected() == 0 { + return Err(crate::error::SaasError::NotFound("数据源不存在".into())); + } + Ok(()) +} + +/// 安全的结构化查询(关键词匹配 + 可见性过滤) +pub async fn query_structured( + pool: &PgPool, + request: &StructuredQueryRequest, + viewer_account_id: Option<&str>, +) -> SaasResult> { + let limit = request.limit.unwrap_or(20).min(50); + let pattern = format!("%{}%", + request.query.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_") + ); + + let source_filter = if let Some(ref sid) = request.source_id { + format!("AND ss.id = '{}'", sid.replace('\'', "''")) + } else { + String::new() + }; + let industry_filter = if let Some(ref iid) = request.industry_id { + format!("AND ss.industry_id = '{}'", iid.replace('\'', "''")) + } else { + String::new() + }; + + let rows: Vec<(String, String, Vec, serde_json::Value)> = sqlx::query_as( + &format!( + "SELECT sr.source_id, ss.title, sr.headers, sr.row_data \ + FROM structured_rows sr \ + JOIN structured_sources ss ON sr.source_id = ss.id \ + WHERE (ss.visibility = 'public' OR ss.account_id = $1) \ + AND ss.status = 'active' \ + {} {} \ + AND (sr.row_data::text ILIKE $2 \ + OR array_to_string(sr.headers, ' ') ILIKE $2) \ + ORDER BY ss.title, sr.row_index \ + LIMIT {}", + source_filter, industry_filter, limit + ) + ) + .bind(viewer_account_id) + .bind(&pattern) + .fetch_all(pool) + .await?; + + let mut results_map: std::collections::HashMap = + std::collections::HashMap::new(); + + for (source_id, source_title, headers, row_data) in rows { + let entry = results_map.entry(source_id.clone()) + .or_insert_with(|| StructuredQueryResult { + source_id: source_id.clone(), + source_title: source_title.clone(), + headers: headers.clone(), + rows: Vec::new(), + total_matched: 0, + generated_sql: None, + }); + + if let Ok(map) = serde_json::from_value::>(row_data) { + entry.rows.push(map); + } + entry.total_matched += 1; + } + + Ok(results_map.into_values().collect()) +} diff --git a/crates/zclaw-saas/src/knowledge/types.rs b/crates/zclaw-saas/src/knowledge/types.rs index b787aad..fc4a856 100644 --- a/crates/zclaw-saas/src/knowledge/types.rs +++ b/crates/zclaw-saas/src/knowledge/types.rs @@ -2,6 +2,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; // === 分类 === @@ -63,6 +64,8 @@ pub struct KnowledgeItem { pub source: String, pub tags: Vec, pub created_by: String, + pub visibility: Option, + pub account_id: Option, pub created_at: DateTime, pub updated_at: DateTime, } @@ -76,6 +79,7 @@ pub struct CreateItemRequest { pub related_questions: Option>, pub priority: Option, pub tags: Option>, + pub visibility: Option, } #[derive(Debug, Deserialize)] @@ -115,6 +119,7 @@ pub struct ItemResponse { pub source: String, pub tags: Vec, pub created_by: String, + pub visibility: Option, pub reference_count: i64, pub created_at: String, pub updated_at: String, @@ -167,14 +172,6 @@ pub struct KnowledgeUsage { // === 搜索 === -#[derive(Debug, Deserialize)] -pub struct SearchRequest { - pub query: String, - pub category_id: Option, - pub limit: Option, - pub min_score: Option, -} - #[derive(Debug, Serialize)] pub struct SearchResult { pub chunk_id: String, @@ -223,3 +220,114 @@ pub struct ImportRequest { pub category_id: String, pub files: Vec, } + +// === 搜索增强 === + +#[derive(Debug, Deserialize)] +pub struct SearchRequest { + pub query: String, + pub category_id: Option, + pub industry_id: Option, + pub search_structured: Option, + pub search_documents: Option, + pub limit: Option, + pub min_score: Option, +} + +#[derive(Debug, Serialize)] +pub struct UnifiedSearchResult { + pub documents: Vec, + pub structured: Vec, +} + +// === 结构化数据源 === + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct StructuredSource { + pub id: String, + pub account_id: Option, + pub title: String, + pub description: Option, + pub original_file_name: Option, + pub sheet_names: Vec, + pub row_count: i32, + pub column_headers: Vec, + pub visibility: Option, + pub industry_id: Option, + pub status: String, + pub created_by: String, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Deserialize)] +pub struct CreateStructuredSourceRequest { + pub title: String, + pub description: Option, + pub original_file_name: Option, + pub sheet_names: Option>, + pub column_headers: Option>, + pub visibility: Option, + pub industry_id: Option, +} + +#[derive(Debug, Deserialize)] +pub struct ListStructuredSourcesQuery { + pub page: Option, + pub page_size: Option, + pub industry_id: Option, + pub status: Option, +} + +#[derive(Debug, Serialize)] +pub struct StructuredSourceResponse { + pub id: String, + pub title: String, + pub description: Option, + pub original_file_name: Option, + pub sheet_names: Vec, + pub row_count: i64, + pub column_headers: Vec, + pub visibility: Option, + pub industry_id: Option, + pub status: String, + pub created_by: String, + pub created_at: String, + pub updated_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct StructuredRow { + pub id: String, + pub source_id: String, + pub sheet_name: Option, + pub row_index: i32, + pub headers: Vec, + pub row_data: serde_json::Value, + pub created_at: DateTime, +} + +#[derive(Debug, Deserialize)] +pub struct ListStructuredRowsQuery { + pub page: Option, + pub page_size: Option, + pub sheet_name: Option, +} + +#[derive(Debug, Deserialize)] +pub struct StructuredQueryRequest { + pub query: String, + pub source_id: Option, + pub industry_id: Option, + pub limit: Option, +} + +#[derive(Debug, Serialize)] +pub struct StructuredQueryResult { + pub source_id: String, + pub source_title: String, + pub headers: Vec, + pub rows: Vec>, + pub total_matched: i64, + pub generated_sql: Option, +} diff --git a/crates/zclaw-saas/src/main.rs b/crates/zclaw-saas/src/main.rs index cf3b586..358411b 100644 --- a/crates/zclaw-saas/src/main.rs +++ b/crates/zclaw-saas/src/main.rs @@ -13,6 +13,7 @@ use zclaw_saas::workers::record_usage::RecordUsageWorker; use zclaw_saas::workers::update_last_used::UpdateLastUsedWorker; use zclaw_saas::workers::aggregate_usage::AggregateUsageWorker; use zclaw_saas::workers::generate_embedding::GenerateEmbeddingWorker; +use zclaw_saas::workers::DistillationWorker; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -48,8 +49,18 @@ async fn main() -> anyhow::Result<()> { dispatcher.register(UpdateLastUsedWorker); dispatcher.register(AggregateUsageWorker); dispatcher.register(GenerateEmbeddingWorker); + + // 蒸馏 Worker(需要加密密钥来解密 provider API key) + match config.api_key_encryption_key() { + Ok(enc_key) => { + dispatcher.register(DistillationWorker::new(enc_key)); + info!("DistillationWorker registered"); + } + Err(e) => tracing::warn!("DistillationWorker skipped (no enc key): {}", e), + } + dispatcher.start(); // 必须在所有 register() 之后调用 - info!("Worker dispatcher initialized (7 workers registered)"); + info!("Worker dispatcher initialized (8 workers registered)"); // 优雅停机令牌 — 取消后所有 SSE 流和长连接立即终止 let shutdown_token = CancellationToken::new(); diff --git a/crates/zclaw-saas/src/workers/distill_knowledge.rs b/crates/zclaw-saas/src/workers/distill_knowledge.rs new file mode 100644 index 0000000..956b318 --- /dev/null +++ b/crates/zclaw-saas/src/workers/distill_knowledge.rs @@ -0,0 +1,253 @@ +//! 知识蒸馏 Worker +//! +//! 通过 LLM API 直调生成行业知识条目。 +//! 问题来源:知识缺口 API + 行业关键词 + Self-Instruct +//! 质量过滤:L0 自动过滤(长度/关键词/隐私检测) +//! +//! 成本极低:DeepSeek V3 约 ¥0.001/条,120 条种子知识约 ¥0.5 + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use crate::error::SaasResult; +use super::Worker; + +#[derive(Debug, Serialize, Deserialize)] +pub struct DistillKnowledgeArgs { + /// 要蒸馏的问题列表 + pub questions: Vec, + /// 目标行业 ID(可选) + pub industry_id: Option, + /// 目标知识分类 ID + pub category_id: String, + /// Provider ID(如 "deepseek") + pub provider_id: String, + /// 模型 ID(如 "deepseek-chat") + pub model_id: String, +} + +pub struct DistillationWorker { + /// TOTP/API Key 加密密钥(用于解密 provider key) + enc_key_bytes: [u8; 32], +} + +impl DistillationWorker { + pub fn new(enc_key: [u8; 32]) -> Self { + Self { enc_key_bytes: enc_key } + } +} + +#[async_trait] +impl Worker for DistillationWorker { + type Args = DistillKnowledgeArgs; + + fn name(&self) -> &str { + "distill_knowledge" + } + + async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> { + tracing::info!( + "DistillKnowledge: starting {} questions for category '{}'", + args.questions.len(), + args.category_id, + ); + + // 1. 获取 provider 信息(base_url) + let provider: Option<(String,)> = sqlx::query_as( + "SELECT base_url FROM providers WHERE id = $1" + ) + .bind(&args.provider_id) + .fetch_optional(db) + .await?; + + let base_url = match provider { + Some((url,)) => url.trim_end_matches('/').to_string(), + None => { + tracing::error!("DistillKnowledge: provider '{}' not found", args.provider_id); + return Ok(()); + } + }; + + // 2. 获取可用 API Key + let selection = crate::relay::key_pool::select_best_key( + db, &args.provider_id, &self.enc_key_bytes, + ).await?; + + let api_key = selection.key.key_value.clone(); + let client = reqwest::Client::new(); + + // 3. 逐条蒸馏 + let mut success_count = 0u32; + let mut skip_count = 0u32; + + for question in &args.questions { + match distill_single(&client, &base_url, &api_key, &args.model_id, question).await { + Some(answer) => { + // L0 质量过滤 + if passes_l0_filter(&answer) { + // 入库 + match insert_distilled_item(db, &args, question, &answer).await { + Ok(()) => success_count += 1, + Err(e) => tracing::warn!("DistillKnowledge: insert failed: {}", e), + } + } else { + skip_count += 1; + tracing::debug!("DistillKnowledge: L0 filtered: {}", &question[..question.len().min(50)]); + } + } + None => { + tracing::warn!("DistillKnowledge: no answer for: {}", &question[..question.len().min(50)]); + } + } + } + + tracing::info!( + "DistillKnowledge: completed — {} success, {} filtered, {} total", + success_count, skip_count, args.questions.len(), + ); + + Ok(()) + } +} + +/// 调用 LLM API 获取单个回答 +async fn distill_single( + client: &reqwest::Client, + base_url: &str, + api_key: &str, + model: &str, + question: &str, +) -> Option { + let url = format!("{}/chat/completions", base_url); + + let body = serde_json::json!({ + "model": model, + "messages": [ + { + "role": "system", + "content": "你是行业知识工程师。请用中文简洁回答问题,回答要准确、实用、不超过500字。只提供事实性内容,不做猜测。" + }, + { + "role": "user", + "content": question + } + ], + "temperature": 0.3, + "max_tokens": 1000, + }); + + let response = client + .post(&url) + .header("Authorization", format!("Bearer {}", api_key)) + .header("Content-Type", "application/json") + .json(&body) + .timeout(std::time::Duration::from_secs(30)) + .send() + .await + .ok()?; + + if !response.status().is_success() { + tracing::warn!("DistillKnowledge: API error status: {}", response.status()); + return None; + } + + let json: serde_json::Value = response.json().await.ok()?; + + // 提取回答文本 + json.get("choices")? + .get(0)? + .get("message")? + .get("content")? + .as_str() + .map(|s| s.to_string()) +} + +/// L0 质量过滤:自动过滤低质量内容 +fn passes_l0_filter(content: &str) -> bool { + // 最短长度(至少 20 字符的有效回答) + if content.len() < 20 { + return false; + } + + // 最长限制(100KB 数据库限制,蒸馏内容应远小于此) + if content.len() > 50_000 { + return false; + } + + // 简单隐私检测:不应包含明显敏感信息模式 + let privacy_patterns = [ + "身份证号", "银行卡号", "密码是", "社保号", + ]; + for pattern in &privacy_patterns { + if content.contains(pattern) { + return false; + } + } + + true +} + +/// 将蒸馏结果插入知识库 +async fn insert_distilled_item( + db: &PgPool, + args: &DistillKnowledgeArgs, + question: &str, + answer: &str, +) -> SaasResult<()> { + let id = uuid::Uuid::new_v4().to_string(); + let title = if question.len() > 100 { + format!("{}...", &question[..97]) + } else { + question.to_string() + }; + + // 从回答中提取关键词 + let mut keywords = Vec::new(); + super::generate_embedding::extract_keywords_from_text(answer, &mut keywords); + // 也加入问题中的关键词 + super::generate_embedding::extract_keywords_from_text(question, &mut keywords); + keywords.truncate(30); + + // 构建完整内容 + let content = format!("## {}\n\n{}", question, answer); + + // 插入知识条目 + sqlx::query( + "INSERT INTO knowledge_items \ + (id, category_id, title, content, keywords, priority, status, source, tags, \ + visibility, account_id, created_by) \ + VALUES ($1, $2, $3, $4, $5, 0, 'active', 'distillation', '{}', \ + 'public', NULL, 'system')" + ) + .bind(&id) + .bind(&args.category_id) + .bind(&title) + .bind(&content) + .bind(&keywords) + .execute(db) + .await?; + + // 触发分块(复用 embedding worker 的分块逻辑) + // 注意:这里不用 worker dispatch(避免递归),直接分块 + let chunks = crate::knowledge::service::chunk_content(&content, 512, 64); + for (idx, chunk) in chunks.iter().enumerate() { + let chunk_id = uuid::Uuid::new_v4().to_string(); + let mut chunk_keywords = keywords.clone(); + super::generate_embedding::extract_keywords_from_text(chunk, &mut chunk_keywords); + chunk_keywords.truncate(50); + + sqlx::query( + "INSERT INTO knowledge_chunks (id, item_id, chunk_index, content, keywords, created_at) \ + VALUES ($1, $2, $3, $4, $5, NOW())" + ) + .bind(&chunk_id) + .bind(&id) + .bind(idx as i32) + .bind(chunk) + .bind(&chunk_keywords) + .execute(db) + .await?; + } + + Ok(()) +} diff --git a/crates/zclaw-saas/src/workers/generate_embedding.rs b/crates/zclaw-saas/src/workers/generate_embedding.rs index 1831cf3..9a3d5a7 100644 --- a/crates/zclaw-saas/src/workers/generate_embedding.rs +++ b/crates/zclaw-saas/src/workers/generate_embedding.rs @@ -78,7 +78,7 @@ impl Worker for GenerateEmbeddingWorker { let chunk_id = uuid::Uuid::new_v4().to_string(); let mut chunk_keywords = keywords.clone(); - extract_chunk_keywords(chunk, &mut chunk_keywords); + extract_keywords_from_text(chunk, &mut chunk_keywords); sqlx::query( "INSERT INTO knowledge_chunks (id, item_id, chunk_index, content, keywords, created_at) @@ -112,10 +112,8 @@ impl Worker for GenerateEmbeddingWorker { } } -/// 从 chunk 内容中提取高频中文词组作为补充关键词 -/// -/// 简单策略:提取 2-4 字的连续中文字符段,取出现频率 > 1 的 -fn extract_chunk_keywords(content: &str, keywords: &mut Vec) { +/// 从 chunk 内容中提取高频中文词组作为补充关键词(公开,供 distill_knowledge worker 复用) +pub fn extract_keywords_from_text(content: &str, keywords: &mut Vec) { let chars: Vec = content.chars().collect(); let mut i = 0; diff --git a/crates/zclaw-saas/src/workers/mod.rs b/crates/zclaw-saas/src/workers/mod.rs index b57a4ea..1bf8c2a 100644 --- a/crates/zclaw-saas/src/workers/mod.rs +++ b/crates/zclaw-saas/src/workers/mod.rs @@ -251,6 +251,7 @@ pub mod update_last_used; pub mod record_usage; pub mod aggregate_usage; pub mod generate_embedding; +pub mod distill_knowledge; // 便捷导出 pub use log_operation::LogOperationWorker; @@ -259,3 +260,4 @@ pub use cleanup_refresh_tokens::CleanupRefreshTokensWorker; pub use update_last_used::UpdateLastUsedWorker; pub use record_usage::RecordUsageWorker; pub use aggregate_usage::AggregateUsageWorker; +pub use distill_knowledge::DistillationWorker;