From eab9b5fdccfc1518027df7009e6c0b327b859f80 Mon Sep 17 00:00:00 2001 From: iven Date: Wed, 8 Apr 2026 08:33:54 +0800 Subject: [PATCH] =?UTF-8?q?fix(saas):=20WorkerDispatcher=20registration=20?= =?UTF-8?q?race=20=E2=80=94=20consumer=20starts=20after=20all=20workers=20?= =?UTF-8?q?registered?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: start_consumer() was called in new() before any register() calls, so the consumer's cloned HashMap was always empty. Workers like log_operation and record_usage were never found, causing "Unknown worker" errors. - Add WorkerDispatcher::start() method to be called after all register()s - Update main.rs to call dispatcher.start() after 7 workers registered --- crates/zclaw-saas/src/main.rs | 1 + crates/zclaw-saas/src/workers/mod.rs | 13 +++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/crates/zclaw-saas/src/main.rs b/crates/zclaw-saas/src/main.rs index 6221cc0..e0be71b 100644 --- a/crates/zclaw-saas/src/main.rs +++ b/crates/zclaw-saas/src/main.rs @@ -48,6 +48,7 @@ async fn main() -> anyhow::Result<()> { dispatcher.register(UpdateLastUsedWorker); dispatcher.register(AggregateUsageWorker); dispatcher.register(GenerateEmbeddingWorker); + dispatcher.start(); // 必须在所有 register() 之后调用 info!("Worker dispatcher initialized (7 workers registered)"); // 优雅停机令牌 — 取消后所有 SSE 流和长连接立即终止 diff --git a/crates/zclaw-saas/src/workers/mod.rs b/crates/zclaw-saas/src/workers/mod.rs index 75846e0..b57a4ea 100644 --- a/crates/zclaw-saas/src/workers/mod.rs +++ b/crates/zclaw-saas/src/workers/mod.rs @@ -104,8 +104,9 @@ impl WorkerDispatcher { spawn_limiter, }; - // 启动消费循环 - dispatcher.start_consumer(receiver); + // 注意:不在此处启动消费循环 — 调用方需在 register() 全部完成后调用 start() + // 否则 consumer 持有的是空 handlers 克隆 + let _ = receiver; // 由 start() 消费 dispatcher } @@ -121,6 +122,14 @@ impl WorkerDispatcher { ); } + /// 启动消费循环(必须在所有 register() 完成后调用) + pub fn start(self: &mut Self) { + // 重新创建 channel,start_consumer 需要持有最新的 handlers + let (sender, receiver) = mpsc::channel(1024); + self.sender = sender; + self.start_consumer(receiver); + } + /// 派发任务(非阻塞) pub async fn dispatch(&self, worker_name: &str, args: A) -> SaasResult<()> where