This commit is contained in:
Nikolay Kim 2025-03-27 07:39:14 +01:00
parent 5cbbb26713
commit ea74503736
3 changed files with 41 additions and 18 deletions

View file

@ -45,6 +45,8 @@ ntex-util = { path = "ntex-util" }
ntex-compio = { path = "ntex-compio" } ntex-compio = { path = "ntex-compio" }
ntex-tokio = { path = "ntex-tokio" } ntex-tokio = { path = "ntex-tokio" }
ntex-neon = { git = "https://github.com/ntex-rs/neon.git", branch = "iour-fix" }
[workspace.dependencies] [workspace.dependencies]
async-task = "4.5.0" async-task = "4.5.0"
bitflags = "2" bitflags = "2"

View file

@ -99,10 +99,10 @@ impl<T> Worker<T> {
log::debug!("Creating server instance in {:?}", id); log::debug!("Creating server instance in {:?}", id);
let factory = cfg.create().await; let factory = cfg.create().await;
log::debug!("Server instance has been created in {:?}", id);
match create(id, rx1, rx2, factory, avail_tx).await { match create(id, rx1, rx2, factory, avail_tx).await {
Ok((svc, wrk)) => { Ok((svc, wrk)) => {
log::debug!("Server instance has been created in {:?}", id);
run_worker(svc, wrk).await; run_worker(svc, wrk).await;
} }
Err(e) => { Err(e) => {
@ -241,7 +241,7 @@ impl WorkerAvailabilityTx {
/// Worker accepts message via unbounded channel and starts processing. /// Worker accepts message via unbounded channel and starts processing.
struct WorkerSt<T, F: ServiceFactory<T>> { struct WorkerSt<T, F: ServiceFactory<T>> {
id: WorkerId, id: WorkerId,
rx: Pin<Box<dyn Stream<Item = T>>>, rx: Receiver<T>,
stop: Pin<Box<dyn Stream<Item = Shutdown>>>, stop: Pin<Box<dyn Stream<Item = Shutdown>>>,
factory: F, factory: F,
availability: WorkerAvailabilityTx, availability: WorkerAvailabilityTx,
@ -252,21 +252,45 @@ where
T: Send + 'static, T: Send + 'static,
F: ServiceFactory<T> + 'static, F: ServiceFactory<T> + 'static,
{ {
loop { //println!("------- start worker {:?}", wrk.id);
let fut = poll_fn(|cx| {
ready!(svc.poll_ready(cx)?);
if let Some(item) = ready!(Pin::new(&mut wrk.rx).poll_next(cx)) { loop {
let fut = svc.call(item); //println!("------- run worker {:?}", wrk.id);
let _ = spawn(async move { let mut recv = std::pin::pin!(wrk.rx.recv());
let _ = fut.await; let fut = poll_fn(|cx| {
}); match svc.poll_ready(cx) {
Poll::Ready(res) => {
res?;
wrk.availability.set(true);
}
Poll::Pending => {
wrk.availability.set(false);
return Poll::Pending
}
}
//println!("------- waiting socket {:?}", wrk.id);
match ready!(recv.as_mut().poll(cx)) {
Ok(item) => {
//println!("------- got {:?}", wrk.id);
let fut = svc.call(item);
let _ = spawn(async move {
let _ = fut.await;
});
Poll::Ready(Ok::<_, F::Error>(true))
}
Err(_) => {
//println!("------- failed {:?}", wrk.id);
log::error!("Server is gone");
Poll::Ready(Ok(false))
}
} }
Poll::Ready(Ok::<(), F::Error>(()))
}); });
match select(fut, stream_recv(&mut wrk.stop)).await { match select(fut, stream_recv(&mut wrk.stop)).await {
Either::Left(Ok(())) => continue, Either::Left(Ok(true)) => continue,
Either::Left(Err(_)) => { Either::Left(Err(_)) => {
let _ = ntex_rt::spawn(async move { let _ = ntex_rt::spawn(async move {
svc.shutdown().await; svc.shutdown().await;
@ -285,7 +309,7 @@ where
stop_svc(wrk.id, svc, timeout, Some(result)).await; stop_svc(wrk.id, svc, timeout, Some(result)).await;
return; return;
} }
Either::Right(None) => { Either::Left(Ok(false)) | Either::Right(None) => {
stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await; stop_svc(wrk.id, svc, STOP_TIMEOUT, None).await;
return; return;
} }
@ -336,8 +360,6 @@ where
{ {
availability.set(false); availability.set(false);
let factory = factory?; let factory = factory?;
let rx = Box::pin(rx);
let mut stop = Box::pin(stop); let mut stop = Box::pin(stop);
let svc = match select(factory.create(()), stream_recv(&mut stop)).await { let svc = match select(factory.create(()), stream_recv(&mut stop)).await {
@ -356,9 +378,9 @@ where
svc, svc,
WorkerSt { WorkerSt {
id, id,
rx,
factory, factory,
availability, availability,
rx: Box::pin(rx),
stop: Box::pin(stop), stop: Box::pin(stop),
}, },
)) ))

View file

@ -442,7 +442,6 @@ async fn test_h2_client_drop() -> io::Result<()> {
let count = count2.clone(); let count = count2.clone();
HttpService::build() HttpService::build()
.h2(move |req: Request| { .h2(move |req: Request| {
println!("HANDLE H2");
let count = count.clone(); let count = count.clone();
async move { async move {
let _st = SetOnDrop(count); let _st = SetOnDrop(count);
@ -458,7 +457,7 @@ async fn test_h2_client_drop() -> io::Result<()> {
let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await; let result = timeout(Millis(250), srv.srequest(Method::GET, "/").send()).await;
assert!(result.is_err()); assert!(result.is_err());
sleep(Millis(250)).await; sleep(Millis(150)).await;
assert_eq!(count.load(Ordering::Relaxed), 1); assert_eq!(count.load(Ordering::Relaxed), 1);
Ok(()) Ok(())
} }