cleanups RD_PAUSED state

This commit is contained in:
Nikolay Kim 2021-12-22 15:43:29 +06:00
parent 8bbbfde22d
commit 73c5a5faac
5 changed files with 33 additions and 38 deletions

View file

@ -12,6 +12,8 @@
* Rename .write_ready() to .flush()
* .poll_read_ready() cleanups RD_PAUSED state
## [0.1.0-b.2] - 2021-12-20
* Removed `WriteRef` and `ReadRef`

View file

@ -28,7 +28,7 @@ tokio = ["tok-io/net", "tok-io/rt"]
bitflags = "1.3"
ntex-codec = "0.6.0"
ntex-bytes = "0.1.8"
ntex-util = "0.1.3"
ntex-util = "0.1.4"
ntex-service = "0.2.1"
fxhash = "0.2.1"
log = "0.4"

View file

@ -220,29 +220,22 @@ where
}
Poll::Ready(()) => {
// decode incoming bytes if buffer is ready
match io.poll_recv(&slf.shared.codec, cx) {
Poll::Ready(Ok(Some(el))) => {
match ready!(io.poll_recv(&slf.shared.codec, cx)) {
Ok(Some(el)) => {
slf.update_keepalive();
DispatchItem::Item(el)
}
Poll::Ready(Err(Either::Left(err))) => {
Err(Either::Left(err)) => {
slf.st.set(DispatcherState::Stop);
slf.unregister_keepalive();
DispatchItem::DecoderError(err)
}
Poll::Ready(Err(Either::Right(err))) => {
Err(Either::Right(err)) => {
slf.st.set(DispatcherState::Stop);
slf.unregister_keepalive();
DispatchItem::Disconnect(Some(err))
}
Poll::Ready(Ok(None)) => {
DispatchItem::Disconnect(None)
}
Poll::Pending => {
log::trace!("not enough data to decode next frame, register dispatch task");
io.resume();
return Poll::Pending;
}
Ok(None) => DispatchItem::Disconnect(None),
}
}
}

View file

@ -35,16 +35,17 @@ impl Filter for Base {
#[inline]
fn want_read(&self) {
todo!()
let flags = self.0.flags();
if flags.intersects(Flags::RD_PAUSED | Flags::RD_BUF_FULL) {
self.0
.0
.remove_flags(Flags::RD_PAUSED | Flags::RD_BUF_FULL);
self.0 .0.read_task.wake();
}
}
#[inline]
fn want_shutdown(&self) {
todo!()
}
#[inline]
fn poll_shutdown(&self) -> Poll<io::Result<()>> {
let mut flags = self.0.flags();
if !flags.intersects(Flags::IO_ERR | Flags::IO_SHUTDOWN) {
flags.insert(Flags::IO_SHUTDOWN);
@ -52,7 +53,11 @@ impl Filter for Base {
self.0 .0.read_task.wake();
self.0 .0.write_task.wake();
}
}
#[inline]
fn poll_shutdown(&self) -> Poll<io::Result<()>> {
self.want_shutdown();
Poll::Ready(Ok(()))
}
@ -104,19 +109,16 @@ impl Filter for Base {
#[inline]
fn release_read_buf(&self, buf: BytesMut, nbytes: usize) -> Result<(), io::Error> {
let mut flags = self.0.flags();
if nbytes > 0 {
if buf.len() > self.0.memory_pool().read_params().high as usize {
log::trace!(
"buffer is too large {}, enable read back-pressure",
buf.len()
);
flags.insert(Flags::RD_READY | Flags::RD_BUF_FULL);
self.0 .0.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
} else {
flags.insert(Flags::RD_READY);
self.0 .0.insert_flags(Flags::RD_READY);
}
self.0.set_flags(flags);
}
self.0 .0.read_buf.set(Some(buf));

View file

@ -429,15 +429,6 @@ impl<F> Io<F> {
self.0 .0.insert_flags(Flags::RD_PAUSED);
}
#[inline]
/// Wake read io ask if it is paused
pub fn resume(&self) {
if self.flags().contains(Flags::RD_PAUSED) {
self.0 .0.remove_flags(Flags::RD_PAUSED);
self.0 .0.read_task.wake();
}
}
#[inline]
/// Encode item, send to a peer
pub async fn send<U>(
@ -501,9 +492,13 @@ impl<F> Io<F> {
let mut flags = self.0 .0.flags.get();
let ready = flags.contains(Flags::RD_READY);
if flags.contains(Flags::RD_BUF_FULL) {
log::trace!("read back-pressure is disabled, wake io task");
flags.remove(Flags::RD_READY | Flags::RD_BUF_FULL);
if flags.intersects(Flags::RD_BUF_FULL | Flags::RD_PAUSED) {
if flags.intersects(Flags::RD_BUF_FULL) {
log::trace!("read back-pressure is disabled, wake io task");
} else {
log::trace!("read task is resumed, wake io task");
}
flags.remove(Flags::RD_READY | Flags::RD_BUF_FULL | Flags::RD_PAUSED);
self.0 .0.read_task.wake();
self.0 .0.flags.set(flags);
if ready {
@ -514,8 +509,8 @@ impl<F> Io<F> {
} else if ready {
log::trace!("waking up io read task");
flags.remove(Flags::RD_READY);
self.0 .0.flags.set(flags);
self.0 .0.read_task.wake();
self.0 .0.flags.set(flags);
Poll::Ready(Ok(Some(())))
} else {
Poll::Pending
@ -539,7 +534,10 @@ impl<F> Io<F> {
match self.decode(codec) {
Ok(Some(el)) => Poll::Ready(Ok(Some(el))),
Ok(None) => match self.poll_read_ready(cx) {
Poll::Pending | Poll::Ready(Ok(Some(()))) => Poll::Pending,
Poll::Pending | Poll::Ready(Ok(Some(()))) => {
log::trace!("not enough data to decode next frame");
Poll::Pending
}
Poll::Ready(Err(e)) => Poll::Ready(Err(Either::Right(e))),
Poll::Ready(Ok(None)) => Poll::Ready(Ok(None)),
},