diff --git a/.gitignore b/.gitignore index d48fb085..2c5156f8 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ target/ guide/build/ /gh-pages cobertura.xml +.DS_Store *.so *.out diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 63aff9b3..411a79c8 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.0-b.12] - 2021-02-18 + +* Fix double registation for accept back-pressure + ## [0.2.0-b.11] - 2021-02-02 * framed: fix wake write method dsp_restart_write_task diff --git a/ntex/src/framed/read.rs b/ntex/src/framed/read.rs index 95e05a75..79a0e204 100644 --- a/ntex/src/framed/read.rs +++ b/ntex/src/framed/read.rs @@ -59,7 +59,6 @@ where Poll::Pending } Err(err) => { - log::trace!("error during reading data: {:?}", err); self.state.set_io_error(err); Poll::Ready(()) } diff --git a/ntex/src/framed/state.rs b/ntex/src/framed/state.rs index da347ce5..56f2da6a 100644 --- a/ntex/src/framed/state.rs +++ b/ntex/src/framed/state.rs @@ -125,7 +125,7 @@ impl State { state.dispatch_task.wake(); } - pub(super) fn disconnect_timeout(&self) -> u16 { + pub(super) fn get_disconnect_timeout(&self) -> u16 { self.0.disconnect_timeout.get() } @@ -147,6 +147,13 @@ impl State { self.0.flags.get() } + #[inline] + /// Set disconnecto timeout + pub fn disconnect_timeout(self, timeout: u16) -> Self { + self.0.disconnect_timeout.set(timeout); + self + } + #[inline] /// Set disconnecto timeout pub fn set_disconnect_timeout(&self, timeout: u16) { diff --git a/ntex/src/framed/write.rs b/ntex/src/framed/write.rs index c2b7ddaa..9a39484a 100644 --- a/ntex/src/framed/write.rs +++ b/ntex/src/framed/write.rs @@ -47,7 +47,7 @@ where /// Shutdown io stream pub fn shutdown(io: Rc>, state: State) -> Self { - let disconnect_timeout = state.disconnect_timeout() as u64; + let disconnect_timeout = state.get_disconnect_timeout() as u64; let st = IoWriteState::Shutdown( if disconnect_timeout != 0 { Some(delay_for(Duration::from_millis(disconnect_timeout))) @@ -84,7 +84,7 @@ where if this.state.is_io_shutdown() { log::trace!("write task is instructed to shutdown"); - let disconnect_timeout = this.state.disconnect_timeout() as u64; + let disconnect_timeout = this.state.get_disconnect_timeout() as u64; this.st = IoWriteState::Shutdown( if disconnect_timeout != 0 { Some(delay_for(Duration::from_millis(disconnect_timeout))) @@ -202,11 +202,11 @@ where T: AsyncRead + AsyncWrite + Unpin, { let len = buf.len(); - log::trace!("flushing framed transport: {}", len); if len != 0 { - let mut written = 0; + // log::trace!("flushing framed transport: {}", len); + let mut written = 0; while written < len { match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { Poll::Pending => break, @@ -227,7 +227,7 @@ where } } } - log::trace!("flushed {} bytes", written); + // log::trace!("flushed {} bytes", written); // remove written data if written == len { diff --git a/ntex/src/server/accept.rs b/ntex/src/server/accept.rs index 0e503957..04a94a18 100644 --- a/ntex/src/server/accept.rs +++ b/ntex/src/server/accept.rs @@ -346,6 +346,10 @@ impl Accept { if !on { self.backpressure = false; for (token, info) in self.sockets.iter() { + if info.timeout.is_some() { + // socket will re-register itself after timeout + continue; + } if let Err(err) = self.poll.register( &info.sock, mio::Token(token + DELTA), @@ -361,8 +365,10 @@ impl Accept { } else if on { self.backpressure = true; for (_, info) in self.sockets.iter() { - trace!("Enabling backpressure for {}", info.addr); - let _ = self.poll.deregister(&info.sock); + if info.timeout.is_none() { + trace!("Enabling backpressure for {}", info.addr); + let _ = self.poll.deregister(&info.sock); + } } } }