This commit is contained in:
Nikolay Kim 2025-03-28 08:30:47 +01:00
parent 965e825662
commit 8e1f12a32b
2 changed files with 14 additions and 4 deletions

View file

@ -208,8 +208,10 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
}
fn update_workers(&mut self, upd: Update<F::Item>) {
println!("======== UPDATE WORKERS ===================== {:?}", self.workers.len());
match upd {
Update::Available(worker) => {
println!("======== UPDATE WORKERS: avail");
self.workers.push(worker);
self.workers.sort();
if self.workers.len() == 1 {
@ -217,6 +219,7 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
}
}
Update::Unavailable(worker) => {
println!("======== UPDATE WORKERS: not-avail {:?}", self.workers.len());
if let Ok(idx) = self.workers.binary_search(&worker) {
self.workers.remove(idx);
}

View file

@ -155,6 +155,7 @@ impl<T> Worker<T> {
if self.avail.failed() {
self.failed.store(true, Ordering::Release);
}
println!("-------- update status {:?}", self.status());
self.status()
}
}
@ -237,6 +238,7 @@ impl WorkerAvailability {
async fn wait_for_update(&self) {
poll_fn(|cx| {
if self.inner.updated.load(Ordering::Acquire) {
println!("-------- status updated");
self.inner.updated.store(false, Ordering::Release);
Poll::Ready(())
} else {
@ -287,11 +289,17 @@ where
let mut recv = std::pin::pin!(wrk.rx.recv());
let fut = poll_fn(|cx| {
match svc.poll_ready(cx) {
Poll::Ready(res) => {
res?;
Poll::Ready(Ok(())) => {
println!("-------- status updated: ready");
wrk.availability.set(true);
}
Poll::Ready(Err(err)) => {
println!("-------- status updated: failed");
wrk.availability.set(false);
return Poll::Ready(Err(err));
}
Poll::Pending => {
println!("-------- status updated: pending");
wrk.availability.set(false);
return Poll::Pending;
}
@ -318,7 +326,6 @@ where
let _ = ntex_rt::spawn(async move {
svc.shutdown().await;
});
wrk.availability.set(false);
}
Either::Right(Some(Shutdown { timeout, result })) => {
wrk.availability.set(false);
@ -333,6 +340,7 @@ where
return;
}
Either::Left(Ok(false)) | Either::Right(None) => {
wrk.availability.set(false);
stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await;
return;
}
@ -342,7 +350,6 @@ where
loop {
match select(wrk.factory.create(()), stream_recv(&mut wrk.stop)).await {
Either::Left(Ok(service)) => {
wrk.availability.set(true);
svc = Pipeline::new(service).bind();
break;
}