diff --git a/Cargo.toml b/Cargo.toml
index 415ded2..ad4f957 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -120,6 +120,9 @@ handlebars = "6"
 # HTML sanitization
 ammonia = "4"
 
+# Document parsing
+pdf-extract = "0.7"
+
 # Metrics
 metrics = "0.24"
 metrics-exporter-prometheus = "0.16"
diff --git a/crates/erp-ai/Cargo.toml b/crates/erp-ai/Cargo.toml
index 0648253..92c83c6 100644
--- a/crates/erp-ai/Cargo.toml
+++ b/crates/erp-ai/Cargo.toml
@@ -26,3 +26,4 @@ sha2.workspace = true
 redis.workspace = true
 hex.workspace = true
 regex-lite.workspace = true
+pdf-extract.workspace = true
diff --git a/crates/erp-ai/src/service/document/mod.rs b/crates/erp-ai/src/service/document/mod.rs
new file mode 100644
index 0000000..b8e9fc7
--- /dev/null
+++ b/crates/erp-ai/src/service/document/mod.rs
@@ -0,0 +1,507 @@
+pub mod chunker;
+pub mod parser;
+
+use sea_orm::{
+    ColumnTrait, ConnectionTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set,
+};
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use crate::entity::ai_knowledge_documents;
+use crate::error::{AiError, AiResult};
+use crate::service::embedding::{EmbeddingService, format_vector};
+use crate::service::knowledge_v2::KnowledgeV2Service;
+
+use std::sync::Arc;
+
+// ─── DTO ───
+
+/// Request body for creating a manually entered document.
+#[derive(Debug, Deserialize, Serialize, utoipa::ToSchema)]
+pub struct CreateDocumentReq {
+    pub title: String,
+    /// Defaults to `"manual"` when omitted.
+    pub doc_type: Option<String>,
+    /// Defaults to `"manual"` when omitted.
+    pub source_type: Option<String>,
+    pub source_url: Option<String>,
+    pub content: Option<String>,
+}
+
+/// Query parameters for listing the documents of a knowledge base.
+#[derive(Debug, Deserialize, utoipa::IntoParams)]
+pub struct ListDocumentsQuery {
+    /// Optional exact-match filter on the document status.
+    pub status: Option<String>,
+    /// 1-based page number; defaults to 1.
+    pub page: Option<u64>,
+    /// Page size; defaults to 20.
+    pub page_size: Option<u64>,
+}
+
+/// Metadata and already-extracted text of an uploaded file.
+#[derive(Debug)]
+pub struct UploadDocumentParams {
+    pub file_name: String,
+    pub file_size: i64,
+    pub mime_type: String,
+    pub content: String,
+}
+
+// ─── Service ───
+
+/// Document CRUD plus the chunk-and-embed pipeline for knowledge bases.
+pub struct DocumentService {
+    db: sea_orm::DatabaseConnection,
+    knowledge_v2: Arc<KnowledgeV2Service>,
+    embedding: Arc<EmbeddingService>,
+}
+
+impl DocumentService {
+    pub fn new(
+        db: sea_orm::DatabaseConnection,
+        knowledge_v2: Arc<KnowledgeV2Service>,
+        embedding: Arc<EmbeddingService>,
+    ) -> Self {
+        Self {
+            db,
+            knowledge_v2,
+            embedding,
+        }
+    }
+
+    /// Lists the non-deleted documents of a knowledge base, newest first.
+    ///
+    /// Returns the requested page together with the total number of matches.
+    /// `page` and `page_size` are clamped to at least 1 so that a request for
+    /// page 0 cannot underflow `page - 1` and a page size of 0 never reaches
+    /// the paginator.
+    pub async fn list_documents(
+        &self,
+        tenant_id: Uuid,
+        kb_id: Uuid,
+        query: &ListDocumentsQuery,
+    ) -> AiResult<(Vec<ai_knowledge_documents::Model>, u64)> {
+        let page = query.page.unwrap_or(1).max(1);
+        let page_size = query.page_size.unwrap_or(20).max(1);
+
+        let mut find = ai_knowledge_documents::Entity::find()
+            .filter(ai_knowledge_documents::Column::TenantId.eq(tenant_id))
+            .filter(ai_knowledge_documents::Column::KnowledgeBaseId.eq(kb_id))
+            .filter(ai_knowledge_documents::Column::DeletedAt.is_null());
+
+        if let Some(status) = query.status.as_deref() {
+            find = find.filter(ai_knowledge_documents::Column::Status.eq(status));
+        }
+
+        let paginator = find
+            .order_by_desc(ai_knowledge_documents::Column::CreatedAt)
+            .paginate(&self.db, page_size);
+
+        let total = paginator.num_items().await?;
+        let items = paginator.fetch_page(page - 1).await?;
+
+        Ok((items, total))
+    }
+
+    /// Fetches one non-deleted document that belongs to `tenant_id`.
+    ///
+    /// A missing, soft-deleted or foreign-tenant document yields the same
+    /// `KnowledgeError`.
+    pub async fn get_document(
+        &self,
+        tenant_id: Uuid,
+        id: Uuid,
+    ) -> AiResult<ai_knowledge_documents::Model> {
+        ai_knowledge_documents::Entity::find_by_id(id)
+            .one(&self.db)
+            .await
+            .map_err(|e| AiError::DbError(e.to_string()))?
+            .filter(|m| m.tenant_id == tenant_id && m.deleted_at.is_none())
+            .ok_or_else(|| AiError::KnowledgeError("文档不存在".into()))
+    }
+
+    /// Creates a manually entered document and processes it before returning.
+    ///
+    /// Chunking and embedding are awaited inline, so a successful return means
+    /// processing has already finished.
+    pub async fn create_manual_document(
+        &self,
+        tenant_id: Uuid,
+        user_id: Uuid,
+        kb_id: Uuid,
+        req: CreateDocumentReq,
+    ) -> AiResult<Uuid> {
+        // Make sure the knowledge base exists for this tenant.
+        self.knowledge_v2.get_by_id(tenant_id, kb_id).await?;
+
+        let id = Uuid::now_v7();
+        let now = chrono::Utc::now();
+
+        let active = ai_knowledge_documents::ActiveModel {
+            id: Set(id),
+            tenant_id: Set(tenant_id),
+            knowledge_base_id: Set(kb_id),
+            title: Set(req.title),
+            doc_type: Set(req.doc_type.unwrap_or_else(|| "manual".into())),
+            source_type: Set(req.source_type.unwrap_or_else(|| "manual".into())),
+            source_url: Set(req.source_url),
+            file_name: Set(None),
+            file_size: Set(None),
+            file_mime_type: Set(None),
+            content: Set(req.content),
+            status: Set("pending".into()),
+            chunk_count: Set(0),
+            embedded_count: Set(0),
+            error_message: Set(None),
+            processing_started_at: Set(None),
+            processing_completed_at: Set(None),
+            created_at: Set(now),
+            updated_at: Set(now),
+            created_by: Set(Some(user_id)),
+            updated_by: Set(Some(user_id)),
+            deleted_at: Set(None),
+            version_lock: Set(1),
+        };
+
+        ai_knowledge_documents::Entity::insert(active)
+            .exec(&self.db)
+            .await
+            .map_err(|e| AiError::DbError(e.to_string()))?;
+
+        // Runs synchronously: count the document, then chunk and embed it.
+        self.knowledge_v2.increment_document_count(kb_id, 1).await?;
+        self.process_document(id).await?;
+
+        Ok(id)
+    }
+
+    /// Creates a document record for an uploaded file and processes it.
+    ///
+    /// `params.content` is stored as the document text; chunking and embedding
+    /// are awaited inline before the new id is returned.
+    pub async fn create_upload_document(
+        &self,
+        tenant_id: Uuid,
+        user_id: Uuid,
+        kb_id: Uuid,
+        title: String,
+        params: UploadDocumentParams,
+    ) -> AiResult<Uuid> {
+        self.knowledge_v2.get_by_id(tenant_id, kb_id).await?;
+
+        let id = Uuid::now_v7();
+        let now = chrono::Utc::now();
+
+        let doc_type = mime_to_doc_type(&params.mime_type);
+
+        let active = ai_knowledge_documents::ActiveModel {
+            id: Set(id),
+            tenant_id: Set(tenant_id),
+            knowledge_base_id: Set(kb_id),
+            title: Set(title),
+            doc_type: Set(doc_type),
+            source_type: Set("upload".into()),
+            source_url: Set(None),
+            file_name: Set(Some(params.file_name)),
+            file_size: Set(Some(params.file_size)),
+            file_mime_type: Set(Some(params.mime_type)),
+            content: Set(Some(params.content)),
+            status: Set("pending".into()),
+            chunk_count: Set(0),
+            embedded_count: Set(0),
+            error_message: Set(None),
+            processing_started_at: Set(None),
+            processing_completed_at: Set(None),
+            created_at: Set(now),
+            updated_at: Set(now),
+            created_by: Set(Some(user_id)),
+            updated_by: Set(Some(user_id)),
+            deleted_at: Set(None),
+            version_lock: Set(1),
+        };
+
+        ai_knowledge_documents::Entity::insert(active)
+            .exec(&self.db)
+            .await
+            .map_err(|e| AiError::DbError(e.to_string()))?;
+
+        self.knowledge_v2.increment_document_count(kb_id, 1).await?;
+        self.process_document(id).await?;
+
+        Ok(id)
+    }
+
+    /// Soft-deletes a document together with its chunks.
+    ///
+    /// Only the soft-delete columns are written, so values that
+    /// `process_document` may have changed since the row was read (status,
+    /// counters, timestamps) are not overwritten with stale data.
+    pub async fn delete_document(&self, tenant_id: Uuid, kb_id: Uuid, id: Uuid) -> AiResult<()> {
+        let existing = self.get_document(tenant_id, id).await?;
+        if existing.knowledge_base_id != kb_id {
+            return Err(AiError::KnowledgeError("文档不属于该知识库".into()));
+        }
+
+        let now = chrono::Utc::now();
+        // Columns left at their default (`NotSet`) are excluded from the UPDATE.
+        let active = ai_knowledge_documents::ActiveModel {
+            id: Set(existing.id),
+            updated_at: Set(now),
+            deleted_at: Set(Some(now)),
+            version_lock: Set(existing.version_lock + 1),
+            ..Default::default()
+        };
+
+        ai_knowledge_documents::Entity::update(active)
+            .exec(&self.db)
+            .await
+            .map_err(|e| AiError::DbError(e.to_string()))?;
+
+        // Retire the document's chunks too; the counters below already treat
+        // them as gone.
+        let stmt = sea_orm::Statement::from_sql_and_values(
+            sea_orm::DatabaseBackend::Postgres,
+            "UPDATE ai_knowledge_chunks SET deleted_at = $3, updated_at = $3 \
+             WHERE document_id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
+            [
+                sea_orm::Value::from(id),
+                sea_orm::Value::from(tenant_id),
+                sea_orm::Value::from(now),
+            ],
+        );
+        self.db
+            .execute(stmt)
+            .await
+            .map_err(|e| AiError::DbError(e.to_string()))?;
+
+        self.knowledge_v2
+            .increment_document_count(kb_id, -1)
+            .await?;
+        self.knowledge_v2
+            .increment_chunk_count(kb_id, -existing.chunk_count)
+            .await?;
+
+        Ok(())
+    }
+
+    /// Processes a document: chunk → embed → persist chunks → update status.
+    ///
+    /// Domain-level failures (document missing, empty content, nothing
+    /// stored) are recorded on the document as status `failed` and reported as
+    /// `Ok(())`; only database errors while updating status or counters are
+    /// returned as `Err`.
+    async fn process_document(&self, doc_id: Uuid) -> AiResult<()> {
+        let now = chrono::Utc::now();
+
+        // Mark the document as being processed.
+        self.update_doc_status(doc_id, "processing", None, Some(now), None)
+            .await?;
+
+        let doc = match ai_knowledge_documents::Entity::find_by_id(doc_id)
+            .one(&self.db)
+            .await
+        {
+            Ok(Some(d)) if d.deleted_at.is_none() => d,
+            Ok(_) => return self.fail_document(doc_id, "文档未找到").await,
+            Err(e) => {
+                // Keep the best-effort outcome, but do not lose the cause.
+                tracing::warn!(doc_id = %doc_id, error = %e, "文档加载失败");
+                return self.fail_document(doc_id, "文档未找到").await;
+            }
+        };
+
+        let content = match doc.content.as_deref() {
+            Some(c) if !c.trim().is_empty() => c,
+            _ => return self.fail_document(doc_id, "文档内容为空").await,
+        };
+
+        // Chunking; 500/50 are presumably chunk size and overlap (see `chunker`).
+        let chunks = chunker::chunk_text(content, 500, 50);
+        if chunks.is_empty() {
+            return self.fail_document(doc_id, "切片结果为空").await;
+        }
+
+        // Embed + store. Counters are `i32` because they are bound as such below.
+        let mut stored_count = 0i32;
+        let mut embedded_count = 0i32;
+        for (idx, chunk_content) in chunks.iter().enumerate() {
+            let chunk_id = Uuid::now_v7();
+            let embedding = self.try_embed(chunk_content).await;
+
+            let embedding_val = embedding
+                .as_ref()
+                .map(|e| sea_orm::Value::String(Some(Box::new(format_vector(e)))))
+                .unwrap_or(sea_orm::Value::String(None));
+
+            let sql = r#"
+                INSERT INTO ai_knowledge_chunks
+                    (id, tenant_id, knowledge_base_id, document_id, chunk_index, content,
+                     embedding, metadata, hit_count, created_at, updated_at, created_by, updated_by, deleted_at)
+                VALUES ($1, $2, $3, $4, $5, $6, $7::vector, '{}', 0, $8, $8, $9, $9, NULL)
+            "#;
+
+            let stmt = sea_orm::Statement::from_sql_and_values(
+                sea_orm::DatabaseBackend::Postgres,
+                sql,
+                [
+                    sea_orm::Value::from(chunk_id),
+                    sea_orm::Value::from(doc.tenant_id),
+                    sea_orm::Value::from(doc.knowledge_base_id),
+                    sea_orm::Value::from(doc_id),
+                    // Chunk indexes stay far below `i32::MAX` for any real document.
+                    sea_orm::Value::from(idx as i32),
+                    sea_orm::Value::String(Some(Box::new(chunk_content.clone()))),
+                    embedding_val,
+                    sea_orm::Value::from(now),
+                    sea_orm::Value::from(doc.created_by),
+                ],
+            );
+
+            match self.db.execute(stmt).await {
+                Ok(_) => {
+                    stored_count += 1;
+                    if embedding.is_some() {
+                        embedded_count += 1;
+                    }
+                }
+                Err(e) => {
+                    tracing::warn!(chunk_index = idx, error = %e, "切片插入失败,跳过");
+                }
+            }
+        }
+
+        // Nothing was persisted: report a failure instead of an empty "completed".
+        if stored_count == 0 {
+            return self.fail_document(doc_id, "切片写入失败").await;
+        }
+
+        // Update the document with the number of chunks actually stored.
+        let completed_now = chrono::Utc::now();
+        let sql = r#"
+            UPDATE ai_knowledge_documents
+            SET status = 'completed', chunk_count = $2, embedded_count = $3,
+                processing_completed_at = $4, updated_at = $4, version_lock = version_lock + 1
+            WHERE id = $1 AND deleted_at IS NULL
+        "#;
+        let stmt = sea_orm::Statement::from_sql_and_values(
+            sea_orm::DatabaseBackend::Postgres,
+            sql,
+            [
+                sea_orm::Value::from(doc_id),
+                sea_orm::Value::from(stored_count),
+                sea_orm::Value::from(embedded_count),
+                sea_orm::Value::from(completed_now),
+            ],
+        );
+        self.db
+            .execute(stmt)
+            .await
+            .map_err(|e| AiError::DbError(e.to_string()))?;
+
+        // Bump the knowledge base's chunk counter by the stored amount.
+        self.knowledge_v2
+            .increment_chunk_count(doc.knowledge_base_id, stored_count)
+            .await?;
+
+        Ok(())
+    }
+
+    /// Marks a document as `failed` with `reason`, stamping the completion time.
+    async fn fail_document(&self, doc_id: Uuid, reason: &str) -> AiResult<()> {
+        self.update_doc_status(
+            doc_id,
+            "failed",
+            Some(reason.to_string()),
+            None,
+            Some(chrono::Utc::now()),
+        )
+        .await
+    }
+
+    /// Writes `status` and `error_message` for a non-deleted document.
+    ///
+    /// `started_at` / `completed_at` are only written when `Some`; their bind
+    /// positions are derived from the growing value list so the placeholders
+    /// can never drift out of sync with the parameters.
+    async fn update_doc_status(
+        &self,
+        doc_id: Uuid,
+        status: &str,
+        error: Option<String>,
+        started_at: Option<chrono::DateTime<chrono::Utc>>,
+        completed_at: Option<chrono::DateTime<chrono::Utc>>,
+    ) -> AiResult<()> {
+        let now = chrono::Utc::now();
+        let mut values: Vec<sea_orm::Value> = vec![
+            sea_orm::Value::from(doc_id),
+            sea_orm::Value::String(Some(Box::new(status.to_string()))),
+            error
+                .map(|e| sea_orm::Value::String(Some(Box::new(e))))
+                .unwrap_or(sea_orm::Value::String(None)),
+            sea_orm::Value::from(now),
+        ];
+
+        let mut extra_sql = String::new();
+        if let Some(sa) = started_at {
+            values.push(sea_orm::Value::from(sa));
+            extra_sql.push_str(&format!(", processing_started_at = ${}", values.len()));
+        }
+        if let Some(ca) = completed_at {
+            values.push(sea_orm::Value::from(ca));
+            extra_sql.push_str(&format!(", processing_completed_at = ${}", values.len()));
+        }
+
+        let sql = format!(
+            "UPDATE ai_knowledge_documents SET status = $2, error_message = $3, updated_at = $4, version_lock = version_lock + 1{} WHERE id = $1 AND deleted_at IS NULL",
+            extra_sql
+        );
+
+        let stmt = sea_orm::Statement::from_sql_and_values(
+            sea_orm::DatabaseBackend::Postgres,
+            &sql,
+            values,
+        );
+        self.db
+            .execute(stmt)
+            .await
+            .map_err(|e| AiError::DbError(e.to_string()))?;
+        Ok(())
+    }
+
+    /// Best-effort embedding: `None` when embedding is not configured or fails.
+    async fn try_embed(&self, text: &str) -> Option<Vec<f32>> {
+        if !self.embedding.is_configured() {
+            return None;
+        }
+        match self.embedding.embed(text).await {
+            Ok(e) => Some(e),
+            Err(e) => {
+                tracing::warn!(error = %e, "Embedding 生成失败");
+                None
+            }
+        }
+    }
+}
+
+/// Maps an upload's MIME type to the stored `doc_type` label.
+///
+/// Parameters such as `; charset=utf-8` and letter case are ignored;
+/// unrecognised types map to `"other"`.
+fn mime_to_doc_type(mime: &str) -> String {
+    let essence = mime
+        .split(';')
+        .next()
+        .unwrap_or(mime)
+        .trim()
+        .to_ascii_lowercase();
+    let doc_type = match essence.as_str() {
+        "application/pdf" => "pdf",
+        "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => "docx",
+        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => "xlsx",
+        "text/plain" => "txt",
+        "text/markdown" => "md",
+        _ => "other",
+    };
+    doc_type.to_string()
+}
diff --git a/crates/erp-ai/src/service/document/parser.rs b/crates/erp-ai/src/service/document/parser.rs
new file mode 100644
index 0000000..6f969c4
--- /dev/null
+++ b/crates/erp-ai/src/service/document/parser.rs
@@ -0,0 +1,85 @@
+use crate::error::{AiError, AiResult};
+
+/// Punctuation kept by `parse_binary_text` in addition to alphanumerics
+/// and whitespace.
+const KEPT_PUNCTUATION: &str =
+    ",。、;:\u{201c}\u{201d}\u{2018}\u{2019}!?()《》【】…—·\t\n\r";
+
+/// Minimum length, in UTF-8 bytes, for fallback-extracted text to be accepted.
+const MIN_EXTRACTED_BYTES: usize = 20;
+
+/// Extracts plain text from an uploaded file.
+///
+/// The MIME type decides first (parameters such as `; charset=utf-8` and
+/// letter case are ignored); otherwise the file extension is checked
+/// case-insensitively. Anything unrecognised goes through the lossy
+/// binary fallback.
+pub fn parse_document(file_name: &str, mime_type: &str, data: &[u8]) -> AiResult<String> {
+    let essence = mime_type
+        .split(';')
+        .next()
+        .unwrap_or(mime_type)
+        .trim()
+        .to_ascii_lowercase();
+    match essence.as_str() {
+        "application/pdf" => return parse_pdf(data),
+        "text/plain" | "text/markdown" => return parse_text(data),
+        _ => {}
+    }
+
+    let lower_name = file_name.to_ascii_lowercase();
+    if lower_name.ends_with(".pdf") {
+        parse_pdf(data)
+    } else if lower_name.ends_with(".txt") || lower_name.ends_with(".md") {
+        parse_text(data)
+    } else {
+        // DOCX/XLSX and other binary formats: salvage readable fragments only.
+        // A dedicated parser can replace this in a later phase.
+        parse_binary_text(data)
+    }
+}
+
+/// Extracts text from a PDF held in memory.
+///
+/// The extractor is wrapped in `catch_unwind` so that a panic inside it
+/// (e.g. on a malformed upload) is reported as an error instead of
+/// unwinding into the caller. This has no effect when the crate is built
+/// with `panic = "abort"`.
+fn parse_pdf(data: &[u8]) -> AiResult<String> {
+    std::panic::catch_unwind(|| pdf_extract::extract_text_from_mem(data))
+        .map_err(|_| AiError::KnowledgeError("PDF 解析失败: 解析器内部错误".into()))?
+        .map(|t| t.trim().to_string())
+        .map_err(|e| AiError::KnowledgeError(format!("PDF 解析失败: {}", e)))
+}
+
+/// Decodes the bytes as UTF-8, replacing invalid sequences, and trims the result.
+fn parse_text(data: &[u8]) -> AiResult<String> {
+    Ok(String::from_utf8_lossy(data).trim().to_string())
+}
+
+/// Salvages readable text fragments from a binary file (DOCX/XLSX fallback).
+///
+/// Every character that is not alphanumeric, whitespace or listed in
+/// `KEPT_PUNCTUATION` acts as a separator, and whitespace runs are collapsed
+/// to single spaces. Results shorter than `MIN_EXTRACTED_BYTES` are rejected.
+fn parse_binary_text(data: &[u8]) -> AiResult<String> {
+    let text = String::from_utf8_lossy(data);
+    let separated: String = text
+        .chars()
+        .map(|ch| {
+            if ch.is_alphanumeric() || ch.is_whitespace() || KEPT_PUNCTUATION.contains(ch) {
+                ch
+            } else {
+                ' '
+            }
+        })
+        .collect();
+
+    let result = separated.split_whitespace().collect::<Vec<_>>().join(" ");
+    if result.len() < MIN_EXTRACTED_BYTES {
+        return Err(AiError::KnowledgeError(
+            "无法从文件中提取有效文本内容".into(),
+        ));
+    }
+    Ok(result)
+}