mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 05:17:39 +03:00
Update buffer service
This commit is contained in:
parent
a30147120d
commit
bcf97a8fa4
1 changed files with 61 additions and 31 deletions
|
@ -1,6 +1,6 @@
|
|||
//! Service that buffers incomming requests.
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::task::{ready, Poll};
|
||||
use std::task::{ready, Poll, Waker};
|
||||
use std::{collections::VecDeque, fmt, future::poll_fn, marker::PhantomData};
|
||||
|
||||
use ntex_service::{Middleware, Pipeline, PipelineBinding, Service, ServiceCtx};
|
||||
|
@ -70,11 +70,13 @@ where
|
|||
fn create(&self, service: S) -> Self::Service {
|
||||
BufferService {
|
||||
service: Pipeline::new(service).bind(),
|
||||
service_pending: Cell::new(true),
|
||||
size: self.buf_size,
|
||||
ready: Cell::new(false),
|
||||
buf: RefCell::new(VecDeque::with_capacity(self.buf_size)),
|
||||
next_call: RefCell::default(),
|
||||
cancel_on_shutdown: self.cancel_on_shutdown,
|
||||
readiness: Cell::new(None),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
@ -111,10 +113,12 @@ impl<E: std::fmt::Display + std::fmt::Debug> std::error::Error for BufferService
|
|||
pub struct BufferService<R, S: Service<R>> {
|
||||
size: usize,
|
||||
ready: Cell<bool>,
|
||||
service_pending: Cell<bool>,
|
||||
service: PipelineBinding<S, R>,
|
||||
buf: RefCell<VecDeque<oneshot::Sender<oneshot::Sender<()>>>>,
|
||||
next_call: RefCell<Option<oneshot::Receiver<()>>>,
|
||||
cancel_on_shutdown: bool,
|
||||
readiness: Cell<Option<Waker>>,
|
||||
_t: PhantomData<R>,
|
||||
}
|
||||
|
||||
|
@ -127,10 +131,12 @@ where
|
|||
Self {
|
||||
size,
|
||||
service: Pipeline::new(service).bind(),
|
||||
service_pending: Cell::new(true),
|
||||
ready: Cell::new(false),
|
||||
buf: RefCell::new(VecDeque::with_capacity(size)),
|
||||
next_call: RefCell::default(),
|
||||
cancel_on_shutdown: false,
|
||||
readiness: Cell::new(None),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
@ -152,9 +158,11 @@ where
|
|||
size: self.size,
|
||||
ready: Cell::new(false),
|
||||
service: self.service.clone(),
|
||||
service_pending: Cell::new(false),
|
||||
buf: RefCell::new(VecDeque::with_capacity(self.size)),
|
||||
next_call: RefCell::default(),
|
||||
cancel_on_shutdown: self.cancel_on_shutdown,
|
||||
readiness: Cell::new(None),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
@ -170,6 +178,7 @@ where
|
|||
.field("cancel_on_shutdown", &self.cancel_on_shutdown)
|
||||
.field("ready", &self.ready)
|
||||
.field("service", &self.service)
|
||||
.field("service_pending", &self.service_pending)
|
||||
.field("buf", &self.buf)
|
||||
.field("next_call", &self.next_call)
|
||||
.finish()
|
||||
|
@ -185,58 +194,79 @@ where
|
|||
type Error = BufferServiceError<S::Error>;
|
||||
|
||||
async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
|
||||
// hold advancement until the last released task either makes a call or is dropped
|
||||
let next_call = self.next_call.borrow_mut().take();
|
||||
if let Some(next_call) = next_call {
|
||||
let _ = next_call.recv().await;
|
||||
}
|
||||
|
||||
poll_fn(|cx| {
|
||||
let mut buffer = self.buf.borrow_mut();
|
||||
let mut next_call = self.next_call.borrow_mut();
|
||||
if let Some(next_call) = &*next_call {
|
||||
// hold advancement until the last released task either makes a call or is dropped
|
||||
let _ = ready!(next_call.poll_recv(cx));
|
||||
}
|
||||
next_call.take();
|
||||
|
||||
// handle inner service readiness
|
||||
if self.service.poll_ready(cx)?.is_pending() {
|
||||
if buffer.len() < self.size {
|
||||
// buffer next request
|
||||
self.ready.set(false);
|
||||
return Poll::Ready(Ok(()));
|
||||
self.service_pending.set(false);
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
log::trace!("Buffer limit exceeded");
|
||||
return Poll::Pending;
|
||||
// service is not ready
|
||||
self.service_pending.set(true);
|
||||
let _ = self.readiness.take().map(|w| w.wake());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.service_pending.set(false);
|
||||
|
||||
while let Some(sender) = buffer.pop_front() {
|
||||
let (next_call_tx, next_call_rx) = oneshot::channel();
|
||||
if sender.send(next_call_tx).is_err()
|
||||
|| next_call_rx.poll_recv(cx).is_ready()
|
||||
{
|
||||
// the task is gone
|
||||
continue;
|
||||
while let Some(sender) = buffer.pop_front() {
|
||||
let (next_call_tx, next_call_rx) = oneshot::channel();
|
||||
if sender.send(next_call_tx).is_err()
|
||||
|| next_call_rx.poll_recv(cx).is_ready()
|
||||
{
|
||||
// the task is gone
|
||||
continue;
|
||||
}
|
||||
self.next_call.borrow_mut().replace(next_call_rx);
|
||||
self.ready.set(false);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
next_call.replace(next_call_rx);
|
||||
self.ready.set(false);
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
|
||||
self.ready.set(true);
|
||||
Poll::Ready(Ok(()))
|
||||
self.ready.set(true);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn not_ready(&self) {
|
||||
let fut = poll_fn(|cx| {
|
||||
if self.service_pending.get() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
self.readiness.set(Some(cx.waker().clone()));
|
||||
Poll::Pending
|
||||
}
|
||||
});
|
||||
|
||||
crate::future::select(fut, self.service.get_ref().not_ready()).await;
|
||||
}
|
||||
|
||||
async fn shutdown(&self) {
|
||||
// hold advancement until the last released task either makes a call or is dropped
|
||||
let next_call = self.next_call.borrow_mut().take();
|
||||
if let Some(next_call) = next_call {
|
||||
let _ = next_call.recv().await;
|
||||
}
|
||||
|
||||
poll_fn(|cx| {
|
||||
let mut buffer = self.buf.borrow_mut();
|
||||
if self.cancel_on_shutdown {
|
||||
buffer.clear();
|
||||
} else if !buffer.is_empty() {
|
||||
let mut next_call = self.next_call.borrow_mut();
|
||||
if let Some(next_call) = &*next_call {
|
||||
// hold advancement until the last released task either makes a call or is dropped
|
||||
let _ = ready!(next_call.poll_recv(cx));
|
||||
}
|
||||
next_call.take();
|
||||
}
|
||||
|
||||
if !buffer.is_empty() {
|
||||
if ready!(self.service.poll_ready(cx)).is_err() {
|
||||
log::error!(
|
||||
"Buffered inner service failed while buffer flushing on shutdown"
|
||||
|
@ -252,7 +282,7 @@ where
|
|||
// the task is gone
|
||||
continue;
|
||||
}
|
||||
next_call.replace(next_call_rx);
|
||||
self.next_call.borrow_mut().replace(next_call_rx);
|
||||
if buffer.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue