Files
zclaw_openfang/crates/zclaw-saas/src/knowledge/handlers.rs
iven d40c4605b2 fix(knowledge): verification audit — 3 medium issues
- create_item: wrap item + version INSERT in transaction for atomicity
- update_item handler: validate content length (100KB) before DB hit
- KnowledgeChunk: document missing embedding field, safe per explicit SELECT usage
2026-04-02 19:16:32 +08:00

592 lines
19 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! 知识库 HTTP 处理器
use axum::{
extract::{Extension, Path, Query, State},
Json,
};
use crate::auth::types::AuthContext;
use crate::error::{SaasError, SaasResult};
use crate::state::AppState;
use super::service;
use super::types::*;
// === 分类管理 ===
/// `GET /api/v1/knowledge/categories` — category list in tree form.
///
/// Requires the `knowledge:read` permission; the tree is produced by
/// `service::list_categories_tree`.
pub async fn list_categories(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<Vec<CategoryResponse>>> {
    check_permission(&ctx, "knowledge:read")?;

    Ok(Json(service::list_categories_tree(&state.db).await?))
}
/// `POST /api/v1/knowledge/categories` — create a category.
///
/// Requires `knowledge:write`. The name is trimmed before it is stored and
/// must not be blank, otherwise `SaasError::InvalidInput` is returned.
/// Responds with the new category's `id` and `name`.
pub async fn create_category(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Json(req): Json<CreateCategoryRequest>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:write")?;

    // Trim once and reuse the same slice for validation and creation.
    let name = req.name.trim();
    if name.is_empty() {
        return Err(SaasError::InvalidInput("分类名称不能为空".into()));
    }

    let category = service::create_category(
        &state.db,
        name,
        req.description.as_deref(),
        req.parent_id.as_deref(),
        req.icon.as_deref(),
    )
    .await?;

    let body = serde_json::json!({
        "id": category.id,
        "name": category.name,
    });
    Ok(Json(body))
}
/// PUT /api/v1/knowledge/categories/:id — 更新分类
pub async fn update_category(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Path(id): Path<String>,
Json(req): Json<UpdateCategoryRequest>,
) -> SaasResult<Json<serde_json::Value>> {
check_permission(&ctx, "knowledge:write")?;
if let Some(ref name) = req.name {
if name.trim().is_empty() {
return Err(SaasError::InvalidInput("分类名称不能为空".into()));
}
}
let cat = service::update_category(
&state.db,
&id,
req.name.as_deref().map(|n| n.trim()),
req.description.as_deref(),
req.parent_id.as_deref(),
req.icon.as_deref(),
).await?;
Ok(Json(serde_json::json!({
"id": cat.id,
"name": cat.name,
"updated": true,
})))
}
/// `DELETE /api/v1/knowledge/categories/:id` — delete a category.
///
/// Requires `knowledge:admin`. Responds with `{"deleted": true}` once
/// `service::delete_category` succeeds.
pub async fn delete_category(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Path(id): Path<String>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:admin")?;

    service::delete_category(&state.db, &id).await?;

    let body = serde_json::json!({"deleted": true});
    Ok(Json(body))
}
/// `GET /api/v1/knowledge/categories/:id/items` — items under one category.
///
/// Requires `knowledge:read`. `page` defaults to 1 (minimum 1), `page_size`
/// defaults to 20 and is limited to `1..=100`, and `status` defaults to
/// `"active"`. Responds with `items`, `total`, `page` and `page_size`.
pub async fn list_category_items(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Path(id): Path<String>,
    Query(query): Query<ListItemsQuery>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:read")?;

    // NOTE(review): unlike `list_items`, `page` has no upper bound here —
    // confirm the service computes the offset without overflowing.
    let page = query.page.unwrap_or(1).max(1);
    let page_size = query.page_size.unwrap_or(20).clamp(1, 100);
    let status = query.status.as_deref().unwrap_or("active");

    let (items, total) =
        service::list_items_by_category(&state.db, &id, status, page, page_size).await?;

    let body = serde_json::json!({
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
    });
    Ok(Json(body))
}
// === 知识条目 CRUD ===
/// `GET /api/v1/knowledge/items` — paginated item listing.
///
/// Requires `knowledge:read`. Optional query-string filters: `category_id`,
/// `status`, and `keyword` (case-insensitive substring match on the title).
/// `page` is limited to `1..=10000` and `page_size` to `1..=100`
/// (defaults 1 and 20).
///
/// Responds with `items`, `total`, `page` and `page_size`. The page query
/// and the count query share the same join and filters so that `total`
/// always describes the row set that `items` is paged from.
pub async fn list_items(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Query(query): Query<ListItemsQuery>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:read")?;

    let page = query.page.unwrap_or(1).max(1).min(10000);
    let page_size = query.page_size.unwrap_or(20).max(1).min(100);
    let offset = (page - 1) * page_size;

    // Escape the ILIKE wildcards (and the escape character itself) so that a
    // user-supplied `%` or `_` is matched literally.
    let keyword = query.keyword.as_ref().map(|k| {
        k.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_")
    });

    let items: Vec<KnowledgeItem> = sqlx::query_as(
        "SELECT ki.* FROM knowledge_items ki \
         JOIN knowledge_categories kc ON ki.category_id = kc.id \
         WHERE ($1::text IS NULL OR ki.category_id = $1) \
         AND ($2::text IS NULL OR ki.status = $2) \
         AND ($3::text IS NULL OR ki.title ILIKE '%' || $3 || '%') \
         ORDER BY ki.priority DESC, ki.updated_at DESC \
         LIMIT $4 OFFSET $5"
    )
    .bind(&query.category_id)
    .bind(&query.status)
    .bind(&keyword)
    .bind(page_size)
    .bind(offset)
    .fetch_all(&state.db)
    .await?;

    // Same inner join as the page query: without it, items whose category
    // row is missing would be counted in `total` but never appear in `items`.
    let total: (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM knowledge_items ki \
         JOIN knowledge_categories kc ON ki.category_id = kc.id \
         WHERE ($1::text IS NULL OR ki.category_id = $1) \
         AND ($2::text IS NULL OR ki.status = $2) \
         AND ($3::text IS NULL OR ki.title ILIKE '%' || $3 || '%')"
    )
    .bind(&query.category_id)
    .bind(&query.status)
    .bind(&keyword)
    .fetch_one(&state.db)
    .await?;

    Ok(Json(serde_json::json!({
        "items": items,
        "total": total.0,
        "page": page,
        "page_size": page_size,
    })))
}
/// `POST /api/v1/knowledge/items` — create a knowledge item.
///
/// Requires `knowledge:write`. Title and content must be non-blank and the
/// content may be at most 100,000 bytes; violations yield
/// `SaasError::InvalidInput`. After the item is stored, an embedding job is
/// dispatched; a dispatch failure is only logged and does not fail the
/// request. Responds with `id`, `title` and `version`.
pub async fn create_item(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Json(req): Json<CreateItemRequest>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:write")?;

    let title_blank = req.title.trim().is_empty();
    let content_blank = req.content.trim().is_empty();
    if title_blank || content_blank {
        return Err(SaasError::InvalidInput("标题和内容不能为空".into()));
    }
    if req.content.len() > 100_000 {
        return Err(SaasError::InvalidInput("内容不能超过 100KB".into()));
    }

    let item = service::create_item(&state.db, &ctx.account_id, &req).await?;

    // Kick off embedding generation; failure to enqueue is logged, not fatal.
    let payload = serde_json::json!({ "item_id": item.id });
    if let Err(err) = state
        .worker_dispatcher
        .dispatch("generate_embedding", payload)
        .await
    {
        tracing::warn!("Failed to dispatch embedding generation: {}", err);
    }

    let body = serde_json::json!({
        "id": item.id,
        "title": item.title,
        "version": item.version,
    });
    Ok(Json(body))
}
/// `POST /api/v1/knowledge/items/batch` — create up to 50 items in one call.
///
/// Requires `knowledge:write`. More than 50 entries is rejected with
/// `SaasError::InvalidInput`. Individual entries that are blank, exceed
/// 100,000 bytes of content, or fail to be created are skipped with a
/// warning rather than failing the whole request. Responds with
/// `created_count` and the list of created `ids`.
pub async fn batch_create_items(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Json(items): Json<Vec<CreateItemRequest>>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:write")?;

    if items.len() > 50 {
        return Err(SaasError::InvalidInput("单次批量创建不能超过 50 条".into()));
    }

    let mut created_ids = Vec::new();
    for entry in items.iter() {
        let is_blank = entry.title.trim().is_empty() || entry.content.trim().is_empty();
        if is_blank {
            tracing::warn!("Batch create: skipping item with empty title or content");
            continue;
        }
        if entry.content.len() > 100_000 {
            tracing::warn!("Batch create: skipping item '{}' (content too long)", entry.title);
            continue;
        }

        let item = match service::create_item(&state.db, &ctx.account_id, entry).await {
            Ok(item) => item,
            Err(err) => {
                tracing::warn!("Batch create item failed: {}", err);
                continue;
            }
        };

        // Enqueue embedding generation; a failed dispatch is logged only.
        let payload = serde_json::json!({ "item_id": item.id });
        if let Err(err) = state
            .worker_dispatcher
            .dispatch("generate_embedding", payload)
            .await
        {
            tracing::warn!("[Knowledge] Failed to dispatch embedding for item {}: {}", item.id, err);
        }
        created_ids.push(item.id);
    }

    let body = serde_json::json!({
        "created_count": created_ids.len(),
        "ids": created_ids,
    });
    Ok(Json(body))
}
/// `GET /api/v1/knowledge/items/:id` — item detail.
///
/// Requires `knowledge:read`. Returns `SaasError::NotFound` when the service
/// finds no item for `id`; otherwise responds with the item serialized as
/// JSON.
pub async fn get_item(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Path(id): Path<String>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:read")?;

    let item = match service::get_item(&state.db, &id).await? {
        Some(found) => found,
        None => return Err(SaasError::NotFound("知识条目不存在".into())),
    };

    // A serialization failure degrades to the default `Value` (JSON null).
    let body = serde_json::to_value(item).unwrap_or_default();
    Ok(Json(body))
}
/// PUT /api/v1/knowledge/items/:id — 更新条目
pub async fn update_item(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
Path(id): Path<String>,
Json(req): Json<UpdateItemRequest>,
) -> SaasResult<Json<serde_json::Value>> {
check_permission(&ctx, "knowledge:write")?;
if let Some(ref content) = req.content {
if content.len() > 100_000 {
return Err(SaasError::InvalidInput("内容不能超过 100KB".into()));
}
}
let updated = service::update_item(&state.db, &id, &ctx.account_id, &req).await?;
// 触发 re-embedding
if let Err(e) = state.worker_dispatcher.dispatch(
"generate_embedding",
serde_json::json!({ "item_id": id }),
).await {
tracing::warn!("[Knowledge] Failed to dispatch re-embedding for item {}: {}", id, e);
}
Ok(Json(serde_json::json!({
"id": updated.id,
"version": updated.version,
})))
}
/// `DELETE /api/v1/knowledge/items/:id` — delete a knowledge item.
///
/// Requires `knowledge:admin`. Responds with `{"deleted": true}` once
/// `service::delete_item` succeeds.
pub async fn delete_item(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Path(id): Path<String>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:admin")?;

    service::delete_item(&state.db, &id).await?;

    let body = serde_json::json!({"deleted": true});
    Ok(Json(body))
}
// === 版本控制 ===
/// `GET /api/v1/knowledge/items/:id/versions` — version history of an item.
///
/// Requires `knowledge:read`. Responds with `{"versions": [...]}`, newest
/// version first.
pub async fn list_versions(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Path(id): Path<String>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:read")?;

    let history = sqlx::query_as::<_, KnowledgeVersion>(
        "SELECT * FROM knowledge_versions WHERE item_id = $1 ORDER BY version DESC"
    )
    .bind(&id)
    .fetch_all(&state.db)
    .await?;

    let body = serde_json::json!({"versions": history});
    Ok(Json(body))
}
/// `GET /api/v1/knowledge/items/:id/versions/:v` — one specific version.
///
/// Requires `knowledge:read`. Returns `SaasError::NotFound` when no row
/// matches the item id and version number; otherwise responds with the
/// version serialized as JSON.
pub async fn get_version(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Path((id, v)): Path<(String, i32)>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:read")?;

    let row = sqlx::query_as::<_, KnowledgeVersion>(
        "SELECT * FROM knowledge_versions WHERE item_id = $1 AND version = $2"
    )
    .bind(&id)
    .bind(v)
    .fetch_optional(&state.db)
    .await?;

    let version = match row {
        Some(found) => found,
        None => return Err(SaasError::NotFound("版本不存在".into())),
    };

    // A serialization failure degrades to the default `Value` (JSON null).
    let body = serde_json::to_value(version).unwrap_or_default();
    Ok(Json(body))
}
/// `POST /api/v1/knowledge/items/:id/rollback/:v` — roll an item back to
/// version `v`.
///
/// Requires `knowledge:admin`. After the rollback a re-embedding job is
/// dispatched; a dispatch failure is only logged. Responds with the item's
/// `id`, its resulting `version`, and `rolled_back_to`.
pub async fn rollback_version(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Path((id, v)): Path<(String, i32)>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:admin")?;

    let restored = service::rollback_version(&state.db, &id, v, &ctx.account_id).await?;

    // Content changed, so the embedding has to be regenerated.
    let payload = serde_json::json!({ "item_id": id });
    if let Err(err) = state
        .worker_dispatcher
        .dispatch("generate_embedding", payload)
        .await
    {
        tracing::warn!("[Knowledge] Failed to dispatch re-embedding after rollback for item {}: {}", id, err);
    }

    let body = serde_json::json!({
        "id": restored.id,
        "version": restored.version,
        "rolled_back_to": v,
    });
    Ok(Json(body))
}
// === 检索 ===
/// `POST /api/v1/knowledge/search` — semantic search.
///
/// Requires `knowledge:search`. `limit` defaults to 5 and is clamped to
/// `1..=10`; `min_score` defaults to 0.5. The optional `category_id`
/// restricts the search. Responds with the list of search results returned
/// by `service::search`.
pub async fn search(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Json(req): Json<SearchRequest>,
) -> SaasResult<Json<Vec<SearchResult>>> {
    check_permission(&ctx, "knowledge:search")?;

    // Bound the limit on both sides (the page-size handling elsewhere in this
    // file does the same) so a zero or negative value from the client never
    // reaches the service layer.
    let limit = req.limit.unwrap_or(5).clamp(1, 10);
    let min_score = req.min_score.unwrap_or(0.5);

    let results = service::search(
        &state.db,
        &req.query,
        req.category_id.as_deref(),
        limit,
        min_score,
    )
    .await?;

    Ok(Json(results))
}
/// `POST /api/v1/knowledge/recommend` — related-item recommendations.
///
/// Requires `knowledge:search`. Runs the same `service::search` as the
/// search endpoint but with a fixed, lower score threshold of 0.3. `limit`
/// defaults to 5 and is clamped to `1..=10`.
pub async fn recommend(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Json(req): Json<SearchRequest>,
) -> SaasResult<Json<Vec<SearchResult>>> {
    check_permission(&ctx, "knowledge:search")?;

    // Bound the limit on both sides (the page-size handling elsewhere in this
    // file does the same) so a zero or negative value from the client never
    // reaches the service layer.
    let limit = req.limit.unwrap_or(5).clamp(1, 10);

    let results = service::search(
        &state.db,
        &req.query,
        req.category_id.as_deref(),
        limit,
        0.3,
    )
    .await?;

    Ok(Json(results))
}
// === 分析看板 ===
/// `GET /api/v1/knowledge/analytics/overview` — dashboard overview figures.
///
/// Requires `knowledge:read`; the data comes from
/// `service::analytics_overview`.
pub async fn analytics_overview(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<AnalyticsOverview>> {
    check_permission(&ctx, "knowledge:read")?;

    Ok(Json(service::analytics_overview(&state.db).await?))
}
/// GET /api/v1/knowledge/analytics/trends
pub async fn analytics_trends(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
check_permission(&ctx, "knowledge:read")?;
// 使用 serde_json::Value 行来避免 PgRow 序列化
let trends: Vec<(serde_json::Value,)> = sqlx::query_as(
"SELECT json_build_object(
'date', DATE(created_at),
'count', COUNT(*),
'injected_count', SUM(CASE WHEN was_injected THEN 1 ELSE 0 END)
) as row \
FROM knowledge_usage \
WHERE created_at >= NOW() - interval '30 days' \
GROUP BY DATE(created_at) ORDER BY DATE(created_at)"
)
.fetch_all(&state.db)
.await
.unwrap_or_default();
let trends: Vec<serde_json::Value> = trends.into_iter().map(|(v,)| v).collect();
Ok(Json(serde_json::json!({"trends": trends})))
}
/// GET /api/v1/knowledge/analytics/top-items
pub async fn analytics_top_items(
State(state): State<AppState>,
Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
check_permission(&ctx, "knowledge:read")?;
let items: Vec<(serde_json::Value,)> = sqlx::query_as(
"SELECT json_build_object(
'id', ki.id,
'title', ki.title,
'category', kc.name,
'ref_count', COUNT(ku.id)
) as row \
FROM knowledge_items ki \
JOIN knowledge_categories kc ON ki.category_id = kc.id \
LEFT JOIN knowledge_usage ku ON ku.item_id = ki.id \
WHERE ki.status = 'active' \
GROUP BY ki.id, ki.title, kc.name \
ORDER BY COUNT(ku.id) DESC LIMIT 20"
)
.fetch_all(&state.db)
.await
.unwrap_or_default();
let items: Vec<serde_json::Value> = items.into_iter().map(|(v,)| v).collect();
Ok(Json(serde_json::json!({"items": items})))
}
/// `GET /api/v1/knowledge/analytics/quality` — quality metrics.
///
/// Requires `knowledge:read`; the JSON payload is produced by
/// `service::analytics_quality`.
pub async fn analytics_quality(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:read")?;

    Ok(Json(service::analytics_quality(&state.db).await?))
}
/// `GET /api/v1/knowledge/analytics/gaps` — knowledge-gap report.
///
/// Requires `knowledge:read`; the JSON payload is produced by
/// `service::analytics_gaps`.
pub async fn analytics_gaps(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:read")?;

    Ok(Json(service::analytics_gaps(&state.db).await?))
}
// === 批量操作 ===
/// `PATCH /api/v1/knowledge/categories/reorder` — bulk update of sort order.
///
/// Requires `knowledge:write`. An empty list is a no-op answered with
/// `{"reordered": false, "count": 0}`; more than 100 entries is rejected
/// with `SaasError::InvalidInput`. All updates run inside one transaction
/// that is committed only after every row has been written.
pub async fn reorder_categories(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Json(items): Json<Vec<ReorderItem>>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:write")?;

    let count = items.len();
    if count == 0 {
        return Ok(Json(serde_json::json!({"reordered": false, "count": 0})));
    }
    if count > 100 {
        return Err(SaasError::InvalidInput("单次排序不能超过 100 个".into()));
    }

    // One transaction for the whole batch keeps the reorder atomic.
    let mut tx = state.db.begin().await?;
    for entry in items.iter() {
        sqlx::query("UPDATE knowledge_categories SET sort_order = $1, updated_at = NOW() WHERE id = $2")
            .bind(entry.sort_order)
            .bind(&entry.id)
            .execute(&mut *tx)
            .await?;
    }
    tx.commit().await?;

    let body = serde_json::json!({"reordered": true, "count": count});
    Ok(Json(body))
}
/// `POST /api/v1/knowledge/items/import` — import Markdown files as items.
///
/// Requires `knowledge:write`. At most 20 files per request
/// (`SaasError::InvalidInput` otherwise). Files whose content exceeds
/// 100,000 bytes or is blank are skipped with a warning, as are files whose
/// creation fails. Each remaining file becomes one item in
/// `req.category_id`; its title is chosen by `derive_import_title`, so an
/// imported item never ends up with a blank title. An embedding job is
/// dispatched per created item (dispatch failures are only logged).
///
/// Responds with `created_count` and the list of created `ids`.
pub async fn import_items(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    Json(req): Json<ImportRequest>,
) -> SaasResult<Json<serde_json::Value>> {
    check_permission(&ctx, "knowledge:write")?;

    if req.files.len() > 20 {
        return Err(SaasError::InvalidInput("单次导入不能超过 20 个文件".into()));
    }

    let mut created = Vec::new();
    for file in &req.files {
        // Name used in log messages when the file carries no title.
        let label = file.title.as_deref().unwrap_or("未命名");

        // Content length check (100KB limit).
        if file.content.len() > 100_000 {
            tracing::warn!("跳过文件 '{}': 内容超长 ({} bytes)", label, file.content.len());
            continue;
        }
        // Blank content check.
        if file.content.trim().is_empty() {
            tracing::warn!("跳过空文件: '{}'", label);
            continue;
        }

        let title = derive_import_title(file.title.as_deref(), &file.content, created.len() + 1);

        let item_req = CreateItemRequest {
            category_id: req.category_id.clone(),
            title,
            content: file.content.clone(),
            keywords: file.keywords.clone(),
            related_questions: None,
            priority: None,
            tags: file.tags.clone(),
        };

        match service::create_item(&state.db, &ctx.account_id, &item_req).await {
            Ok(item) => {
                // Enqueue embedding generation; a failed dispatch is logged only.
                if let Err(e) = state.worker_dispatcher.dispatch(
                    "generate_embedding",
                    serde_json::json!({ "item_id": item.id }),
                ).await {
                    tracing::warn!("[Knowledge] Failed to dispatch embedding for item {}: {}", item.id, e);
                }
                created.push(item.id);
            }
            Err(e) => {
                tracing::warn!("Import item '{}' failed: {}", item_req.title, e);
            }
        }
    }

    Ok(Json(serde_json::json!({
        "created_count": created.len(),
        "ids": created,
    })))
}

/// Chooses the title for an imported file.
///
/// Order of preference:
/// 1. the explicit title, when it is present and not blank;
/// 2. the first line of `content` that still has text after surrounding
///    whitespace and leading `#` heading markers are removed;
/// 3. the placeholder `导入条目 {ordinal}`.
fn derive_import_title(explicit: Option<&str>, content: &str, ordinal: usize) -> String {
    if let Some(title) = explicit.filter(|t| !t.trim().is_empty()) {
        return title.to_string();
    }

    content
        .lines()
        .map(|line| line.trim().trim_start_matches('#').trim())
        .find(|line| !line.is_empty())
        .map(str::to_string)
        .unwrap_or_else(|| format!("导入条目 {}", ordinal))
}
// === 辅助函数 ===
/// Module-local shorthand for the permission check.
///
/// Delegates unchanged to `crate::auth::handlers::check_permission`, so the
/// handlers in this file can call `check_permission` unqualified. Returns
/// whatever that function returns for the given `ctx` and `permission`.
fn check_permission(ctx: &AuthContext, permission: &str) -> SaasResult<()> {
crate::auth::handlers::check_permission(ctx, permission)
}