diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index a6b34a9f..38ceaf50 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -6,6 +6,8 @@ * http: Add ClientResponse::header() method +* framed: Refactor write back-pressure support + ## [0.2.0] - 2021-02-21 * 0.2 release diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 16690a8a..067cc463 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.2.0" +version = "0.2.1" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" diff --git a/ntex/src/framed/dispatcher.rs b/ntex/src/framed/dispatcher.rs index 1df634ca..0983b27e 100644 --- a/ntex/src/framed/dispatcher.rs +++ b/ntex/src/framed/dispatcher.rs @@ -57,8 +57,7 @@ where #[derive(Copy, Clone, Debug)] enum DispatcherState { Processing, - WrEnabled, - WrWaitReady, + Backpressure, Stop, Shutdown, } @@ -216,78 +215,20 @@ where Poll::Ready(item) => { this.fut.set(None); slf.shared.inflight.set(slf.shared.inflight.get() - 1); - let _ = slf.handle_result(item, cx); + let _ = slf.handle_result(item); } } } loop { match slf.st.get() { - DispatcherState::WrEnabled => { - let item = match ready!(slf.poll_service(&this.service, cx)) { - PollService::Ready => { - slf.st.set(DispatcherState::WrWaitReady); - DispatchItem::WBackPressureEnabled - } - PollService::Item(item) => item, - PollService::ServiceError => continue, - }; - - // call service - if this.fut.is_none() { - // optimize first service call - this.fut.set(Some(this.service.call(item))); - match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) { - Poll::Ready(res) => { - this.fut.set(None); - ready!(slf.handle_result(res, cx)); - } - Poll::Pending => { - slf.shared.inflight.set(slf.shared.inflight.get() + 1) - } - } - } else { - slf.spawn_service_call(this.service.call(item)); - } - } - DispatcherState::WrWaitReady => { - let item = match ready!(slf.poll_service(&this.service, cx)) { - PollService::Ready => { - if state.is_write_backpressure_disabled() { - slf.st.set(DispatcherState::Processing); - DispatchItem::WBackPressureDisabled - } else { - return Poll::Pending; - } - } - PollService::Item(item) => item, - PollService::ServiceError => continue, - }; - - // call service - if this.fut.is_none() { - // optimize first service call - this.fut.set(Some(this.service.call(item))); - match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) { - Poll::Ready(res) => { - this.fut.set(None); - ready!(slf.handle_result(res, cx)); - } - Poll::Pending => { - slf.shared.inflight.set(slf.shared.inflight.get() + 1) - } - } - } else { - slf.spawn_service_call(this.service.call(item)); - } - } DispatcherState::Processing => { let item = match ready!(slf.poll_service(&this.service, cx)) { PollService::Ready => { - if state.is_write_backpressure_enabled() { + if !state.is_write_ready() { // instruct write task to notify dispatcher when data is flushed state.dsp_enable_write_backpressure(cx.waker()); - slf.st.set(DispatcherState::WrWaitReady); + slf.st.set(DispatcherState::Backpressure); DispatchItem::WBackPressureEnabled } else if state.is_read_ready() { // decode incoming bytes if buffer is ready @@ -324,7 +265,39 @@ where match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) { Poll::Ready(res) => { this.fut.set(None); - ready!(slf.handle_result(res, cx)); + ready!(slf.handle_result(res)); + } + Poll::Pending => { + slf.shared.inflight.set(slf.shared.inflight.get() + 1) + } + } + } else { + slf.spawn_service_call(this.service.call(item)); + } + } + // handle write back-pressure + DispatcherState::Backpressure => { + let item = match ready!(slf.poll_service(&this.service, cx)) { + PollService::Ready => { + if state.is_write_ready() { + slf.st.set(DispatcherState::Processing); + DispatchItem::WBackPressureDisabled + } else { + return Poll::Pending; + } + } + PollService::Item(item) => item, + PollService::ServiceError => continue, + }; + + // call service + if this.fut.is_none() { + // optimize first service call + this.fut.set(Some(this.service.call(item))); + match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) { + Poll::Ready(res) => { + this.fut.set(None); + ready!(slf.handle_result(res)); } Poll::Pending => { slf.shared.inflight.set(slf.shared.inflight.get() + 1) @@ -386,14 +359,12 @@ where fn handle_result( &self, item: Result::Item>, S::Error>, - cx: &mut Context<'_>, ) -> Poll<()> { match self.state.write_result(item, &self.shared.codec) { Ok(true) => (), Ok(false) => { // instruct write task to notify dispatcher when data is flushed - self.state.dsp_enable_write_backpressure(cx.waker()); - self.st.set(DispatcherState::WrEnabled); + self.state.enable_write_backpressure(); return Poll::Pending; } Err(Either::Left(err)) => { @@ -515,6 +486,8 @@ where mod tests { use bytes::Bytes; use futures::future::FutureExt; + use rand::Rng; + use std::sync::{Arc, Mutex}; use crate::codec::BytesCodec; use crate::rt::time::delay_for; @@ -540,7 +513,7 @@ mod tests { T: AsyncRead + AsyncWrite + Unpin + 'static, { let timer = Timer::default(); - let ka_timeout = 30; + let ka_timeout = 1; let ka_updated = timer.now(); let state = State::new(); let io = Rc::new(RefCell::new(io)); @@ -550,6 +523,9 @@ mod tests { inflight: Cell::new(0), }); + let expire = ka_updated + Duration::from_millis(500); + timer.register(expire, expire, &state); + crate::rt::spawn(ReadTask::new(io.clone(), state.clone())); crate::rt::spawn(WriteTask::new(io.clone(), state.clone())); @@ -670,4 +646,117 @@ mod tests { client.close().await; assert!(client.is_server_dropped()); } + + #[ntex_rt::test] + async fn test_write_backpressure() { + let (client, server) = Io::create(); + // do not allow to write to socket + client.remote_buffer_cap(0); + client.write("GET /test HTTP/1\r\n\r\n"); + + let data = Arc::new(Mutex::new(RefCell::new(Vec::new()))); + let data2 = data.clone(); + + let (disp, state) = Dispatcher::debug( + server, + BytesCodec, + crate::fn_service(move |msg: DispatchItem| { + let data = data2.clone(); + async move { + match msg { + DispatchItem::Item(_) => { + data.lock().unwrap().borrow_mut().push(0); + let bytes = rand::thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(65_536) + .map(char::from) + .collect::(); + return Ok::<_, ()>(Some(Bytes::from(bytes))); + } + DispatchItem::WBackPressureEnabled => { + data.lock().unwrap().borrow_mut().push(1); + } + DispatchItem::WBackPressureDisabled => { + data.lock().unwrap().borrow_mut().push(2); + } + _ => (), + } + Ok(None) + } + }), + ); + crate::rt::spawn(disp.map(|_| ())); + + let buf = client.read_any(); + assert_eq!(buf, Bytes::from_static(b"")); + client.write("GET /test HTTP/1\r\n\r\n"); + delay_for(Duration::from_millis(25)).await; + + // buf must be consumed + assert_eq!(client.remote_buffer(|buf| buf.len()), 0); + + // response message + assert!(!state.is_write_ready()); + assert_eq!(state.with_write_buf(|buf| buf.len()), 65536); + + client.remote_buffer_cap(10240); + delay_for(Duration::from_millis(50)).await; + assert_eq!(state.with_write_buf(|buf| buf.len()), 55296); + + client.remote_buffer_cap(45056); + delay_for(Duration::from_millis(50)).await; + assert_eq!(state.with_write_buf(|buf| buf.len()), 10240); + + // backpressure disabled + assert!(state.is_write_ready()); + assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1, 2]); + } + + #[ntex_rt::test] + async fn test_keepalive() { + env_logger::init(); + let (client, server) = Io::create(); + // do not allow to write to socket + client.remote_buffer_cap(1024); + client.write("GET /test HTTP/1\r\n\r\n"); + + let data = Arc::new(Mutex::new(RefCell::new(Vec::new()))); + let data2 = data.clone(); + + let (disp, state) = Dispatcher::debug( + server, + BytesCodec, + crate::fn_service(move |msg: DispatchItem| { + let data = data2.clone(); + async move { + match msg { + DispatchItem::Item(bytes) => { + data.lock().unwrap().borrow_mut().push(0); + return Ok::<_, ()>(Some(bytes.freeze())); + } + DispatchItem::KeepAliveTimeout => { + data.lock().unwrap().borrow_mut().push(1); + } + _ => (), + } + Ok(None) + } + }), + ); + crate::rt::spawn(disp.map(|_| ())); + + let state = state.disconnect_timeout(1); + + let buf = client.read().await.unwrap(); + assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n")); + delay_for(Duration::from_millis(3100)).await; + + // write side must be closed, dispatcher should fail with keep-alive + let flags = state.flags(); + assert!(state.is_io_err()); + assert!(state.is_io_shutdown()); + assert!(flags.contains(crate::framed::state::Flags::IO_SHUTDOWN)); + assert!(client.is_closed()); + assert_eq!(&data.lock().unwrap().borrow()[..], &[0, 1]); + } } diff --git a/ntex/src/framed/state.rs b/ntex/src/framed/state.rs index 98a48f2b..3c227dd7 100644 --- a/ntex/src/framed/state.rs +++ b/ntex/src/framed/state.rs @@ -33,12 +33,10 @@ bitflags::bitflags! { /// read buffer is full const RD_BUF_FULL = 0b0000_1000_0000; - /// write task is ready - const WR_READY = 0b0001_0000_0000; /// write buffer is full - const WR_NOT_READY = 0b0010_0000_0000; + const WR_BACKPRESSURE = 0b0000_0001_0000_0000; - const ST_DSP_ERR = 0b0001_0000_0000_0000; + const ST_DSP_ERR = 0b0001_0000_0000_0000; } } @@ -205,40 +203,20 @@ impl State { /// read task must be paused if service is not ready (RD_PAUSED) pub(super) fn is_read_paused(&self) -> bool { - self.0.flags.get().intersects(Flags::RD_PAUSED) + self.0.flags.get().contains(Flags::RD_PAUSED) } #[inline] - /// Check if write back-pressure is disabled - pub fn is_write_backpressure_disabled(&self) -> bool { - let mut flags = self.0.flags.get(); - if flags.contains(Flags::WR_READY) { - flags.remove(Flags::WR_READY); - self.0.flags.set(flags); - true - } else { - false - } - } - - #[inline] - /// Check if write back-pressure is enabled - pub fn is_write_backpressure_enabled(&self) -> bool { - let mut flags = self.0.flags.get(); - if flags.contains(Flags::WR_READY) { - flags.remove(Flags::WR_READY); - self.0.flags.set(flags); - true - } else { - false - } + /// Check if write task is ready + pub fn is_write_ready(&self) -> bool { + !self.0.flags.get().contains(Flags::WR_BACKPRESSURE) } #[inline] /// Enable write back-persurre pub fn enable_write_backpressure(&self) { log::trace!("enable write back-pressure"); - self.insert_flags(Flags::WR_NOT_READY) + self.insert_flags(Flags::WR_BACKPRESSURE); } #[inline] @@ -335,13 +313,16 @@ impl State { self.0.read_task.register(waker); } - pub(super) fn update_write_task(&self) { - let mut flags = self.0.flags.get(); - if flags.contains(Flags::WR_NOT_READY) { - flags.remove(Flags::WR_NOT_READY); - flags.insert(Flags::WR_READY); - self.0.flags.set(flags); - self.0.dispatch_task.wake(); + pub(super) fn update_write_task(&self, ready: bool) { + if ready { + let mut flags = self.0.flags.get(); + if flags.contains(Flags::WR_BACKPRESSURE) { + flags.remove(Flags::WR_BACKPRESSURE); + self.0.flags.set(flags); + self.0.dispatch_task.wake(); + } + } else { + self.insert_flags(Flags::WR_BACKPRESSURE); } } @@ -383,7 +364,7 @@ impl State { /// /// Write task must be waken up separately. pub fn dsp_enable_write_backpressure(&self, waker: &Waker) { - self.insert_flags(Flags::WR_NOT_READY); + self.insert_flags(Flags::WR_BACKPRESSURE); self.0.dispatch_task.register(waker); } diff --git a/ntex/src/framed/write.rs b/ntex/src/framed/write.rs index 5c5cba4c..b84f1505 100644 --- a/ntex/src/framed/write.rs +++ b/ntex/src/framed/write.rs @@ -103,9 +103,7 @@ where match result { Poll::Ready(Ok(_)) | Poll::Pending => { - if len < HW { - this.state.update_write_task() - } + this.state.update_write_task(len < HW) } Poll::Ready(Err(err)) => { log::trace!("error during sending data: {:?}", err);