Re-enable BufferService (#365)

This commit is contained in:
Nikolay Kim 2024-05-28 18:12:24 +05:00 committed by GitHub
parent b04bdf41f6
commit c52db3fd10
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 73 additions and 60 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [2.0.1] - 2024-05-28
* Re-enable BufferService
## [2.0.0] - 2024-05-28
* Use async fn for Service::ready() and Service::shutdown()

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "2.0.0"
version = "2.0.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]
@ -18,8 +18,8 @@ path = "src/lib.rs"
[dependencies]
ntex-service = "3.0"
ntex-rt = "0.4"
bitflags = "2.4"
fxhash = "0.2.1"
bitflags = "2"
fxhash = "0.2"
log = "0.4"
slab = "0.4"
futures-timer = "3.0"

View file

@ -3,7 +3,7 @@ use std::cell::{Cell, RefCell};
use std::task::{ready, Poll};
use std::{collections::VecDeque, fmt, future::poll_fn, marker::PhantomData};
use ntex_service::{Middleware, Service, ServiceCtx};
use ntex_service::{Middleware, Pipeline, PipelineBinding, Service, ServiceCtx};
use crate::channel::oneshot;
@ -62,18 +62,19 @@ impl<R> Clone for Buffer<R> {
impl<R, S> Middleware<S> for Buffer<R>
where
S: Service<R>,
S: Service<R> + 'static,
R: 'static,
{
type Service = BufferService<R, S>;
fn create(&self, service: S) -> Self::Service {
BufferService {
service,
service: Pipeline::new(service).bind(),
size: self.buf_size,
cancel_on_shutdown: self.cancel_on_shutdown,
ready: Cell::new(false),
buf: RefCell::new(VecDeque::with_capacity(self.buf_size)),
next_call: RefCell::default(),
cancel_on_shutdown: self.cancel_on_shutdown,
_t: PhantomData,
}
}
@ -109,26 +110,27 @@ impl<E: std::fmt::Display + std::fmt::Debug> std::error::Error for BufferService
/// Default number of buffered requests is 16
pub struct BufferService<R, S: Service<R>> {
size: usize,
cancel_on_shutdown: bool,
ready: Cell<bool>,
service: S,
service: PipelineBinding<S, R>,
buf: RefCell<VecDeque<oneshot::Sender<oneshot::Sender<()>>>>,
next_call: RefCell<Option<oneshot::Receiver<()>>>,
cancel_on_shutdown: bool,
_t: PhantomData<R>,
}
impl<R, S> BufferService<R, S>
where
S: Service<R>,
S: Service<R> + 'static,
R: 'static,
{
pub fn new(size: usize, service: S) -> Self {
Self {
size,
service,
cancel_on_shutdown: false,
service: Pipeline::new(service).bind(),
ready: Cell::new(false),
buf: RefCell::new(VecDeque::with_capacity(size)),
next_call: RefCell::default(),
cancel_on_shutdown: false,
_t: PhantomData,
}
}
@ -148,11 +150,11 @@ where
fn clone(&self) -> Self {
Self {
size: self.size,
cancel_on_shutdown: self.cancel_on_shutdown,
ready: Cell::new(false),
service: self.service.clone(),
buf: RefCell::new(VecDeque::with_capacity(self.size)),
next_call: RefCell::default(),
cancel_on_shutdown: self.cancel_on_shutdown,
_t: PhantomData,
}
}
@ -176,45 +178,50 @@ where
impl<R, S> Service<R> for BufferService<R, S>
where
S: Service<R>,
S: Service<R> + 'static,
R: 'static,
{
type Response = S::Response;
type Error = BufferServiceError<S::Error>;
#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let mut buffer = self.buf.borrow_mut();
let mut next_call = self.next_call.borrow_mut();
if let Some(next_call) = &*next_call {
// hold advancement until the last released task either makes a call or is dropped
let _ = ready!(next_call.poll_recv(cx));
}
next_call.take();
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| {
let mut buffer = self.buf.borrow_mut();
let mut next_call = self.next_call.borrow_mut();
if let Some(next_call) = &*next_call {
// hold advancement until the last released task either makes a call or is dropped
let _ = ready!(next_call.poll_recv(cx));
}
next_call.take();
if self.service.poll_ready(cx)?.is_pending() {
if buffer.len() < self.size {
// buffer next request
if self.service.poll_ready(cx)?.is_pending() {
if buffer.len() < self.size {
// buffer next request
self.ready.set(false);
return Poll::Ready(Ok(()));
} else {
log::trace!("Buffer limit exceeded");
return Poll::Pending;
}
}
while let Some(sender) = buffer.pop_front() {
let (next_call_tx, next_call_rx) = oneshot::channel();
if sender.send(next_call_tx).is_err()
|| next_call_rx.poll_recv(cx).is_ready()
{
// the task is gone
continue;
}
next_call.replace(next_call_rx);
self.ready.set(false);
return Poll::Ready(Ok(()));
} else {
log::trace!("Buffer limit exceeded");
return Poll::Pending;
}
}
while let Some(sender) = buffer.pop_front() {
let (next_call_tx, next_call_rx) = oneshot::channel();
if sender.send(next_call_tx).is_err() || next_call_rx.poll_recv(cx).is_ready() {
// the task is gone
continue;
}
next_call.replace(next_call_rx);
self.ready.set(false);
return Poll::Ready(Ok(()));
}
self.ready.set(true);
Poll::Ready(Ok(()))
self.ready.set(true);
Poll::Ready(Ok(()))
})
.await
}
async fn shutdown(&self) {
@ -252,6 +259,7 @@ where
return Poll::Pending;
}
}
Poll::Ready(())
})
.await;
@ -261,11 +269,11 @@ where
async fn call(
&self,
req: R,
ctx: ServiceCtx<'_, Self>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
if self.ready.get() {
self.ready.set(false);
Ok(ctx.call_nowait(&self.service, req).await?)
Ok(self.service.call_nowait(req).await?)
} else {
let (tx, rx) = oneshot::channel();
self.buf.borrow_mut().push_back(tx);
@ -276,11 +284,8 @@ where
BufferServiceError::RequestCanceled
})?;
// check service readiness
ctx.ready(&self.service).await?;
// call service
Ok(ctx.call_nowait(&self.service, req).await?)
Ok(self.service.call(req).await?)
}
}
}
@ -307,13 +312,16 @@ mod tests {
type Response = ();
type Error = ();
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.waker.register(cx.waker());
if self.0.ready.get() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
poll_fn(|cx| {
self.0.waker.register(cx.waker());
if self.0.ready.get() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
})
.await
}
async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
@ -331,7 +339,8 @@ mod tests {
count: Cell::new(0),
});
let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone());
let srv =
Pipeline::new(BufferService::new(2, TestService(inner.clone())).clone()).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let srv1 = srv.clone();
@ -370,7 +379,7 @@ mod tests {
count: Cell::new(0),
});
let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone())));
let srv = Pipeline::new(BufferService::new(2, TestService(inner.clone()))).bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let _ = srv.call(()).await;
assert_eq!(inner.count.get(), 1);
@ -392,7 +401,7 @@ mod tests {
fn_factory(|| async { Ok::<_, ()>(TestService(inner.clone())) }),
);
let srv = srv.pipeline(&()).await.unwrap();
let srv = srv.pipeline(&()).await.unwrap().bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let srv1 = srv.clone();

View file

@ -1,4 +1,4 @@
// pub mod buffer;
pub mod buffer;
pub mod counter;
mod extensions;
pub mod inflight;