fix(saas): WorkerDispatcher registration race — consumer starts after all workers registered
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Root cause: start_consumer() was called in new() before any register() calls, so the consumer's cloned HashMap was always empty. Workers like log_operation and record_usage were never found, causing "Unknown worker" errors. - Add WorkerDispatcher::start() method to be called after all register()s - Update main.rs to call dispatcher.start() after 7 workers registered
This commit is contained in:
@@ -48,6 +48,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
dispatcher.register(UpdateLastUsedWorker);
|
dispatcher.register(UpdateLastUsedWorker);
|
||||||
dispatcher.register(AggregateUsageWorker);
|
dispatcher.register(AggregateUsageWorker);
|
||||||
dispatcher.register(GenerateEmbeddingWorker);
|
dispatcher.register(GenerateEmbeddingWorker);
|
||||||
|
dispatcher.start(); // 必须在所有 register() 之后调用
|
||||||
info!("Worker dispatcher initialized (7 workers registered)");
|
info!("Worker dispatcher initialized (7 workers registered)");
|
||||||
|
|
||||||
// 优雅停机令牌 — 取消后所有 SSE 流和长连接立即终止
|
// 优雅停机令牌 — 取消后所有 SSE 流和长连接立即终止
|
||||||
|
|||||||
@@ -104,8 +104,9 @@ impl WorkerDispatcher {
|
|||||||
spawn_limiter,
|
spawn_limiter,
|
||||||
};
|
};
|
||||||
|
|
||||||
// 启动消费循环
|
// 注意:不在此处启动消费循环 — 调用方需在 register() 全部完成后调用 start()
|
||||||
dispatcher.start_consumer(receiver);
|
// 否则 consumer 持有的是空 handlers 克隆
|
||||||
|
let _ = receiver; // 由 start() 消费
|
||||||
|
|
||||||
dispatcher
|
dispatcher
|
||||||
}
|
}
|
||||||
@@ -121,6 +122,14 @@ impl WorkerDispatcher {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 启动消费循环(必须在所有 register() 完成后调用)
|
||||||
|
pub fn start(self: &mut Self) {
|
||||||
|
// 重新创建 channel,start_consumer 需要持有最新的 handlers
|
||||||
|
let (sender, receiver) = mpsc::channel(1024);
|
||||||
|
self.sender = sender;
|
||||||
|
self.start_consumer(receiver);
|
||||||
|
}
|
||||||
|
|
||||||
/// 派发任务(非阻塞)
|
/// 派发任务(非阻塞)
|
||||||
pub async fn dispatch<A>(&self, worker_name: &str, args: A) -> SaasResult<()>
|
pub async fn dispatch<A>(&self, worker_name: &str, args: A) -> SaasResult<()>
|
||||||
where
|
where
|
||||||
|
|||||||
Reference in New Issue
Block a user