Do not poll service for readiness if it failed before

This commit is contained in:
Nikolay Kim 2021-10-20 09:06:45 +06:00
parent c5932faae2
commit 784d35ce65
4 changed files with 101 additions and 18 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.5] - 2021-10-20
* framed: Do not poll service for readiness if it failed before
## [0.4.4] - 2021-10-13
* Use wrapping_add for usize

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.4.4"
version = "0.4.5"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -321,7 +321,9 @@ where
// drain service responses
DispatcherState::Stop => {
// service may relay on poll_ready for response results
let _ = this.service.poll_ready(cx);
if !this.inner.state.is_dispatcher_ready_err() {
let _ = this.service.poll_ready(cx);
}
if slf.shared.inflight.get() == 0 {
slf.st.set(DispatcherState::Shutdown);
@ -453,6 +455,7 @@ where
self.st.set(DispatcherState::Stop);
self.error.set(Some(err));
self.unregister_keepalive();
self.state.dispatcher_ready_err();
Poll::Ready(PollService::ServiceError)
}
}
@ -514,7 +517,7 @@ mod tests {
use crate::codec::BytesCodec;
use crate::testing::Io;
use crate::time::{sleep, Millis};
use crate::util::Bytes;
use crate::util::{Bytes, Ready};
use super::*;
@ -593,6 +596,12 @@ mod tests {
let _ = disp.await;
});
sleep(Millis(25)).await;
client.write("GET /test HTTP/1\r\n\r\n");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
@ -673,6 +682,60 @@ mod tests {
assert!(client.is_server_dropped());
}
#[crate::rt_test]
async fn test_err_in_service_ready() {
let (client, server) = Io::create();
client.remote_buffer_cap(0);
client.write("GET /test HTTP/1\r\n\r\n");
let counter = Rc::new(Cell::new(0));
struct Srv(Rc<Cell<usize>>);
impl Service for Srv {
type Request = DispatchItem<BytesCodec>;
type Response = Option<Response<BytesCodec>>;
type Error = ();
type Future = Ready<Option<Response<BytesCodec>>, ()>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Err(()))
}
fn call(&self, _: DispatchItem<BytesCodec>) -> Self::Future {
Ready::Ok(None)
}
}
let (disp, state) = Dispatcher::debug(server, BytesCodec, Srv(counter.clone()));
state
.write()
.encode(
Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"),
&mut BytesCodec,
)
.unwrap();
crate::rt::spawn(async move {
let _ = disp.await;
});
// buffer should be flushed
client.remote_buffer_cap(1024);
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
// write side must be closed, dispatcher waiting for read side to close
assert!(client.is_closed());
// close read side
client.close().await;
assert!(client.is_server_dropped());
// service must be checked for readiness only once
assert_eq!(counter.get(), 1);
}
#[crate::rt_test]
async fn test_write_backpressure() {
let (client, server) = Io::create();

View file

@ -11,27 +11,31 @@ use crate::util::{poll_fn, Buf, BytesMut, Either};
bitflags::bitflags! {
pub struct Flags: u16 {
const DSP_STOP = 0b0000_0000_0001;
const DSP_KEEPALIVE = 0b0000_0000_0010;
/// io error occured
const IO_ERR = 0b0000_0000_0100;
const IO_ERR = 0b0000_0001;
/// stop io tasks
const IO_STOP = 0b0000_0000_1000;
const IO_STOP = 0b0000_0010;
/// shutdown io tasks
const IO_SHUTDOWN = 0b0000_0001_0000;
const IO_SHUTDOWN = 0b0000_0100;
/// pause io read
const RD_PAUSED = 0b0000_0010_0000;
const RD_PAUSED = 0b0000_1000;
/// new data is available
const RD_READY = 0b0000_0100_0000;
const RD_READY = 0b0001_0000;
/// read buffer is full
const RD_BUF_FULL = 0b0000_1000_0000;
const RD_BUF_FULL = 0b0010_0000;
/// write buffer is full
const WR_BACKPRESSURE = 0b0000_0001_0000_0000;
const WR_BACKPRESSURE = 0b0001_0000_0000;
const ST_DSP_ERR = 0b0001_0000_0000_0000;
/// dispatcher is marked stopped
const DSP_STOP = 0b0001_0000_0000_0000;
/// keep-alive timeout occured
const DSP_KEEPALIVE = 0b0010_0000_0000_0000;
/// dispatcher returned error
const DSP_ERR = 0b0100_0000_0000_0000;
/// dispatcher rediness error
const DSP_READY_ERR = 0b1000_0000_0000_0000;
}
}
@ -366,11 +370,17 @@ impl State {
}
#[inline]
/// Check is dispatcher marked stopped
/// Check if dispatcher marked stopped
pub fn is_dispatcher_stopped(&self) -> bool {
self.0.flags.get().contains(Flags::DSP_STOP)
}
#[inline]
/// Check if dispatcher failed readiness check
pub fn is_dispatcher_ready_err(&self) -> bool {
self.0.flags.get().contains(Flags::DSP_READY_ERR)
}
#[inline]
pub fn is_open(&self) -> bool {
!self
@ -461,6 +471,12 @@ impl State {
self.insert_flags(Flags::DSP_STOP);
}
#[inline]
/// Mark dispatcher as failed readiness check
pub fn dispatcher_ready_err(&self) {
self.insert_flags(Flags::DSP_READY_ERR);
}
#[inline]
/// Get api for read task
pub fn read(&'_ self) -> Read<'_> {
@ -858,7 +874,7 @@ impl<'a> Write<'a> {
{
let flags = self.0.flags.get();
if !flags.intersects(Flags::IO_ERR | Flags::ST_DSP_ERR) {
if !flags.intersects(Flags::IO_ERR | Flags::DSP_ERR) {
match item {
Ok(Some(item)) => {
let mut buf = self.0.get_write_buf();
@ -874,7 +890,7 @@ impl<'a> Write<'a> {
if let Err(err) = codec.encode(item, &mut buf) {
log::trace!("Encoder error: {:?}", err);
self.0.release_write_buf(buf);
self.0.insert_flags(Flags::DSP_STOP | Flags::ST_DSP_ERR);
self.0.insert_flags(Flags::DSP_STOP | Flags::DSP_ERR);
self.0.dispatch_task.wake();
return Err(Either::Right(err));
} else if is_write_sleep {
@ -885,7 +901,7 @@ impl<'a> Write<'a> {
result
}
Err(err) => {
self.0.insert_flags(Flags::DSP_STOP | Flags::ST_DSP_ERR);
self.0.insert_flags(Flags::DSP_STOP | Flags::DSP_ERR);
self.0.dispatch_task.wake();
Err(Either::Left(err))
}