feat(saas): add knowledge base module — categories, items, versions, search, analytics

- 5 knowledge tables (categories, items, chunks, versions, usage) with pgvector + HNSW + GIN indexes
- 23+ API routes covering full CRUD, tree-structured categories, version snapshots
- Keyword-based search with ILIKE + array match (placeholder for vector search)
- Analytics endpoints: overview, trends, top-items, quality, gaps
- Markdown-aware content chunking with overlap strategy
- Worker dispatch for async embedding generation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
iven
2026-04-02 00:21:28 +08:00
parent b66087de0e
commit ef60f9a183
8 changed files with 1194 additions and 1 deletions

View File

@@ -0,0 +1,457 @@
//! 知识库服务层 — CRUD、检索、分析
use sqlx::PgPool;
use crate::error::SaasResult;
use super::types::*;
// === 分类管理 ===
/// 获取分类树(带条目计数)
pub async fn list_categories_tree(pool: &PgPool) -> SaasResult<Vec<CategoryResponse>> {
let categories: Vec<KnowledgeCategory> = sqlx::query_as(
"SELECT * FROM knowledge_categories ORDER BY sort_order, name"
)
.fetch_all(pool)
.await?;
// 获取每个分类的条目计数
let counts: Vec<(String, i64)> = sqlx::query_as(
"SELECT category_id, COUNT(*) FROM knowledge_items WHERE status = 'active' GROUP BY category_id"
)
.fetch_all(pool)
.await?;
let count_map: std::collections::HashMap<String, i64> = counts.into_iter().collect();
// 构建树形结构
let mut roots = Vec::new();
let mut all: Vec<CategoryResponse> = categories.into_iter().map(|c| {
let count = *count_map.get(&c.id).unwrap_or(&0);
CategoryResponse {
id: c.id,
name: c.name,
description: c.description,
parent_id: c.parent_id,
icon: c.icon,
sort_order: c.sort_order,
item_count: count,
children: Vec::new(),
created_at: c.created_at.to_rfc3339(),
updated_at: c.updated_at.to_rfc3339(),
}
}).collect();
// 构建子节点映射
let mut children_map: std::collections::HashMap<String, Vec<CategoryResponse>> =
std::collections::HashMap::new();
for cat in all.drain(..) {
if let Some(ref parent_id) = cat.parent_id {
children_map.entry(parent_id.clone()).or_default().push(cat);
} else {
roots.push(cat);
}
}
// 递归填充子节点
fn fill_children(
cats: &mut Vec<CategoryResponse>,
children_map: &mut std::collections::HashMap<String, Vec<CategoryResponse>>,
) {
for cat in cats.iter_mut() {
if let Some(children) = children_map.remove(&cat.id) {
cat.children = children;
fill_children(&mut cat.children, children_map);
}
// 累加子节点条目数到父节点
let child_count: i64 = cat.children.iter().map(|c| c.item_count).sum();
cat.item_count += child_count;
}
}
fill_children(&mut roots, &mut children_map);
Ok(roots)
}
/// 创建分类
pub async fn create_category(
pool: &PgPool,
name: &str,
description: Option<&str>,
parent_id: Option<&str>,
icon: Option<&str>,
) -> SaasResult<KnowledgeCategory> {
let id = uuid::Uuid::new_v4().to_string();
let category = sqlx::query_as::<_, KnowledgeCategory>(
"INSERT INTO knowledge_categories (id, name, description, parent_id, icon) \
VALUES ($1, $2, $3, $4, $5) RETURNING *"
)
.bind(&id)
.bind(name)
.bind(description)
.bind(parent_id)
.bind(icon)
.fetch_one(pool)
.await?;
Ok(category)
}
/// 删除分类(有子分类或条目时拒绝)
pub async fn delete_category(pool: &PgPool, category_id: &str) -> SaasResult<()> {
// 检查子分类
let child_count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_categories WHERE parent_id = $1"
)
.bind(category_id)
.fetch_one(pool)
.await?;
if child_count.0 > 0 {
return Err(crate::error::SaasError::InvalidInput(
"该分类下有子分类,无法删除".into(),
));
}
// 检查条目
let item_count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_items WHERE category_id = $1"
)
.bind(category_id)
.fetch_one(pool)
.await?;
if item_count.0 > 0 {
return Err(crate::error::SaasError::InvalidInput(
"该分类下有知识条目,无法删除".into(),
));
}
sqlx::query("DELETE FROM knowledge_categories WHERE id = $1")
.bind(category_id)
.execute(pool)
.await?;
Ok(())
}
// === 知识条目 CRUD ===
/// 创建知识条目
pub async fn create_item(
pool: &PgPool,
account_id: &str,
req: &CreateItemRequest,
) -> SaasResult<KnowledgeItem> {
let id = uuid::Uuid::new_v4().to_string();
let keywords = req.keywords.as_deref().unwrap_or(&[]);
let related_questions = req.related_questions.as_deref().unwrap_or(&[]);
let priority = req.priority.unwrap_or(0);
let tags = req.tags.as_deref().unwrap_or(&[]);
let item = sqlx::query_as::<_, KnowledgeItem>(
"INSERT INTO knowledge_items \
(id, category_id, title, content, keywords, related_questions, priority, tags, created_by) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
RETURNING *"
)
.bind(&id)
.bind(&req.category_id)
.bind(&req.title)
.bind(&req.content)
.bind(keywords)
.bind(related_questions)
.bind(priority)
.bind(tags)
.bind(account_id)
.fetch_one(pool)
.await?;
// 创建初始版本快照
let version_id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO knowledge_versions \
(id, item_id, version, title, content, keywords, related_questions, created_by) \
VALUES ($1, $2, 1, $3, $4, $5, $6, $7)"
)
.bind(&version_id)
.bind(&id)
.bind(&req.title)
.bind(&req.content)
.bind(keywords)
.bind(related_questions)
.bind(account_id)
.execute(pool)
.await?;
Ok(item)
}
/// 获取条目详情
pub async fn get_item(pool: &PgPool, item_id: &str) -> SaasResult<Option<KnowledgeItem>> {
let item = sqlx::query_as::<_, KnowledgeItem>(
"SELECT * FROM knowledge_items WHERE id = $1"
)
.bind(item_id)
.fetch_optional(pool)
.await?;
Ok(item)
}
/// 更新条目(含版本快照)
pub async fn update_item(
pool: &PgPool,
item_id: &str,
account_id: &str,
req: &UpdateItemRequest,
) -> SaasResult<KnowledgeItem> {
// 获取当前条目
let current = sqlx::query_as::<_, KnowledgeItem>(
"SELECT * FROM knowledge_items WHERE id = $1"
)
.bind(item_id)
.fetch_optional(pool)
.await?
.ok_or_else(|| crate::error::SaasError::NotFound("知识条目不存在".into()))?;
// 合并更新
let title = req.title.as_deref().unwrap_or(&current.title);
let content = req.content.as_deref().unwrap_or(&current.content);
let keywords: Vec<String> = req.keywords.as_ref()
.or(Some(&current.keywords))
.unwrap_or(&vec![])
.clone();
let related_questions: Vec<String> = req.related_questions.as_ref()
.or(Some(&current.related_questions))
.unwrap_or(&vec![])
.clone();
let priority = req.priority.unwrap_or(current.priority);
let tags: Vec<String> = req.tags.as_ref()
.or(Some(&current.tags))
.unwrap_or(&vec![])
.clone();
// 更新条目
let updated = sqlx::query_as::<_, KnowledgeItem>(
"UPDATE knowledge_items SET \
title = $1, content = $2, keywords = $3, related_questions = $4, \
priority = $5, tags = $6, status = COALESCE($7, status), \
version = version + 1, updated_at = NOW() \
WHERE id = $8 RETURNING *"
)
.bind(title)
.bind(content)
.bind(&keywords)
.bind(&related_questions)
.bind(priority)
.bind(&tags)
.bind(req.status.as_deref())
.bind(item_id)
.fetch_one(pool)
.await?;
// 创建版本快照
let version_id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO knowledge_versions \
(id, item_id, version, title, content, keywords, related_questions, \
change_summary, created_by) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"
)
.bind(&version_id)
.bind(item_id)
.bind(updated.version)
.bind(title)
.bind(content)
.bind(&keywords)
.bind(&related_questions)
.bind(req.change_summary.as_deref())
.bind(account_id)
.execute(pool)
.await?;
Ok(updated)
}
/// 删除条目(级联删除 chunks + versions
pub async fn delete_item(pool: &PgPool, item_id: &str) -> SaasResult<()> {
let result = sqlx::query("DELETE FROM knowledge_items WHERE id = $1")
.bind(item_id)
.execute(pool)
.await?;
if result.rows_affected() == 0 {
return Err(crate::error::SaasError::NotFound("知识条目不存在".into()));
}
Ok(())
}
// === 分块 ===
/// 将内容按 Markdown 标题 + 固定长度分块
pub fn chunk_content(content: &str, max_tokens: usize, overlap: usize) -> Vec<String> {
let mut chunks = Vec::new();
// 先按 Markdown 标题分段
let sections: Vec<&str> = content.split("\n# ").collect();
for section in sections {
// 简单估算 token中文约 1.5 字符/token
let estimated_tokens = section.len() / 2;
if estimated_tokens <= max_tokens {
if !section.trim().is_empty() {
chunks.push(section.trim().to_string());
}
} else {
// 超长段落按固定长度切分
let chars: Vec<char> = section.chars().collect();
let chunk_chars = max_tokens * 2; // 近似字符数
let overlap_chars = overlap * 2;
let mut pos = 0;
while pos < chars.len() {
let end = (pos + chunk_chars).min(chars.len());
let chunk: String = chars[pos..end].iter().collect();
if !chunk.trim().is_empty() {
chunks.push(chunk.trim().to_string());
}
pos = if end >= chars.len() { end } else { end.saturating_sub(overlap_chars) };
}
}
}
chunks
}
// === 搜索 ===
/// 语义搜索(向量 + 关键词混合)
pub async fn search(
pool: &PgPool,
query: &str,
category_id: Option<&str>,
limit: i64,
min_score: f64,
) -> SaasResult<Vec<SearchResult>> {
// 暂时使用关键词匹配(向量搜索需要 embedding 生成)
let pattern = format!("%{}%", query.replace('%', "\\%").replace('_', "\\_"));
let results = if let Some(cat_id) = category_id {
sqlx::query_as::<_, (String, String, String, String, String, Vec<String>)>(
"SELECT kc.id, kc.item_id, ki.title, kc.name as cat_name, kc.content, kc.keywords \
FROM knowledge_chunks kc \
JOIN knowledge_items ki ON kc.item_id = ki.id \
JOIN knowledge_categories kc2 ON ki.category_id = kc2.id \
WHERE ki.status = 'active' \
AND ki.category_id = $1 \
AND (kc.content ILIKE $2 OR $3 = ANY(kc.keywords)) \
ORDER BY ki.priority DESC \
LIMIT $4"
)
.bind(cat_id)
.bind(&pattern)
.bind(query)
.bind(limit)
.fetch_all(pool)
.await?
} else {
sqlx::query_as::<_, (String, String, String, String, String, Vec<String>)>(
"SELECT kc.id, kc.item_id, ki.title, kc2.name as cat_name, kc.content, kc.keywords \
FROM knowledge_chunks kc \
JOIN knowledge_items ki ON kc.item_id = ki.id \
JOIN knowledge_categories kc2 ON ki.category_id = kc2.id \
WHERE ki.status = 'active' \
AND (kc.content ILIKE $1 OR $2 = ANY(kc.keywords)) \
ORDER BY ki.priority DESC \
LIMIT $3"
)
.bind(&pattern)
.bind(query)
.bind(limit)
.fetch_all(pool)
.await?
};
Ok(results.into_iter().map(|(chunk_id, item_id, title, cat_name, content, keywords)| {
SearchResult {
chunk_id,
item_id,
item_title: title,
category_name: cat_name,
content,
score: 0.8, // 关键词匹配默认分数
keywords,
}
}).filter(|r| r.score >= min_score).collect())
}
// === 分析 ===
/// 分析总览
pub async fn analytics_overview(pool: &PgPool) -> SaasResult<AnalyticsOverview> {
let total_items: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_items"
)
.fetch_one(pool)
.await?;
let active_items: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_items WHERE status = 'active'"
)
.fetch_one(pool)
.await?;
let total_categories: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_categories"
)
.fetch_one(pool)
.await?;
let weekly_new: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_items WHERE created_at >= NOW() - interval '7 days'"
)
.fetch_one(pool)
.await?;
let total_refs: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_usage"
)
.fetch_one(pool)
.await?;
let injected: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_usage WHERE was_injected = true"
)
.fetch_one(pool)
.await?;
let positive: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_usage WHERE agent_feedback = 'positive'"
)
.fetch_one(pool)
.await?;
let stale: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM knowledge_items ki \
WHERE ki.status = 'active' \
AND NOT EXISTS (SELECT 1 FROM knowledge_usage ku WHERE ku.item_id = ki.id AND ku.created_at >= NOW() - interval '90 days')"
)
.fetch_one(pool)
.await?;
let hit_rate = if total_refs.0 > 0 { 1.0 } else { 0.0 };
let injection_rate = if total_refs.0 > 0 { injected.0 as f64 / total_refs.0 as f64 } else { 0.0 };
let positive_rate = if total_refs.0 > 0 { positive.0 as f64 / total_refs.0 as f64 } else { 0.0 };
Ok(AnalyticsOverview {
total_items: total_items.0,
active_items: active_items.0,
total_categories: total_categories.0,
weekly_new_items: weekly_new.0,
total_references: total_refs.0,
avg_reference_per_item: if total_items.0 > 0 { total_refs.0 as f64 / total_items.0 as f64 } else { 0.0 },
hit_rate,
injection_rate,
positive_feedback_rate: positive_rate,
stale_items_count: stale.0,
})
}