feat(knowledge): Phase B+C 文档提取器 + multipart 文件上传
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

- PDF 提取 (pdf-extract) + DOCX 提取 (zip+quick-xml) + Excel 解析 (calamine)
- 统一格式路由 detect_format() → RAG 通道或结构化通道
- POST /api/v1/knowledge/upload multipart 文件上传
- PDF/DOCX/Markdown → RAG 管线,Excel → structured_rows JSONB
- 结构化数据源 CRUD API (GET/DELETE /api/v1/structured/sources)
- POST /api/v1/structured/query JSONB 关键词查询
- 修复 industry/service.rs SaasError::Database 类型不匹配
This commit is contained in:
iven
2026-04-12 19:25:24 +08:00
parent 4800f89467
commit 60062a8097
7 changed files with 849 additions and 8 deletions

View File

@@ -1,7 +1,7 @@
//! 知识库 HTTP 处理器
use axum::{
extract::{Extension, Path, Query, State},
extract::{Extension, Multipart, Path, Query, State},
Json,
};
@@ -10,6 +10,7 @@ use crate::error::{SaasError, SaasResult};
use crate::state::AppState;
use super::service;
use super::types::*;
use super::extractors;
// === 分类管理 ===
@@ -685,3 +686,202 @@ pub async fn query_structured(
let results = service::query_structured(&state.db, &req, Some(&ctx.account_id)).await?;
Ok(Json(results))
}
// === 文件上传 ===
/// POST /api/v1/knowledge/upload — multipart file upload.
///
/// Every multipart field is treated as one uploaded file (a field without a
/// file name is reported under the name `"unknown"`). The file is routed by
/// the format `extractors::detect_format` reports for its name: formats for
/// which `is_structured()` is true go through `handle_structured_upload`,
/// all others through `handle_document_upload`.
///
/// Per-file failures (over the size cap, unsupported format, or an error
/// returned by one of the two handlers) do not abort the request; each is
/// reported as an entry with `"status": "error"` in the `results` array.
/// The response has the shape `{ "results": [...], "count": <entries> }`.
///
/// # Errors
///
/// Returns the error from `check_permission` when the caller lacks
/// `knowledge:write`, or `SaasError::InvalidInput` when the multipart stream
/// cannot be parsed or a field cannot be read.
pub async fn upload_file(
    State(state): State<AppState>,
    Extension(ctx): Extension<AuthContext>,
    mut multipart: Multipart,
) -> SaasResult<Json<serde_json::Value>> {
    // Per-file size cap: 20 MB.
    const MAX_UPLOAD_BYTES: usize = 20 * 1024 * 1024;

    // Builds the per-file error entry placed in the `results` array.
    fn failure(file: &str, reason: &str) -> serde_json::Value {
        serde_json::json!({
            "file": file,
            "status": "error",
            "error": reason
        })
    }

    check_permission(&ctx, "knowledge:write")?;
    let is_admin = ctx.role == "admin" || ctx.role == "super_admin";
    let mut results = Vec::new();

    while let Some(mut field) = multipart.next_field().await.map_err(|e| {
        SaasError::InvalidInput(format!("文件上传解析失败: {}", e))
    })? {
        let file_name = field.file_name().unwrap_or("unknown").to_string();

        // Read the field chunk by chunk and stop as soon as the cap is
        // exceeded, so an oversized upload is never fully buffered in memory.
        let mut data: Vec<u8> = Vec::new();
        let mut too_large = false;
        while let Some(chunk) = field.chunk().await.map_err(|e| {
            SaasError::InvalidInput(format!("文件读取失败: {}", e))
        })? {
            if data.len() + chunk.len() > MAX_UPLOAD_BYTES {
                too_large = true;
                break;
            }
            data.extend_from_slice(&chunk);
        }
        if too_large {
            // NOTE(review): the unread remainder of this field is left to the
            // multipart parser to skip on the next `next_field()` call — confirm.
            results.push(failure(&file_name, "文件超过 20MB 限制"));
            continue;
        }

        let format = match extractors::detect_format(&file_name) {
            Some(f) => f,
            None => {
                results.push(failure(&file_name, "不支持的文件格式"));
                continue;
            }
        };

        let outcome = if format.is_structured() {
            // Structured channel (e.g. Excel).
            handle_structured_upload(&state, &ctx, is_admin, &data, &file_name).await
        } else {
            // Document channel (PDF / DOCX / Markdown → RAG).
            handle_document_upload(&state, &ctx, is_admin, &data, &file_name, format).await
        };

        // A handler error is recorded for this file only; later files still run.
        results.push(match outcome {
            Ok(entry) => entry,
            Err(e) => failure(&file_name, &e.to_string()),
        });
    }

    Ok(Json(serde_json::json!({
        "results": results,
        "count": results.len(),
    })))
}
/// Handles a document-style upload (PDF / DOCX / Markdown → RAG pipeline).
///
/// Steps:
/// 1. Convert the raw bytes into an `extractors::NormalizedDocument`
///    (`extract_pdf` / `extract_docx`; Markdown is passed through as a single
///    section whose title is the file name without its `.md` / `.txt` suffix).
/// 2. Render the document with `extractors::normalized_to_markdown`.
/// 3. Store it as a knowledge item via `service::create_item`.
/// 4. Dispatch a `generate_embedding` job for the new item. This is
///    best-effort: a dispatch failure is only logged.
///
/// Returns a JSON entry with `file`, `status`, `item_id`, `sections` and
/// `format`.
///
/// # Errors
///
/// Propagates extraction and `service::create_item` errors, and returns
/// `SaasError::InvalidInput` for a format this function does not handle or
/// when the rendered content is empty or whitespace-only.
async fn handle_document_upload(
    state: &AppState,
    ctx: &AuthContext,
    is_admin: bool,
    data: &[u8],
    file_name: &str,
    format: extractors::DocumentFormat,
) -> SaasResult<serde_json::Value> {
    let doc = match format {
        extractors::DocumentFormat::Pdf => extractors::extract_pdf(data, file_name)?,
        extractors::DocumentFormat::Docx => extractors::extract_docx(data, file_name)?,
        extractors::DocumentFormat::Markdown => {
            // Markdown pass-through: the whole file becomes one section.
            let text = String::from_utf8_lossy(data).to_string();
            // Strip exactly one trailing extension. `trim_end_matches` would
            // strip repeated and chained suffixes ("notes.txt.md" -> "notes").
            // Fall back to the full name rather than produce an empty title.
            let title = file_name
                .strip_suffix(".md")
                .or_else(|| file_name.strip_suffix(".txt"))
                .filter(|stem| !stem.is_empty())
                .unwrap_or(file_name)
                .to_string();
            extractors::NormalizedDocument {
                title,
                sections: vec![extractors::DocumentSection {
                    heading: None,
                    content: text,
                    level: 1,
                    page_number: None,
                }],
                metadata: extractors::DocumentMetadata {
                    source_format: "markdown".to_string(),
                    file_name: file_name.to_string(),
                    total_pages: None,
                    total_sections: 1,
                },
            }
        }
        // Any other format is not handled by the document channel.
        _ => return Err(SaasError::InvalidInput("不支持的文档格式".into())),
    };

    // Render to Markdown; reject documents with no usable text.
    let content = extractors::normalized_to_markdown(&doc);
    if content.trim().is_empty() {
        return Err(SaasError::InvalidInput("文件内容为空".into()));
    }

    // Create the knowledge item.
    let item_req = CreateItemRequest {
        category_id: "uploaded".to_string(), // TODO: take this from the upload parameters
        title: doc.title.clone(),
        content,
        keywords: None,
        related_questions: None,
        priority: Some(5),
        tags: Some(vec![format!("source:{}", doc.metadata.source_format)]),
        visibility: None,
    };
    let item = service::create_item(&state.db, &ctx.account_id, &item_req, is_admin).await?;

    // Queue the embedding job. A dispatch failure is logged and does not fail
    // the upload, because the item has already been stored.
    if let Err(e) = state.worker_dispatcher.dispatch(
        "generate_embedding",
        serde_json::json!({ "item_id": item.id }),
    ).await {
        tracing::warn!("Upload: failed to dispatch embedding for {}: {}", item.id, e);
    }

    Ok(serde_json::json!({
        "file": file_name,
        "status": "ok",
        "item_id": item.id,
        "sections": doc.metadata.total_sections,
        "format": doc.metadata.source_format,
    }))
}
/// Handles a structured-data upload (Excel → structured rows).
///
/// Runs `extractors::extract_excel` on the bytes, registers a structured
/// source via `service::create_structured_source`, then writes the extracted
/// rows with `service::insert_structured_rows`.
///
/// Returns a JSON entry with `file`, `status`, `source_id`, `sheets`,
/// `rows_imported` and `columns`.
///
/// # Errors
///
/// Propagates extraction and service errors. Returns
/// `SaasError::InvalidInput` when the extractor yields anything other than
/// `ProcessedFile::Structured`, or when it yields no rows.
async fn handle_structured_upload(
    state: &AppState,
    ctx: &AuthContext,
    is_admin: bool,
    data: &[u8],
    file_name: &str,
) -> SaasResult<serde_json::Value> {
    // Destructure up front; any non-structured result is rejected immediately.
    let (title, sheet_names, column_headers, rows) =
        match extractors::extract_excel(data, file_name)? {
            extractors::ProcessedFile::Structured { title, sheet_names, column_headers, rows } => {
                (title, sheet_names, column_headers, rows)
            }
            _ => return Err(SaasError::InvalidInput("意外的处理结果".into())),
        };

    if rows.is_empty() {
        return Err(SaasError::InvalidInput("Excel 文件没有数据行".into()));
    }

    // Capture the column count before the headers move into the request.
    let column_count = column_headers.len();

    // Register the structured data source.
    let request = CreateStructuredSourceRequest {
        title,
        description: None,
        original_file_name: Some(file_name.to_string()),
        sheet_names: Some(sheet_names.clone()),
        column_headers: Some(column_headers),
        visibility: None,
        industry_id: None,
    };
    let source = service::create_structured_source(
        &state.db, &ctx.account_id, is_admin, &request,
    ).await?;

    // Bulk-insert the row data under the new source.
    let rows_imported = service::insert_structured_rows(
        &state.db, &source.id, &rows,
    ).await?;

    Ok(serde_json::json!({
        "file": file_name,
        "status": "ok",
        "source_id": source.id,
        "sheets": sheet_names,
        "rows_imported": rows_imported,
        "columns": column_count,
    }))
}