diff --git a/Cargo.toml b/Cargo.toml index 871d9de2..11f67481 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,8 @@ ntex-util = { path = "ntex-util" } ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } +ntex-neon = { git = "https://github.com/ntex-rs/neon.git", branch = "iour-fix" } + [workspace.dependencies] async-task = "4.5.0" bitflags = "2" diff --git a/ntex-server/src/wrk.rs b/ntex-server/src/wrk.rs index a61f7731..85eb896b 100644 --- a/ntex-server/src/wrk.rs +++ b/ntex-server/src/wrk.rs @@ -99,10 +99,10 @@ impl Worker { log::debug!("Creating server instance in {:?}", id); let factory = cfg.create().await; - log::debug!("Server instance has been created in {:?}", id); match create(id, rx1, rx2, factory, avail_tx).await { Ok((svc, wrk)) => { + log::debug!("Server instance has been created in {:?}", id); run_worker(svc, wrk).await; } Err(e) => { @@ -241,7 +241,7 @@ impl WorkerAvailabilityTx { /// Worker accepts message via unbounded channel and starts processing. struct WorkerSt> { id: WorkerId, - rx: Pin>>, + rx: Receiver, stop: Pin>>, factory: F, availability: WorkerAvailabilityTx, @@ -252,21 +252,45 @@ where T: Send + 'static, F: ServiceFactory + 'static, { - loop { - let fut = poll_fn(|cx| { - ready!(svc.poll_ready(cx)?); + //println!("------- start worker {:?}", wrk.id); - if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) { - let fut = svc.call(item); - let _ = spawn(async move { - let _ = fut.await; - }); + loop { + //println!("------- run worker {:?}", wrk.id); + let mut recv = std::pin::pin!(wrk.rx.recv()); + let fut = poll_fn(|cx| { + match svc.poll_ready(cx) { + Poll::Ready(res) => { + res?; + wrk.availability.set(true); + } + Poll::Pending => { + wrk.availability.set(false); + return Poll::Pending + } + } + + //println!("------- waiting socket {:?}", wrk.id); + match ready!(recv.as_mut().poll(cx)) { + Ok(item) => { + //println!("------- got {:?}", wrk.id); + + let fut = svc.call(item); + let _ = spawn(async move { + let _ = fut.await; + }); + Poll::Ready(Ok::<_, F::Error>(true)) + } + Err(_) => { + //println!("------- failed {:?}", wrk.id); + + log::error!("Server is gone"); + Poll::Ready(Ok(false)) + } } - Poll::Ready(Ok::<(), F::Error>(())) }); match select(fut, stream_recv(&mut wrk.stop)).await { - Either::Left(Ok(())) => continue, + Either::Left(Ok(true)) => continue, Either::Left(Err(_)) => { let _ = ntex_rt::spawn(async move { svc.shutdown().await; @@ -285,7 +309,7 @@ where stop_svc(wrk.id, svc, timeout, Some(result)).await; return; } - Either::Right(None) => { + Either::Left(Ok(false)) | Either::Right(None) => { stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; return; } @@ -336,8 +360,6 @@ where { availability.set(false); let factory = factory?; - - let rx = Box::pin(rx); let mut stop = Box::pin(stop); let svc = match select(factory.create(()), stream_recv(&mut stop)).await { @@ -356,9 +378,9 @@ where svc, WorkerSt { id, + rx, factory, availability, - rx: Box::pin(rx), stop: Box::pin(stop), }, )) diff --git a/ntex/tests/http_openssl.rs b/ntex/tests/http_openssl.rs index 5aab2e57..0c7d65c3 100644 --- a/ntex/tests/http_openssl.rs +++ b/ntex/tests/http_openssl.rs @@ -442,7 +442,6 @@ async fn test_h2_client_drop() -> io::Result<()> { let count = count2.clone(); HttpService::build() .h2(move |req: Request| { - println!("HANDLE H2"); let count = count.clone(); async move { let _st = SetOnDrop(count); @@ -458,7 +457,7 @@ async fn test_h2_client_drop() -> io::Result<()> { let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; assert!(result.is_err()); - sleep(Millis(250)).await; + sleep(Millis(150)).await; assert_eq!(count.load(Ordering::Relaxed), 1); Ok(()) }