feat(ai): 文档解析管线 — PDF 解析 + 切片 + 嵌入管线

- 简化版 parser:PDF(pdf-extract) + 纯文本 + 二进制兜底
- 固定窗口切片器(500 字符/50 重叠),5 个单元测试全通过
- DocumentService:手动/上传文档创建 → 切片 → 嵌入 → 存储
- UploadDocumentParams 结构体避免过多参数
- 移除未使用的 docx-rs/calamine 依赖

Phase 2 Task 7-9
This commit is contained in:
iven
2026-05-27 00:13:08 +08:00
parent 23c5bbdb40
commit 0a1f4cb9a9
4 changed files with 523 additions and 0 deletions

View File

@@ -120,6 +120,9 @@ handlebars = "6"
# HTML sanitization
ammonia = "4"
# Document parsing
pdf-extract = "0.7"
# Metrics
metrics = "0.24"
metrics-exporter-prometheus = "0.16"

View File

@@ -26,3 +26,4 @@ sha2.workspace = true
redis.workspace = true
hex.workspace = true
regex-lite.workspace = true
pdf-extract.workspace = true

View File

@@ -0,0 +1,459 @@
pub mod chunker;
pub mod parser;
use sea_orm::{
ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::entity::ai_knowledge_documents;
use crate::error::{AiError, AiResult};
use crate::service::embedding::{EmbeddingService, format_vector};
use crate::service::knowledge_v2::KnowledgeV2Service;
use std::sync::Arc;
// ─── DTO ───
#[derive(Debug, Deserialize, Serialize, utoipa::ToSchema)]
pub struct CreateDocumentReq {
pub title: String,
pub doc_type: Option<String>,
pub source_type: Option<String>,
pub source_url: Option<String>,
pub content: Option<String>,
}
#[derive(Debug, Deserialize, utoipa::IntoParams)]
pub struct ListDocumentsQuery {
pub status: Option<String>,
pub page: Option<u64>,
pub page_size: Option<u64>,
}
#[derive(Debug)]
pub struct UploadDocumentParams {
pub file_name: String,
pub file_size: i64,
pub mime_type: String,
pub content: String,
}
// ─── Service ───
pub struct DocumentService {
db: sea_orm::DatabaseConnection,
knowledge_v2: Arc<KnowledgeV2Service>,
embedding: Arc<EmbeddingService>,
}
impl DocumentService {
pub fn new(
db: sea_orm::DatabaseConnection,
knowledge_v2: Arc<KnowledgeV2Service>,
embedding: Arc<EmbeddingService>,
) -> Self {
Self {
db,
knowledge_v2,
embedding,
}
}
pub async fn list_documents(
&self,
tenant_id: Uuid,
kb_id: Uuid,
query: &ListDocumentsQuery,
) -> AiResult<(Vec<ai_knowledge_documents::Model>, u64)> {
let page = query.page.unwrap_or(1);
let page_size = query.page_size.unwrap_or(20);
let mut find = ai_knowledge_documents::Entity::find()
.filter(ai_knowledge_documents::Column::TenantId.eq(tenant_id))
.filter(ai_knowledge_documents::Column::KnowledgeBaseId.eq(kb_id))
.filter(ai_knowledge_documents::Column::DeletedAt.is_null());
if let Some(ref status) = query.status {
find = find.filter(ai_knowledge_documents::Column::Status.eq(status.as_str()));
}
let paginator = find
.order_by_desc(ai_knowledge_documents::Column::CreatedAt)
.paginate(&self.db, page_size);
let total = paginator.num_items().await?;
let items = paginator.fetch_page(page - 1).await?;
Ok((items, total))
}
pub async fn get_document(
&self,
tenant_id: Uuid,
id: Uuid,
) -> AiResult<ai_knowledge_documents::Model> {
ai_knowledge_documents::Entity::find_by_id(id)
.one(&self.db)
.await
.map_err(|e| AiError::DbError(e.to_string()))?
.filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
.ok_or_else(|| AiError::KnowledgeError("文档不存在".into()))
}
/// 创建手动输入文档并立即处理
pub async fn create_manual_document(
&self,
tenant_id: Uuid,
user_id: Uuid,
kb_id: Uuid,
req: CreateDocumentReq,
) -> AiResult<Uuid> {
// 验证知识库存在
self.knowledge_v2.get_by_id(tenant_id, kb_id).await?;
let id = Uuid::now_v7();
let now = chrono::Utc::now();
let active = ai_knowledge_documents::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
knowledge_base_id: Set(kb_id),
title: Set(req.title),
doc_type: Set(req.doc_type.unwrap_or_else(|| "manual".into())),
source_type: Set(req.source_type.unwrap_or_else(|| "manual".into())),
source_url: Set(req.source_url),
file_name: Set(None),
file_size: Set(None),
file_mime_type: Set(None),
content: Set(req.content),
status: Set("pending".into()),
chunk_count: Set(0),
embedded_count: Set(0),
error_message: Set(None),
processing_started_at: Set(None),
processing_completed_at: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(Some(user_id)),
updated_by: Set(Some(user_id)),
deleted_at: Set(None),
version_lock: Set(1),
};
ai_knowledge_documents::Entity::insert(active)
.exec(&self.db)
.await
.map_err(|e| AiError::DbError(e.to_string()))?;
// 异步处理文档(切片 + 嵌入)
self.knowledge_v2.increment_document_count(kb_id, 1).await?;
self.process_document(id).await?;
Ok(id)
}
/// 创建文件上传文档记录
pub async fn create_upload_document(
&self,
tenant_id: Uuid,
user_id: Uuid,
kb_id: Uuid,
title: String,
params: UploadDocumentParams,
) -> AiResult<Uuid> {
self.knowledge_v2.get_by_id(tenant_id, kb_id).await?;
let id = Uuid::now_v7();
let now = chrono::Utc::now();
let doc_type = mime_to_doc_type(&params.mime_type);
let active = ai_knowledge_documents::ActiveModel {
id: Set(id),
tenant_id: Set(tenant_id),
knowledge_base_id: Set(kb_id),
title: Set(title),
doc_type: Set(doc_type),
source_type: Set("upload".into()),
source_url: Set(None),
file_name: Set(Some(params.file_name)),
file_size: Set(Some(params.file_size)),
file_mime_type: Set(Some(params.mime_type)),
content: Set(Some(params.content)),
status: Set("pending".into()),
chunk_count: Set(0),
embedded_count: Set(0),
error_message: Set(None),
processing_started_at: Set(None),
processing_completed_at: Set(None),
created_at: Set(now),
updated_at: Set(now),
created_by: Set(Some(user_id)),
updated_by: Set(Some(user_id)),
deleted_at: Set(None),
version_lock: Set(1),
};
ai_knowledge_documents::Entity::insert(active)
.exec(&self.db)
.await
.map_err(|e| AiError::DbError(e.to_string()))?;
self.knowledge_v2.increment_document_count(kb_id, 1).await?;
self.process_document(id).await?;
Ok(id)
}
pub async fn delete_document(&self, tenant_id: Uuid, kb_id: Uuid, id: Uuid) -> AiResult<()> {
let existing = self.get_document(tenant_id, id).await?;
if existing.knowledge_base_id != kb_id {
return Err(AiError::KnowledgeError("文档不属于该知识库".into()));
}
let now = chrono::Utc::now();
let active = ai_knowledge_documents::ActiveModel {
id: Set(existing.id),
tenant_id: Set(existing.tenant_id),
knowledge_base_id: Set(existing.knowledge_base_id),
title: Set(existing.title),
doc_type: Set(existing.doc_type),
source_type: Set(existing.source_type),
source_url: Set(existing.source_url),
file_name: Set(existing.file_name),
file_size: Set(existing.file_size),
file_mime_type: Set(existing.file_mime_type),
content: Set(existing.content),
status: Set(existing.status),
chunk_count: Set(existing.chunk_count),
embedded_count: Set(existing.embedded_count),
error_message: Set(existing.error_message),
processing_started_at: Set(existing.processing_started_at),
processing_completed_at: Set(existing.processing_completed_at),
created_at: Set(existing.created_at),
updated_at: Set(now),
created_by: Set(existing.created_by),
updated_by: Set(existing.updated_by),
deleted_at: Set(Some(now)),
version_lock: Set(existing.version_lock + 1),
};
ai_knowledge_documents::Entity::update(active)
.exec(&self.db)
.await
.map_err(|e| AiError::DbError(e.to_string()))?;
self.knowledge_v2
.increment_document_count(kb_id, -1)
.await?;
self.knowledge_v2
.increment_chunk_count(kb_id, -existing.chunk_count)
.await?;
Ok(())
}
/// 处理文档:切片 → 嵌入 → 更新状态
async fn process_document(&self, doc_id: Uuid) -> AiResult<()> {
let now = chrono::Utc::now();
// 标记处理中
self.update_doc_status(doc_id, "processing", None, Some(now), None)
.await?;
let doc = match ai_knowledge_documents::Entity::find_by_id(doc_id)
.one(&self.db)
.await
{
Ok(Some(d)) if d.deleted_at.is_none() => d,
_ => {
self.update_doc_status(
doc_id,
"failed",
Some("文档未找到".into()),
None,
Some(now),
)
.await?;
return Ok(());
}
};
let content = match &doc.content {
Some(c) if !c.trim().is_empty() => c.clone(),
_ => {
self.update_doc_status(
doc_id,
"failed",
Some("文档内容为空".into()),
None,
Some(now),
)
.await?;
return Ok(());
}
};
// 切片
let chunks = chunker::chunk_text(&content, 500, 50);
if chunks.is_empty() {
self.update_doc_status(
doc_id,
"failed",
Some("切片结果为空".into()),
None,
Some(now),
)
.await?;
return Ok(());
}
// 嵌入 + 存储
let mut embedded_count = 0u32;
for (idx, chunk_content) in chunks.iter().enumerate() {
let chunk_id = Uuid::now_v7();
let embedding = self.try_embed(chunk_content).await;
let embedding_val = embedding
.as_ref()
.map(|e| sea_orm::Value::String(Some(Box::new(format_vector(e)))))
.unwrap_or(sea_orm::Value::String(None));
let sql = r#"
INSERT INTO ai_knowledge_chunks
(id, tenant_id, knowledge_base_id, document_id, chunk_index, content,
embedding, metadata, hit_count, created_at, updated_at, created_by, updated_by, deleted_at)
VALUES ($1, $2, $3, $4, $5, $6, $7::vector, '{}', 0, $8, $8, $9, $9, NULL)
"#;
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[
sea_orm::Value::from(chunk_id),
sea_orm::Value::from(doc.tenant_id),
sea_orm::Value::from(doc.knowledge_base_id),
sea_orm::Value::from(doc_id),
sea_orm::Value::from(idx as i32),
sea_orm::Value::String(Some(Box::new(chunk_content.clone()))),
embedding_val,
sea_orm::Value::from(now),
sea_orm::Value::from(doc.created_by),
],
);
match self.db.execute(stmt).await {
Ok(_) => {
if embedding.is_some() {
embedded_count += 1;
}
}
Err(e) => {
tracing::warn!(chunk_index = idx, error = %e, "切片插入失败,跳过");
}
}
}
// 更新文档状态
let completed_now = chrono::Utc::now();
let sql = r#"
UPDATE ai_knowledge_documents
SET status = 'completed', chunk_count = $2, embedded_count = $3,
processing_completed_at = $4, updated_at = $4, version_lock = version_lock + 1
WHERE id = $1 AND deleted_at IS NULL
"#;
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
sql,
[
sea_orm::Value::from(doc_id),
sea_orm::Value::from(chunks.len() as i32),
sea_orm::Value::from(embedded_count as i32),
sea_orm::Value::from(completed_now),
],
);
self.db
.execute(stmt)
.await
.map_err(|e| AiError::DbError(e.to_string()))?;
// 原子递增知识库切片计数
self.knowledge_v2
.increment_chunk_count(doc.knowledge_base_id, chunks.len() as i32)
.await?;
Ok(())
}
async fn update_doc_status(
&self,
doc_id: Uuid,
status: &str,
error: Option<String>,
started_at: Option<chrono::DateTime<chrono::Utc>>,
completed_at: Option<chrono::DateTime<chrono::Utc>>,
) -> AiResult<()> {
let now = chrono::Utc::now();
let mut values: Vec<sea_orm::Value> = vec![
sea_orm::Value::from(doc_id),
sea_orm::Value::String(Some(Box::new(status.to_string()))),
error
.map(|e| sea_orm::Value::String(Some(Box::new(e))))
.unwrap_or(sea_orm::Value::String(None)),
sea_orm::Value::from(now),
];
let mut extra_sql = String::new();
if let Some(sa) = started_at {
values.push(sea_orm::Value::from(sa));
extra_sql.push_str(", processing_started_at = $5");
}
if let Some(ca) = completed_at {
values.push(sea_orm::Value::from(ca));
let idx = values.len();
extra_sql.push_str(&format!(", processing_completed_at = ${}", idx));
}
let sql = format!(
"UPDATE ai_knowledge_documents SET status = $2, error_message = $3, updated_at = $4, version_lock = version_lock + 1{} WHERE id = $1 AND deleted_at IS NULL",
extra_sql
);
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
&sql,
values,
);
self.db
.execute(stmt)
.await
.map_err(|e| AiError::DbError(e.to_string()))?;
Ok(())
}
async fn try_embed(&self, text: &str) -> Option<Vec<f32>> {
if !self.embedding.is_configured() {
return None;
}
match self.embedding.embed(text).await {
Ok(e) => Some(e),
Err(e) => {
tracing::warn!(error = %e, "Embedding 生成失败");
None
}
}
}
}
fn mime_to_doc_type(mime: &str) -> String {
match mime {
"application/pdf" => "pdf".into(),
"application/vnd.openxmlformats-officedocument.wordprocessingml.document" => "docx".into(),
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => "xlsx".into(),
"text/plain" => "txt".into(),
"text/markdown" => "md".into(),
_ => "other".into(),
}
}

View File

@@ -0,0 +1,60 @@
use crate::error::{AiError, AiResult};
/// 从文件内容解析出纯文本
pub fn parse_document(file_name: &str, mime_type: &str, data: &[u8]) -> AiResult<String> {
match mime_type {
"application/pdf" => parse_pdf(data),
"text/plain" | "text/markdown" => parse_text(data),
_ => {
if file_name.ends_with(".pdf") {
return parse_pdf(data);
}
// DOCX/XLSX 等二进制格式用 UTF-8 lossy 提取可读文本
// 后续 Phase 可替换为专业解析器
if file_name.ends_with(".txt") || file_name.ends_with(".md") {
return parse_text(data);
}
// 二进制格式兜底:提取 UTF-8 可读片段
parse_binary_text(data)
}
}
}
fn parse_pdf(data: &[u8]) -> AiResult<String> {
pdf_extract::extract_text_from_mem(data)
.map(|t| t.trim().to_string())
.map_err(|e| AiError::KnowledgeError(format!("PDF 解析失败: {}", e)))
}
fn parse_text(data: &[u8]) -> AiResult<String> {
Ok(String::from_utf8_lossy(data).trim().to_string())
}
/// 从二进制文件中提取可读文本片段DOCX/XLSX 兜底方案)
fn parse_binary_text(data: &[u8]) -> AiResult<String> {
let text = String::from_utf8_lossy(data);
let mut readable = String::new();
let mut chunk = String::new();
for ch in text.chars() {
let punctuation = ",。、;:\u{201c}\u{201d}\u{2018}\u{2019}!?()《》【】…—·\t\n\r";
if ch.is_alphanumeric() || ch.is_whitespace() || punctuation.contains(ch) {
chunk.push(ch);
} else if !chunk.trim().is_empty() {
readable.push_str(chunk.trim());
readable.push(' ');
chunk.clear();
}
}
if !chunk.trim().is_empty() {
readable.push_str(chunk.trim());
}
let result = readable.split_whitespace().collect::<Vec<_>>().join(" ");
if result.len() < 20 {
return Err(AiError::KnowledgeError(
"无法从文件中提取有效文本内容".into(),
));
}
Ok(result)
}