//! 知识库服务层 — CRUD、检索、分析 use sqlx::PgPool; use crate::error::SaasResult; use super::types::*; // === 分类管理 === /// 获取分类树(带条目计数) pub async fn list_categories_tree(pool: &PgPool) -> SaasResult> { let categories: Vec = sqlx::query_as( "SELECT * FROM knowledge_categories ORDER BY sort_order, name" ) .fetch_all(pool) .await?; // 获取每个分类的条目计数 let counts: Vec<(String, i64)> = sqlx::query_as( "SELECT category_id, COUNT(*) FROM knowledge_items WHERE status = 'active' GROUP BY category_id" ) .fetch_all(pool) .await?; let count_map: std::collections::HashMap = counts.into_iter().collect(); // 构建树形结构 let mut roots = Vec::new(); let mut all: Vec = categories.into_iter().map(|c| { let count = *count_map.get(&c.id).unwrap_or(&0); CategoryResponse { id: c.id, name: c.name, description: c.description, parent_id: c.parent_id, icon: c.icon, sort_order: c.sort_order, item_count: count, children: Vec::new(), created_at: c.created_at.to_rfc3339(), updated_at: c.updated_at.to_rfc3339(), } }).collect(); // 构建子节点映射 let mut children_map: std::collections::HashMap> = std::collections::HashMap::new(); for cat in all.drain(..) { if let Some(ref parent_id) = cat.parent_id { children_map.entry(parent_id.clone()).or_default().push(cat); } else { roots.push(cat); } } // 递归填充子节点 fn fill_children( cats: &mut Vec, children_map: &mut std::collections::HashMap>, ) { for cat in cats.iter_mut() { if let Some(children) = children_map.remove(&cat.id) { cat.children = children; fill_children(&mut cat.children, children_map); } // 累加子节点条目数到父节点 let child_count: i64 = cat.children.iter().map(|c| c.item_count).sum(); cat.item_count += child_count; } } fill_children(&mut roots, &mut children_map); Ok(roots) } /// 创建分类 pub async fn create_category( pool: &PgPool, name: &str, description: Option<&str>, parent_id: Option<&str>, icon: Option<&str>, ) -> SaasResult { // 验证 parent_id 存在性 if let Some(pid) = parent_id { let exists: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM knowledge_categories WHERE id = $1)" ) .bind(pid) .fetch_one(pool) .await?; if !exists { return Err(crate::error::SaasError::InvalidInput( format!("父分类 '{}' 不存在", pid), )); } } let id = uuid::Uuid::new_v4().to_string(); let category = sqlx::query_as::<_, KnowledgeCategory>( "INSERT INTO knowledge_categories (id, name, description, parent_id, icon) \ VALUES ($1, $2, $3, $4, $5) RETURNING *" ) .bind(&id) .bind(name) .bind(description) .bind(parent_id) .bind(icon) .fetch_one(pool) .await?; Ok(category) } /// 删除分类(有子分类或条目时拒绝) pub async fn delete_category(pool: &PgPool, category_id: &str) -> SaasResult<()> { // 检查子分类 let child_count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_categories WHERE parent_id = $1" ) .bind(category_id) .fetch_one(pool) .await?; if child_count.0 > 0 { return Err(crate::error::SaasError::InvalidInput( "该分类下有子分类,无法删除".into(), )); } // 检查条目 let item_count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_items WHERE category_id = $1" ) .bind(category_id) .fetch_one(pool) .await?; if item_count.0 > 0 { return Err(crate::error::SaasError::InvalidInput( "该分类下有知识条目,无法删除".into(), )); } let result = sqlx::query("DELETE FROM knowledge_categories WHERE id = $1") .bind(category_id) .execute(pool) .await?; if result.rows_affected() == 0 { return Err(crate::error::SaasError::NotFound("分类不存在".into())); } Ok(()) } /// 更新分类(含循环引用检测 + 深度限制) pub async fn update_category( pool: &PgPool, category_id: &str, name: Option<&str>, description: Option<&str>, parent_id: Option<&str>, icon: Option<&str>, ) -> SaasResult { if let Some(pid) = parent_id { if pid == category_id { return Err(crate::error::SaasError::InvalidInput( "分类不能成为自身的子分类".into(), )); } // 检查新的父级不是当前分类的后代(循环检测) let mut check_id = pid.to_string(); let mut depth = 0; loop { if check_id == category_id { return Err(crate::error::SaasError::InvalidInput( "循环引用:父级分类不能是当前分类的后代".into(), )); } let parent: Option<(Option,)> = sqlx::query_as( "SELECT parent_id FROM knowledge_categories WHERE id = $1" ) .bind(&check_id) .fetch_optional(pool) .await?; match parent { Some((Some(gp),)) => { check_id = gp; depth += 1; if depth > 10 { break; } } _ => break, } } // 检查深度限制(最多 3 层) let mut current_depth = 0; let mut check = pid.to_string(); while let Some((Some(p),)) = sqlx::query_as::<_, (Option,)>( "SELECT parent_id FROM knowledge_categories WHERE id = $1" ) .bind(&check) .fetch_optional(pool) .await? { check = p; current_depth += 1; if current_depth > 10 { break; } } if current_depth >= 3 { return Err(crate::error::SaasError::InvalidInput( "分类层级不能超过 3 层".into(), )); } } let category = sqlx::query_as::<_, KnowledgeCategory>( "UPDATE knowledge_categories SET \ name = COALESCE($1, name), \ description = COALESCE($2, description), \ parent_id = COALESCE($3, parent_id), \ icon = COALESCE($4, icon), \ updated_at = NOW() \ WHERE id = $5 RETURNING *" ) .bind(name) .bind(description) .bind(parent_id) .bind(icon) .bind(category_id) .fetch_optional(pool) .await? .ok_or_else(|| crate::error::SaasError::NotFound("分类不存在".into()))?; Ok(category) } // === 知识条目 CRUD === /// 按分类分页查询条目列表 pub async fn list_items_by_category( pool: &PgPool, category_id: &str, status_filter: &str, page: i64, page_size: i64, ) -> SaasResult<(Vec, i64)> { let offset = (page - 1) * page_size; let items: Vec = sqlx::query_as( "SELECT * FROM knowledge_items \ WHERE category_id = $1 AND status = $2 \ ORDER BY priority DESC, updated_at DESC \ LIMIT $3 OFFSET $4" ) .bind(category_id) .bind(status_filter) .bind(page_size) .bind(offset) .fetch_all(pool) .await?; let total: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_items WHERE category_id = $1 AND status = $2" ) .bind(category_id) .bind(status_filter) .fetch_one(pool) .await?; Ok((items, total.0)) } /// 创建知识条目 pub async fn create_item( pool: &PgPool, account_id: &str, req: &CreateItemRequest, is_admin: bool, ) -> SaasResult { let id = uuid::Uuid::new_v4().to_string(); let keywords = req.keywords.as_deref().unwrap_or(&[]); let related_questions = req.related_questions.as_deref().unwrap_or(&[]); let priority = req.priority.unwrap_or(0); let tags = req.tags.as_deref().unwrap_or(&[]); // visibility: Admin 默认 public,普通用户默认 private let visibility = req.visibility.as_deref().unwrap_or_else(|| { if is_admin { "public" } else { "private" } }); if !is_admin && visibility == "public" { return Err(crate::error::SaasError::InvalidInput( "普通用户只能创建私有知识条目".into(), )); } // 验证 category_id 存在性 let cat_exists: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM knowledge_categories WHERE id = $1)" ) .bind(&req.category_id) .fetch_one(pool) .await?; if !cat_exists { return Err(crate::error::SaasError::InvalidInput( format!("分类 '{}' 不存在", req.category_id), )); } // 使用事务保证 item + version 原子性 let mut tx = pool.begin().await?; let item_account_id: Option<&str> = if visibility == "public" { None } else { Some(account_id) }; let item = sqlx::query_as::<_, KnowledgeItem>( "INSERT INTO knowledge_items \ (id, category_id, title, content, keywords, related_questions, priority, tags, created_by, visibility, account_id) \ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ RETURNING *" ) .bind(&id) .bind(&req.category_id) .bind(&req.title) .bind(&req.content) .bind(keywords) .bind(related_questions) .bind(priority) .bind(tags) .bind(account_id) .bind(visibility) .bind(item_account_id) .fetch_one(&mut *tx) .await?; // 创建初始版本快照 let version_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO knowledge_versions \ (id, item_id, version, title, content, keywords, related_questions, created_by) \ VALUES ($1, $2, 1, $3, $4, $5, $6, $7)" ) .bind(&version_id) .bind(&id) .bind(&req.title) .bind(&req.content) .bind(keywords) .bind(related_questions) .bind(account_id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(item) } /// 获取条目详情 pub async fn get_item(pool: &PgPool, item_id: &str) -> SaasResult> { let item = sqlx::query_as::<_, KnowledgeItem>( "SELECT * FROM knowledge_items WHERE id = $1" ) .bind(item_id) .fetch_optional(pool) .await?; Ok(item) } /// 更新条目(含版本快照)— 事务保护防止并发竞态 pub async fn update_item( pool: &PgPool, item_id: &str, account_id: &str, req: &UpdateItemRequest, ) -> SaasResult { // status 验证在事务之前,避免无谓锁占用 const VALID_STATUSES: &[&str] = &["active", "draft", "archived", "deprecated"]; if let Some(ref status) = &req.status { if !VALID_STATUSES.contains(&status.as_str()) { return Err(crate::error::SaasError::InvalidInput( format!("无效的状态值: {},有效值: {}", status, VALID_STATUSES.join(", ")) )); } } let mut tx = pool.begin().await?; // 获取当前条目并锁定行防止并发修改 let current = sqlx::query_as::<_, KnowledgeItem>( "SELECT * FROM knowledge_items WHERE id = $1 FOR UPDATE" ) .bind(item_id) .fetch_optional(&mut *tx) .await? .ok_or_else(|| crate::error::SaasError::NotFound("知识条目不存在".into()))?; // 合并更新 let title = req.title.as_deref().unwrap_or(¤t.title); let content = req.content.as_deref().unwrap_or(¤t.content); let keywords: Vec = req.keywords.as_ref() .or(Some(¤t.keywords)) .unwrap_or(&vec![]) .clone(); let related_questions: Vec = req.related_questions.as_ref() .or(Some(¤t.related_questions)) .unwrap_or(&vec![]) .clone(); let priority = req.priority.unwrap_or(current.priority); let tags: Vec = req.tags.as_ref() .or(Some(¤t.tags)) .unwrap_or(&vec![]) .clone(); // 更新条目 let updated = sqlx::query_as::<_, KnowledgeItem>( "UPDATE knowledge_items SET \ title = $1, content = $2, keywords = $3, related_questions = $4, \ priority = $5, tags = $6, status = COALESCE($7, status), \ version = version + 1, updated_at = NOW() \ WHERE id = $8 RETURNING *" ) .bind(title) .bind(content) .bind(&keywords) .bind(&related_questions) .bind(priority) .bind(&tags) .bind(req.status.as_deref()) .bind(item_id) .fetch_one(&mut *tx) .await?; // 创建版本快照 let version_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO knowledge_versions \ (id, item_id, version, title, content, keywords, related_questions, \ change_summary, created_by) \ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" ) .bind(&version_id) .bind(item_id) .bind(updated.version) .bind(title) .bind(content) .bind(&keywords) .bind(&related_questions) .bind(req.change_summary.as_deref()) .bind(account_id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(updated) } /// 删除条目(级联删除 chunks + versions) pub async fn delete_item(pool: &PgPool, item_id: &str) -> SaasResult<()> { let result = sqlx::query("DELETE FROM knowledge_items WHERE id = $1") .bind(item_id) .execute(pool) .await?; if result.rows_affected() == 0 { return Err(crate::error::SaasError::NotFound("知识条目不存在".into())); } Ok(()) } // === 分块 === /// 将内容按 Markdown 标题 + 固定长度分块 pub fn chunk_content(content: &str, max_tokens: usize, overlap: usize) -> Vec { // 先按 Markdown 标题分段 let sections: Vec<&str> = content.split("\n# ").collect(); let mut chunks = Vec::new(); for (i, section) in sections.iter().enumerate() { // 第一个片段保留原始内容,其余片段重新添加标题标记 let section_content = if i == 0 { section.to_string() } else { format!("# {}", section) }; // 磁盘估算 token(中文约 1.5 字符/token) let estimated_tokens = section_content.len() / 2; if estimated_tokens <= max_tokens { if !section_content.trim().is_empty() { chunks.push(section_content.trim().to_string()); } } else { // 超长段落按固定长度切分 let chars: Vec = section_content.chars().collect(); let chunk_chars = max_tokens * 2; // 近似字符数 let overlap_chars = overlap * 2; let mut pos = 0; while pos < chars.len() { let end = (pos + chunk_chars).min(chars.len()); let chunk_str: String = chars[pos..end].iter().collect(); if !chunk_str.trim().is_empty() { chunks.push(chunk_str.trim().to_string()); } pos = if end >= chars.len() { end} else { end.saturating_sub(overlap_chars) }; } } } chunks} // === 搜索 === /// 语义搜索(向量 + 关键词混合) pub async fn search( pool: &PgPool, query: &str, category_id: Option<&str>, limit: i64, min_score: f64, ) -> SaasResult> { // 暂时使用关键词匹配(向量搜索需要 embedding 生成) let pattern = format!("%{}%", query.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_")); let results = if let Some(cat_id) = category_id { sqlx::query_as::<_, (String, String, String, String, String, Vec)>( "SELECT kc.id, kc.item_id, ki.title, kcat.name, kc.content, kc.keywords \ FROM knowledge_chunks kc \ JOIN knowledge_items ki ON kc.item_id = ki.id \ JOIN knowledge_categories kcat ON ki.category_id = kcat.id \ WHERE ki.status = 'active' \ AND ki.category_id = $1 \ AND (kc.content ILIKE $2 OR $3 = ANY(kc.keywords)) \ ORDER BY ki.priority DESC \ LIMIT $4" ) .bind(cat_id) .bind(&pattern) .bind(query) .bind(limit) .fetch_all(pool) .await? } else { sqlx::query_as::<_, (String, String, String, String, String, Vec)>( "SELECT kc.id, kc.item_id, ki.title, kcat.name, kc.content, kc.keywords \ FROM knowledge_chunks kc \ JOIN knowledge_items ki ON kc.item_id = ki.id \ JOIN knowledge_categories kcat ON ki.category_id = kcat.id \ WHERE ki.status = 'active' \ AND (kc.content ILIKE $1 OR $2 = ANY(kc.keywords)) \ ORDER BY ki.priority DESC \ LIMIT $3" ) .bind(&pattern) .bind(query) .bind(limit) .fetch_all(pool) .await? }; Ok(results.into_iter().map(|(chunk_id, item_id, title, cat_name, content, keywords)| { // 基于关键词匹配数计算分数:匹配数 / 总查询关键词数 let query_keywords: Vec<&str> = query.split_whitespace().collect(); let matched_count = keywords.iter() .filter(|k| query_keywords.iter().any(|qk| k.to_lowercase().contains(&qk.to_lowercase()))) .count(); let score = if keywords.is_empty() || query_keywords.is_empty() { 0.5 } else { (matched_count as f64 / keywords.len().max(query_keywords.len()) as f64).min(1.0) }; SearchResult { chunk_id, item_id, item_title: title, category_name: cat_name, content, score, keywords, } }).filter(|r| r.score >= min_score).collect()) } // === 统一搜索(双通道合并) === /// 统一搜索:同时检索文档通道和结构化通道 pub async fn unified_search( pool: &PgPool, request: &SearchRequest, viewer_account_id: Option<&str>, ) -> SaasResult { let limit = request.limit.unwrap_or(5).min(10); let search_docs = request.search_documents.unwrap_or(true); let search_struct = request.search_structured.unwrap_or(true); // 文档通道 let documents = if search_docs { search( pool, &request.query, request.category_id.as_deref(), limit, request.min_score.unwrap_or(0.5), ).await? } else { Vec::new() }; // 结构化通道 let structured = if search_struct { query_structured( pool, &StructuredQueryRequest { query: request.query.clone(), source_id: None, industry_id: request.industry_id.clone(), limit: Some(limit), }, viewer_account_id, ).await? } else { Vec::new() }; Ok(UnifiedSearchResult { documents, structured, }) } // === 种子知识冷启动 === /// 为指定行业插入种子知识(幂等) pub async fn seed_knowledge( pool: &PgPool, industry_id: &str, category_id: &str, items: &[(String, String, Vec)], // (title, content, keywords) system_account_id: &str, ) -> SaasResult { let mut created = 0; for (title, content, keywords) in items { if content.trim().is_empty() { continue; } // 幂等:按标题 + source='distillation' + tags 含行业ID 查重 let exists: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_items \ WHERE title = $1 AND source = 'distillation' \ AND $2 = ANY(tags)" ) .bind(title) .bind(format!("industry:{}", industry_id)) .fetch_one(pool) .await?; if exists.0 > 0 { continue; } let id = uuid::Uuid::new_v4().to_string(); let now = chrono::Utc::now(); let kw_json = serde_json::to_value(keywords).unwrap_or(serde_json::json!([])); let tags = vec![ format!("industry:{}", industry_id), "source:distillation".to_string(), ]; sqlx::query( "INSERT INTO knowledge_items \ (id, category_id, title, content, keywords, status, priority, visibility, account_id, source, tags, version, created_by, created_at, updated_at) \ VALUES ($1, $8, $2, $3, $4, 'active', 5, 'public', NULL, \ 'distillation', $5, 1, $6, $7, $7)" ) .bind(&id) .bind(title) .bind(content) .bind(&kw_json) .bind(&tags) .bind(system_account_id) .bind(&now) .bind(category_id) .execute(pool) .await?; created += 1; } Ok(created) } // === 分析 === /// 分析总览 pub async fn analytics_overview(pool: &PgPool) -> SaasResult { let total_items: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_items" ) .fetch_one(pool) .await?; let active_items: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_items WHERE status = 'active'" ) .fetch_one(pool) .await?; let total_categories: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_categories" ) .fetch_one(pool) .await?; let weekly_new: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_items WHERE created_at >= NOW() - interval '7 days'" ) .fetch_one(pool) .await?; let total_refs: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_usage" ) .fetch_one(pool) .await?; let injected: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_usage WHERE was_injected = true" ) .fetch_one(pool) .await?; let positive: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_usage WHERE agent_feedback = 'positive'" ) .fetch_one(pool) .await?; let with_feedback: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_usage WHERE agent_feedback IS NOT NULL" ) .fetch_one(pool) .await?; let stale: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM knowledge_items ki \ WHERE ki.status = 'active' \ AND NOT EXISTS (SELECT 1 FROM knowledge_usage ku WHERE ku.item_id = ki.id AND ku.created_at >= NOW() - interval '90 days')" ) .fetch_one(pool) .await?; let hit_rate = if total_refs.0 > 0 { with_feedback.0 as f64 / total_refs.0 as f64 } else { 0.0 }; let injection_rate = if total_refs.0 > 0 { injected.0 as f64 / total_refs.0 as f64 } else { 0.0 }; let positive_rate = if total_refs.0 > 0 { positive.0 as f64 / total_refs.0 as f64 } else { 0.0 }; Ok(AnalyticsOverview { total_items: total_items.0, active_items: active_items.0, total_categories: total_categories.0, weekly_new_items: weekly_new.0, total_references: total_refs.0, avg_reference_per_item: if total_items.0 > 0 { total_refs.0 as f64 / total_items.0 as f64 } else { 0.0 }, hit_rate, injection_rate, positive_feedback_rate: positive_rate, stale_items_count: stale.0, }) } /// 回滚到指定版本(创建新版本快照) pub async fn rollback_version( pool: &PgPool, item_id: &str, target_version: i32, account_id: &str, ) -> SaasResult { // 使用事务保证原子性,防止并发回滚冲突 let mut tx = pool.begin().await?; // 获取目标版本 let version: KnowledgeVersion = sqlx::query_as( "SELECT * FROM knowledge_versions WHERE item_id = $1 AND version = $2" ) .bind(item_id) .bind(target_version) .fetch_optional(&mut *tx) .await? .ok_or_else(|| crate::error::SaasError::NotFound("版本不存在".into()))?; // 锁定当前条目行防止并发修改(SELECT FOR UPDATE) let current: Option<(i32,)> = sqlx::query_as( "SELECT version FROM knowledge_items WHERE id = $1 FOR UPDATE" ) .bind(item_id) .fetch_optional(&mut *tx) .await?; let current_version = current .ok_or_else(|| crate::error::SaasError::NotFound("知识条目不存在".into()))? .0; // 防止版本无限递增: 最多 100 个版本 if current_version >= 100 { return Err(crate::error::SaasError::InvalidInput( "版本数已达上限(100),请考虑合并历史版本".into(), )); } let new_version = current_version + 1; // 更新条目为该版本内容 let updated = sqlx::query_as::<_, KnowledgeItem>( "UPDATE knowledge_items SET \ title = $1, content = $2, keywords = $3, related_questions = $4, \ version = $5, updated_at = NOW() \ WHERE id = $6 RETURNING *" ) .bind(&version.title) .bind(&version.content) .bind(&version.keywords) .bind(&version.related_questions) .bind(new_version) .bind(item_id) .fetch_one(&mut *tx) .await?; // 创建新版本快照(记录回滚来源) let version_id = uuid::Uuid::new_v4().to_string(); let summary = format!("回滚到版本 {}(当前版本 {} → 新版本 {})", target_version, current_version, new_version); sqlx::query( "INSERT INTO knowledge_versions \ (id, item_id, version, title, content, keywords, related_questions, \ change_summary, created_by) \ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" ) .bind(&version_id) .bind(item_id) .bind(new_version) .bind(&updated.title) .bind(&updated.content) .bind(&updated.keywords) .bind(&updated.related_questions) .bind(&summary) .bind(account_id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(updated) } /// 质量指标(按分类分组) pub async fn analytics_quality(pool: &PgPool) -> SaasResult { let quality: Vec<(serde_json::Value,)> = sqlx::query_as( "SELECT json_build_object( 'category', kc.name, 'total', COUNT(ki.id), 'active', COUNT(CASE WHEN ki.status = 'active' THEN 1 END), 'with_keywords', COUNT(CASE WHEN array_length(ki.keywords, 1) > 0 THEN 1 END), 'avg_priority', COALESCE(AVG(ki.priority), 0) ) as row \ FROM knowledge_categories kc \ LEFT JOIN knowledge_items ki ON ki.category_id = kc.id \ GROUP BY kc.id, kc.name \ ORDER BY COUNT(ki.id) DESC" ) .fetch_all(pool) .await .unwrap_or_else(|e| { tracing::warn!("analytics_quality query failed: {}", e); vec![] }); Ok(serde_json::json!({ "categories": quality.into_iter().map(|(v,)| v).collect::>() })) } /// 知识缺口检测(低分查询聚类) pub async fn analytics_gaps(pool: &PgPool) -> SaasResult { let gaps: Vec<(serde_json::Value,)> = sqlx::query_as( "SELECT json_build_object( 'query', ku.query_text, 'count', COUNT(*), 'avg_score', COALESCE(AVG(ku.relevance_score), 0) ) as row \ FROM knowledge_usage ku \ WHERE ku.created_at >= NOW() - interval '30 days' \ AND (ku.relevance_score IS NULL OR ku.relevance_score < 0.5) \ AND ku.query_text IS NOT NULL \ GROUP BY ku.query_text \ ORDER BY COUNT(*) DESC \ LIMIT 20" ) .fetch_all(pool) .await .unwrap_or_else(|e| { tracing::warn!("analytics_gaps query failed: {}", e); vec![] }); Ok(serde_json::json!({ "gaps": gaps.into_iter().map(|(v,)| v).collect::>() })) } // === 结构化数据源 CRUD === /// 创建结构化数据源 pub async fn create_structured_source( pool: &PgPool, account_id: &str, is_admin: bool, req: &CreateStructuredSourceRequest, ) -> SaasResult { let id = uuid::Uuid::new_v4().to_string(); let visibility = req.visibility.as_deref().unwrap_or_else(|| { if is_admin { "public" } else { "private" } }); let source_account_id: Option<&str> = if visibility == "public" { None } else { Some(account_id) }; let source = sqlx::query_as::<_, StructuredSource>( "INSERT INTO structured_sources \ (id, account_id, title, description, original_file_name, sheet_names, column_headers, \ visibility, industry_id, created_by) \ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ RETURNING *" ) .bind(&id) .bind(source_account_id) .bind(&req.title) .bind(&req.description) .bind(&req.original_file_name) .bind(req.sheet_names.as_deref().unwrap_or(&vec![])) .bind(req.column_headers.as_deref().unwrap_or(&vec![])) .bind(visibility) .bind(&req.industry_id) .bind(account_id) .fetch_one(pool) .await?; Ok(source) } /// 批量写入结构化数据行 pub async fn insert_structured_rows( pool: &PgPool, source_id: &str, rows: &[(Option, i32, Vec, serde_json::Value)], ) -> SaasResult { let mut tx = pool.begin().await?; let mut count: i64 = 0; for (sheet_name, row_index, headers, row_data) in rows { let row_id = uuid::Uuid::new_v4().to_string(); sqlx::query( "INSERT INTO structured_rows (id, source_id, sheet_name, row_index, headers, row_data) \ VALUES ($1, $2, $3, $4, $5, $6)" ) .bind(&row_id) .bind(source_id) .bind(sheet_name) .bind(*row_index) .bind(headers) .bind(row_data) .execute(&mut *tx) .await?; count += 1; } sqlx::query( "UPDATE structured_sources SET row_count = (SELECT COUNT(*) FROM structured_rows WHERE source_id = $1), \ updated_at = NOW() WHERE id = $1" ) .bind(source_id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(count) } /// 列出结构化数据源(分页,含可见性过滤) pub async fn list_structured_sources( pool: &PgPool, viewer_account_id: Option<&str>, industry_id: Option<&str>, status: Option<&str>, page: i64, page_size: i64, ) -> SaasResult<(Vec, i64)> { let offset = (page - 1) * page_size; let items: Vec = sqlx::query_as( "SELECT * FROM structured_sources \ WHERE (visibility = 'public' OR account_id = $1) \ AND ($2::text IS NULL OR industry_id = $2) \ AND ($3::text IS NULL OR status = $3) \ ORDER BY updated_at DESC \ LIMIT $4 OFFSET $5" ) .bind(viewer_account_id) .bind(industry_id) .bind(status) .bind(page_size) .bind(offset) .fetch_all(pool) .await?; let total: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM structured_sources \ WHERE (visibility = 'public' OR account_id = $1) \ AND ($2::text IS NULL OR industry_id = $2) \ AND ($3::text IS NULL OR status = $3)" ) .bind(viewer_account_id) .bind(industry_id) .bind(status) .fetch_one(pool) .await?; Ok((items, total.0)) } /// 获取结构化数据源详情 pub async fn get_structured_source( pool: &PgPool, source_id: &str, viewer_account_id: Option<&str>, ) -> SaasResult> { let source = sqlx::query_as::<_, StructuredSource>( "SELECT * FROM structured_sources WHERE id = $1 \ AND (visibility = 'public' OR account_id = $2)" ) .bind(source_id) .bind(viewer_account_id) .fetch_optional(pool) .await?; Ok(source) } /// 列出结构化数据源的行数据(分页) pub async fn list_structured_rows( pool: &PgPool, source_id: &str, viewer_account_id: Option<&str>, sheet_name: Option<&str>, page: i64, page_size: i64, ) -> SaasResult<(Vec, i64)> { let source = get_structured_source(pool, source_id, viewer_account_id).await?; if source.is_none() { return Err(crate::error::SaasError::NotFound("数据源不存在或无权限".into())); } let offset = (page - 1) * page_size; let rows: Vec = sqlx::query_as( "SELECT * FROM structured_rows \ WHERE source_id = $1 \ AND ($2::text IS NULL OR sheet_name = $2) \ ORDER BY row_index \ LIMIT $3 OFFSET $4" ) .bind(source_id) .bind(sheet_name) .bind(page_size) .bind(offset) .fetch_all(pool) .await?; let total: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM structured_rows \ WHERE source_id = $1 \ AND ($2::text IS NULL OR sheet_name = $2)" ) .bind(source_id) .bind(sheet_name) .fetch_one(pool) .await?; Ok((rows, total.0)) } /// 删除结构化数据源(级联删除行) pub async fn delete_structured_source(pool: &PgPool, source_id: &str) -> SaasResult<()> { let result = sqlx::query("DELETE FROM structured_sources WHERE id = $1") .bind(source_id) .execute(pool) .await?; if result.rows_affected() == 0 { return Err(crate::error::SaasError::NotFound("数据源不存在".into())); } Ok(()) } /// 安全的结构化查询(关键词匹配 + 可见性过滤) pub async fn query_structured( pool: &PgPool, request: &StructuredQueryRequest, viewer_account_id: Option<&str>, ) -> SaasResult> { let limit = request.limit.unwrap_or(20).min(50); let pattern = format!("%{}%", request.query.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_") ); let source_filter = if let Some(ref sid) = request.source_id { format!("AND ss.id = '{}'", sid.replace('\'', "''")) } else { String::new() }; let industry_filter = if let Some(ref iid) = request.industry_id { format!("AND ss.industry_id = '{}'", iid.replace('\'', "''")) } else { String::new() }; let rows: Vec<(String, String, Vec, serde_json::Value)> = sqlx::query_as( &format!( "SELECT sr.source_id, ss.title, sr.headers, sr.row_data \ FROM structured_rows sr \ JOIN structured_sources ss ON sr.source_id = ss.id \ WHERE (ss.visibility = 'public' OR ss.account_id = $1) \ AND ss.status = 'active' \ {} {} \ AND (sr.row_data::text ILIKE $2 \ OR array_to_string(sr.headers, ' ') ILIKE $2) \ ORDER BY ss.title, sr.row_index \ LIMIT {}", source_filter, industry_filter, limit ) ) .bind(viewer_account_id) .bind(&pattern) .fetch_all(pool) .await?; let mut results_map: std::collections::HashMap = std::collections::HashMap::new(); for (source_id, source_title, headers, row_data) in rows { let entry = results_map.entry(source_id.clone()) .or_insert_with(|| StructuredQueryResult { source_id: source_id.clone(), source_title: source_title.clone(), headers: headers.clone(), rows: Vec::new(), total_matched: 0, generated_sql: None, }); if let Ok(map) = serde_json::from_value::>(row_data) { entry.rows.push(map); } entry.total_matched += 1; } Ok(results_map.into_values().collect()) }