diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 3afaeb80..597a02f9 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.1] - 2024-05-28 + +* Re-enable BufferService + ## [2.0.0] - 2024-05-28 * Use async fn for Service::ready() and Service::shutdown() diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 88ddd23f..29447bcd 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "2.0.0" +version = "2.0.1" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] ntex-service = "3.0" ntex-rt = "0.4" -bitflags = "2.4" -fxhash = "0.2.1" +bitflags = "2" +fxhash = "0.2" log = "0.4" slab = "0.4" futures-timer = "3.0" diff --git a/ntex-util/src/services/buffer.rs b/ntex-util/src/services/buffer.rs index d1b1cd3a..91752355 100644 --- a/ntex-util/src/services/buffer.rs +++ b/ntex-util/src/services/buffer.rs @@ -3,7 +3,7 @@ use std::cell::{Cell, RefCell}; use std::task::{ready, Poll}; use std::{collections::VecDeque, fmt, future::poll_fn, marker::PhantomData}; -use ntex_service::{Middleware, Service, ServiceCtx}; +use ntex_service::{Middleware, Pipeline, PipelineBinding, Service, ServiceCtx}; use crate::channel::oneshot; @@ -62,18 +62,19 @@ impl Clone for Buffer { impl Middleware for Buffer where - S: Service, + S: Service + 'static, + R: 'static, { type Service = BufferService; fn create(&self, service: S) -> Self::Service { BufferService { - service, + service: Pipeline::new(service).bind(), size: self.buf_size, - cancel_on_shutdown: self.cancel_on_shutdown, ready: Cell::new(false), buf: RefCell::new(VecDeque::with_capacity(self.buf_size)), next_call: RefCell::default(), + cancel_on_shutdown: self.cancel_on_shutdown, _t: PhantomData, } } @@ -109,26 +110,27 @@ impl std::error::Error for BufferService /// Default number of buffered requests is 16 pub struct BufferService> { size: usize, - cancel_on_shutdown: bool, ready: Cell, - service: S, + service: PipelineBinding, buf: RefCell>>>, next_call: RefCell>>, + cancel_on_shutdown: bool, _t: PhantomData, } impl BufferService where - S: Service, + S: Service + 'static, + R: 'static, { pub fn new(size: usize, service: S) -> Self { Self { size, - service, - cancel_on_shutdown: false, + service: Pipeline::new(service).bind(), ready: Cell::new(false), buf: RefCell::new(VecDeque::with_capacity(size)), next_call: RefCell::default(), + cancel_on_shutdown: false, _t: PhantomData, } } @@ -148,11 +150,11 @@ where fn clone(&self) -> Self { Self { size: self.size, - cancel_on_shutdown: self.cancel_on_shutdown, ready: Cell::new(false), service: self.service.clone(), buf: RefCell::new(VecDeque::with_capacity(self.size)), next_call: RefCell::default(), + cancel_on_shutdown: self.cancel_on_shutdown, _t: PhantomData, } } @@ -176,45 +178,50 @@ where impl Service for BufferService where - S: Service, + S: Service + 'static, + R: 'static, { type Response = S::Response; type Error = BufferServiceError; - #[inline] - async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { - let mut buffer = self.buf.borrow_mut(); - let mut next_call = self.next_call.borrow_mut(); - if let Some(next_call) = &*next_call { - // hold advancement until the last released task either makes a call or is dropped - let _ = ready!(next_call.poll_recv(cx)); - } - next_call.take(); + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + poll_fn(|cx| { + let mut buffer = self.buf.borrow_mut(); + let mut next_call = self.next_call.borrow_mut(); + if let Some(next_call) = &*next_call { + // hold advancement until the last released task either makes a call or is dropped + let _ = ready!(next_call.poll_recv(cx)); + } + next_call.take(); - if self.service.poll_ready(cx)?.is_pending() { - if buffer.len() < self.size { - // buffer next request + if self.service.poll_ready(cx)?.is_pending() { + if buffer.len() < self.size { + // buffer next request + self.ready.set(false); + return Poll::Ready(Ok(())); + } else { + log::trace!("Buffer limit exceeded"); + return Poll::Pending; + } + } + + while let Some(sender) = buffer.pop_front() { + let (next_call_tx, next_call_rx) = oneshot::channel(); + if sender.send(next_call_tx).is_err() + || next_call_rx.poll_recv(cx).is_ready() + { + // the task is gone + continue; + } + next_call.replace(next_call_rx); self.ready.set(false); return Poll::Ready(Ok(())); - } else { - log::trace!("Buffer limit exceeded"); - return Poll::Pending; } - } - while let Some(sender) = buffer.pop_front() { - let (next_call_tx, next_call_rx) = oneshot::channel(); - if sender.send(next_call_tx).is_err() || next_call_rx.poll_recv(cx).is_ready() { - // the task is gone - continue; - } - next_call.replace(next_call_rx); - self.ready.set(false); - return Poll::Ready(Ok(())); - } - - self.ready.set(true); - Poll::Ready(Ok(())) + self.ready.set(true); + Poll::Ready(Ok(())) + }) + .await } async fn shutdown(&self) { @@ -252,6 +259,7 @@ where return Poll::Pending; } } + Poll::Ready(()) }) .await; @@ -261,11 +269,11 @@ where async fn call( &self, req: R, - ctx: ServiceCtx<'_, Self>, + _: ServiceCtx<'_, Self>, ) -> Result { if self.ready.get() { self.ready.set(false); - Ok(ctx.call_nowait(&self.service, req).await?) + Ok(self.service.call_nowait(req).await?) } else { let (tx, rx) = oneshot::channel(); self.buf.borrow_mut().push_back(tx); @@ -276,11 +284,8 @@ where BufferServiceError::RequestCanceled })?; - // check service readiness - ctx.ready(&self.service).await?; - // call service - Ok(ctx.call_nowait(&self.service, req).await?) + Ok(self.service.call(req).await?) } } } @@ -307,13 +312,16 @@ mod tests { type Response = (); type Error = (); - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.0.waker.register(cx.waker()); - if self.0.ready.get() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> { + poll_fn(|cx| { + self.0.waker.register(cx.waker()); + if self.0.ready.get() { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + }) + .await } async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> { @@ -331,7 +339,8 @@ mod tests { count: Cell::new(0), }); - let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone()); + let srv = + Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone()).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv1 = srv.clone(); @@ -370,7 +379,7 @@ mod tests { count: Cell::new(0), }); - let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone()))); + let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone()))).bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let _ = srv.call(()).await; assert_eq!(inner.count.get(), 1); @@ -392,7 +401,7 @@ mod tests { fn_factory(|| async { Ok::<_, ()>(TestService(inner.clone())) }), ); - let srv = srv.pipeline(&()).await.unwrap(); + let srv = srv.pipeline(&()).await.unwrap().bind(); assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); let srv1 = srv.clone(); diff --git a/ntex-util/src/services/mod.rs b/ntex-util/src/services/mod.rs index fa575077..28974192 100644 --- a/ntex-util/src/services/mod.rs +++ b/ntex-util/src/services/mod.rs @@ -1,4 +1,4 @@ -// pub mod buffer; +pub mod buffer; pub mod counter; mod extensions; pub mod inflight;