diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 2a673f9e..124b5693 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -4,6 +4,8 @@ * Remove slow frame timer if service is not ready +* Do not process data in Dispatcher from read buffer after disconnect + ## [0.3.8] - 2023-11-17 * Remove useless logs diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index a3a50e98..1d362c2b 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -324,49 +324,58 @@ where let item = match ready!(slf.poll_service(cx)) { PollService::Ready => { // decode incoming bytes if buffer is ready - match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) { - Ok(decoded) => { - slf.update_timer(&decoded); - if let Some(el) = decoded.item { - DispatchItem::Item(el) - } else { - return Poll::Pending; + if slf.shared.io.is_closed() { + log::trace!("io has been closed, stop dispatcher"); + slf.st = DispatcherState::Stop; + DispatchItem::Disconnect(None) + } else { + match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) + { + Ok(decoded) => { + slf.update_timer(&decoded); + if let Some(el) = decoded.item { + DispatchItem::Item(el) + } else { + return Poll::Pending; + } } - } - Err(RecvError::KeepAlive) => { - log::trace!("keep-alive error, stopping dispatcher"); - slf.st = DispatcherState::Stop; - if slf.flags.contains(Flags::READ_TIMEOUT) { - DispatchItem::ReadTimeout - } else { - DispatchItem::KeepAliveTimeout + Err(RecvError::KeepAlive) => { + log::trace!( + "keep-alive error, stopping dispatcher" + ); + slf.st = DispatcherState::Stop; + if slf.flags.contains(Flags::READ_TIMEOUT) { + DispatchItem::ReadTimeout + } else { + DispatchItem::KeepAliveTimeout + } + } + Err(RecvError::Stop) => { + log::trace!("dispatcher is instructed to stop"); + slf.st = DispatcherState::Stop; + continue; + } + Err(RecvError::WriteBackpressure) => { + // instruct write task to notify dispatcher when data is flushed + slf.st = DispatcherState::Backpressure; + DispatchItem::WBackPressureEnabled + } + Err(RecvError::Decoder(err)) => { + log::trace!( + "decoder error, stopping dispatcher: {:?}", + err + ); + slf.st = DispatcherState::Stop; + DispatchItem::DecoderError(err) + } + Err(RecvError::PeerGone(err)) => { + log::trace!( + "peer is gone, stopping dispatcher: {:?}", + err + ); + slf.st = DispatcherState::Stop; + DispatchItem::Disconnect(err) } - } - Err(RecvError::Stop) => { - log::trace!("dispatcher is instructed to stop"); - slf.st = DispatcherState::Stop; - continue; - } - Err(RecvError::WriteBackpressure) => { - // instruct write task to notify dispatcher when data is flushed - slf.st = DispatcherState::Backpressure; - DispatchItem::WBackPressureEnabled - } - Err(RecvError::Decoder(err)) => { - log::trace!( - "decoder error, stopping dispatcher: {:?}", - err - ); - slf.st = DispatcherState::Stop; - DispatchItem::DecoderError(err) - } - Err(RecvError::PeerGone(err)) => { - log::trace!( - "peer is gone, stopping dispatcher: {:?}", - err - ); - slf.st = DispatcherState::Stop; - DispatchItem::Disconnect(err) } } } @@ -492,8 +501,8 @@ where // remove all timers if self.flags.contains(Flags::READ_TIMEOUT) { self.flags.remove(Flags::READ_TIMEOUT); - self.shared.io.stop_timer(); } + self.shared.io.stop_timer(); match ready!(self.shared.io.poll_read_pause(cx)) { IoStatusUpdate::KeepAlive => {