//! 遥测服务逻辑 use sqlx::PgPool; use crate::error::SaasResult; use crate::models::{TelemetryModelStatsRow, TelemetryDailyStatsRow, TelemetryReportRow}; use super::types::*; const CHUNK_SIZE: usize = 100; /// 批量写入遥测记录(分块多行 INSERT,每 chunk 100 条) pub async fn ingest_telemetry( db: &PgPool, account_id: &str, device_id: &str, app_version: &str, entries: &[TelemetryEntry], ) -> SaasResult { // 预验证所有条目,分离有效/无效 let now = chrono::Utc::now().to_rfc3339(); let mut rejected = 0usize; let valid: Vec<&TelemetryEntry> = entries.iter().filter(|e| { if e.input_tokens < 0 || e.output_tokens < 0 || e.model_id.is_empty() { rejected += 1; false } else { true } }).collect(); if valid.is_empty() { return Ok(TelemetryReportResponse { accepted: 0, rejected }); } let mut tx = db.begin().await?; let mut accepted = 0usize; let cols = 13; for chunk in valid.chunks(CHUNK_SIZE) { // 预分配所有参数(拥有所有权) let ids: Vec = (0..chunk.len()).map(|_| uuid::Uuid::new_v4().to_string()).collect(); // 构建 VALUES 占位符 let placeholders: Vec = (0..chunk.len()) .map(|i| { let base = i * cols + 1; format!("(${},${},${},${},${},${},${},${},${},${},${},${},${})", base, base+1, base+2, base+3, base+4, base+5, base+6, base+7, base+8, base+9, base+10, base+11, base+12) }).collect(); let sql = format!( "INSERT INTO telemetry_reports \ (id, account_id, device_id, app_version, model_id, input_tokens, output_tokens, \ latency_ms, success, error_type, connection_mode, reported_at, created_at) VALUES {}", placeholders.join(", ") ); let mut query = sqlx::query(&sql); for (i, entry) in chunk.iter().enumerate() { query = query .bind(&ids[i]) .bind(account_id) .bind(device_id) .bind(app_version) .bind(&entry.model_id) .bind(entry.input_tokens) .bind(entry.output_tokens) .bind(entry.latency_ms) .bind(entry.success) .bind(&entry.error_type) .bind(&entry.connection_mode) .bind(&entry.timestamp) .bind(&now); } match query.execute(&mut *tx).await { Ok(result) => accepted += result.rows_affected() as usize, Err(e) => { tracing::warn!("Failed to insert telemetry chunk: {}", e); rejected += chunk.len(); } } } tx.commit().await?; Ok(TelemetryReportResponse { accepted, rejected }) } /// 按模型聚合用量统计 pub async fn get_model_stats( db: &PgPool, account_id: &str, query: &TelemetryStatsQuery, ) -> SaasResult> { let mut param_idx: i32 = 1; let mut where_clauses = vec![format!("account_id = ${}", param_idx)]; let mut params: Vec = vec![account_id.to_string()]; param_idx += 1; if let Some(ref from) = query.from { where_clauses.push(format!("reported_at >= ${}", param_idx)); params.push(from.clone()); param_idx += 1; } if let Some(ref to) = query.to { where_clauses.push(format!("reported_at <= ${}", param_idx)); params.push(to.clone()); param_idx += 1; } if let Some(ref model) = query.model_id { where_clauses.push(format!("model_id = ${}", param_idx)); params.push(model.clone()); param_idx += 1; } if let Some(ref mode) = query.connection_mode { where_clauses.push(format!("connection_mode = ${}", param_idx)); params.push(mode.clone()); param_idx += 1; } let where_sql = where_clauses.join(" AND "); let _ = param_idx; // used in loop above, suppress unused-assignment warning let sql = format!( "SELECT model_id, COUNT(*)::bigint as request_count, COALESCE(SUM(input_tokens), 0)::bigint as input_tokens, COALESCE(SUM(output_tokens), 0)::bigint as output_tokens, AVG(latency_ms) as avg_latency_ms, (COUNT(*) FILTER (WHERE success = true))::float / NULLIF(COUNT(*), 0) as success_rate FROM telemetry_reports WHERE {} GROUP BY model_id ORDER BY request_count DESC LIMIT 50", where_sql ); let mut query_builder = sqlx::query_as::<_, TelemetryModelStatsRow>(&sql); for p in ¶ms { query_builder = query_builder.bind(p); } let rows = query_builder.fetch_all(db).await?; let stats: Vec = rows .into_iter() .map(|r| { ModelUsageStat { model_id: r.model_id, request_count: r.request_count, input_tokens: r.input_tokens, output_tokens: r.output_tokens, avg_latency_ms: r.avg_latency_ms, success_rate: r.success_rate.unwrap_or(0.0), } }) .collect(); Ok(stats) } /// 写入审计日志摘要(分块多行 INSERT,每 chunk 100 条) pub async fn ingest_audit_summary( db: &PgPool, account_id: &str, device_id: &str, entries: &[AuditSummaryEntry], ) -> SaasResult { // 预过滤空 action let valid: Vec<_> = entries.iter().filter(|e| !e.action.is_empty()).collect(); if valid.is_empty() { return Ok(0); } let mut tx = db.begin().await?; let mut written = 0usize; // 每行 6 列参数 let cols = 6; for chunk in valid.chunks(CHUNK_SIZE) { let mut sql = String::from( "INSERT INTO operation_logs (account_id, action, target_type, target_id, details, created_at) VALUES " ); let placeholders: Vec = (0..chunk.len()) .map(|i| { let base = i * cols + 1; format!("(${},${},${},${},${},${})", base, base+1, base+2, base+3, base+4, base+5) }).collect(); sql.push_str(&placeholders.join(", ")); // 预收集 details(拥有所有权),避免借用生命周期问题 let details_list: Vec = chunk.iter().map(|entry| { serde_json::json!({ "source": "desktop", "device_id": device_id, "result": entry.result, }) }).collect(); let mut query = sqlx::query(&sql); for (i, entry) in chunk.iter().enumerate() { query = query .bind(account_id) .bind(&entry.action) .bind("desktop_audit") .bind(&entry.target) .bind(&details_list[i]) .bind(&entry.timestamp); } match query.execute(&mut *tx).await { Ok(result) => written += result.rows_affected() as usize, Err(e) => { tracing::warn!("Failed to insert audit summary chunk: {}", e); } } } tx.commit().await?; Ok(written) } /// 按天聚合用量统计 pub async fn get_daily_stats( db: &PgPool, account_id: &str, query: &TelemetryStatsQuery, ) -> SaasResult> { let days = query.days.unwrap_or(30).min(90).max(1) as i64; // Rust 侧计算日期范围,避免 format!() 拼 SQL let from_ts = (chrono::Utc::now() - chrono::Duration::days(days)) .date_naive() .and_hms_opt(0, 0, 0).unwrap() .and_utc() .to_rfc3339(); let sql = "SELECT reported_at::date::text as day, COUNT(*)::bigint as request_count, COALESCE(SUM(input_tokens), 0)::bigint as input_tokens, COALESCE(SUM(output_tokens), 0)::bigint as output_tokens, COUNT(DISTINCT device_id)::bigint as unique_devices FROM telemetry_reports WHERE account_id = $1 AND reported_at >= $2 GROUP BY reported_at::date ORDER BY day DESC"; let rows: Vec = sqlx::query_as(sql).bind(account_id).bind(&from_ts).fetch_all(db).await?; let stats: Vec = rows .into_iter() .map(|r| { DailyUsageStat { day: r.day, request_count: r.request_count, input_tokens: r.input_tokens, output_tokens: r.output_tokens, unique_devices: r.unique_devices, } }) .collect(); Ok(stats) } /// 查询账号最近的遥测报告 pub async fn get_recent_reports( db: &PgPool, account_id: &str, limit: i64, ) -> SaasResult> { let limit = limit.min(100).max(1); let rows = sqlx::query_as::<_, TelemetryReportRow>( "SELECT id, account_id, device_id, app_version, model_id, \ input_tokens, output_tokens, latency_ms, success, \ error_type, connection_mode, \ reported_at::text, created_at::text \ FROM telemetry_reports \ WHERE account_id = $1 \ ORDER BY reported_at DESC \ LIMIT $2" ) .bind(account_id) .bind(limit) .fetch_all(db) .await?; Ok(rows) }