prevent uneeded wakeups

This commit is contained in:
Nikolay Kim 2021-01-27 21:26:08 +06:00
parent a6cc3e3721
commit 7eaebd4871
3 changed files with 60 additions and 27 deletions

View file

@ -1,12 +1,8 @@
use std::collections::VecDeque;
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{collections::VecDeque, io, net::SocketAddr, pin::Pin};
use either::Either;
use futures::future::{ok, FutureExt, LocalBoxFuture, Ready};
use futures::future::{ok, Future, FutureExt, LocalBoxFuture, Ready};
use crate::rt::net::TcpStream;
use crate::service::{Service, ServiceFactory};

View file

@ -49,7 +49,12 @@ where
let mut io = self.io.borrow_mut();
let result = self.state.with_read_buf(|buf| read(&mut *io, buf, cx));
match result {
Ok(updated) => {
Ok(None) => {
self.state.enable_read_backpressure();
self.state.update_read_task(true, cx.waker());
Poll::Pending
}
Ok(Some(updated)) => {
self.state.update_read_task(updated, cx.waker());
Poll::Pending
}
@ -67,7 +72,7 @@ pub(super) fn read<T>(
io: &mut T,
buf: &mut BytesMut,
cx: &mut Context<'_>,
) -> Result<bool, Option<io::Error>>
) -> Result<Option<bool>, Option<io::Error>>
where
T: AsyncRead + AsyncWrite + Unpin,
{
@ -87,12 +92,12 @@ where
log::trace!("io stream is disconnected");
return Err(None);
} else {
updated = true;
if buf.len() > HW {
log::trace!("buffer is too large {}, pause", buf.len());
break;
return Ok(None);
}
updated = true;
}
}
Poll::Ready(Err(err)) => {
@ -102,5 +107,5 @@ where
}
}
Ok(updated)
Ok(Some(updated))
}

View file

@ -11,6 +11,7 @@ use crate::framed::write::flush;
use crate::task::LocalWaker;
const HW: usize = 16 * 1024;
const READ_HW: usize = 8 * 1024;
bitflags::bitflags! {
pub struct Flags: u16 {
@ -28,6 +29,8 @@ bitflags::bitflags! {
const RD_PAUSED = 0b0000_0010_0000;
/// new data is available
const RD_READY = 0b0000_0100_0000;
/// read buffer is full
const RD_BUF_FULL = 0b0000_1000_0000;
/// write task is ready
const WR_READY = 0b0001_0000_0000;
@ -72,6 +75,7 @@ impl hash::Hash for State {
}
impl State {
#[inline]
/// Create `State` instance
pub fn new() -> Self {
State(Rc::new(IoStateInner {
@ -86,6 +90,7 @@ impl State {
}))
}
#[inline]
/// Create `State` from Framed
pub fn from_framed<Io, U>(framed: Framed<Io, U>) -> (Io, U, Self) {
let parts = framed.into_parts();
@ -103,6 +108,7 @@ impl State {
(parts.io, parts.codec, state)
}
#[inline]
/// Convert state to a Framed instance
pub fn into_framed<Io, U>(self, io: Io, codec: U) -> Framed<Io, U> {
let mut parts = FramedParts::new(io, codec);
@ -165,6 +171,12 @@ impl State {
self.0.write_buf.borrow().len() >= HW
}
#[inline]
/// Check if read buff is full
pub fn is_read_buf_full(&self) -> bool {
self.0.read_buf.borrow().len() >= READ_HW
}
#[inline]
/// Check if read buffer has new data
pub fn is_read_ready(&self) -> bool {
@ -205,11 +217,21 @@ impl State {
#[inline]
/// Enable write back-persurre
pub fn enable_write_backpressure(&self) {
log::trace!("enable write back-pressure");
let mut flags = self.0.flags.get();
flags.insert(Flags::WR_NOT_READY);
self.0.flags.set(flags);
}
#[inline]
/// Enable read back-persurre
pub(crate) fn enable_read_backpressure(&self) {
log::trace!("enable read back-pressure");
let mut flags = self.0.flags.get();
flags.insert(Flags::RD_BUF_FULL);
self.0.flags.set(flags);
}
#[inline]
/// Check if keep-alive timeout occured
pub fn is_keepalive(&self) -> bool {
@ -251,6 +273,7 @@ impl State {
#[inline]
/// Gracefully shutdown all tasks
pub fn shutdown(&self) {
log::trace!("shutdown framed state");
let mut flags = self.0.flags.get();
flags.insert(Flags::DSP_STOP | Flags::IO_SHUTDOWN);
self.0.flags.set(flags);
@ -262,20 +285,23 @@ impl State {
#[inline]
/// Gracefully shutdown read and write io tasks
pub fn shutdown_io(&self) {
log::trace!("initiate io shutdown {:?}", self.0.flags.get());
let mut flags = self.0.flags.get();
if !flags.intersects(Flags::IO_ERR | Flags::IO_SHUTDOWN) {
log::trace!("initiate io shutdown {:?}", flags);
flags.insert(Flags::IO_SHUTDOWN);
self.0.flags.set(flags);
self.0.read_task.wake();
self.0.write_task.wake();
}
}
pub(crate) fn set_io_error(&self, err: Option<io::Error>) {
let mut flags = self.0.flags.get();
self.0.error.set(err);
self.0.read_task.wake();
self.0.write_task.wake();
self.0.dispatch_task.wake();
let mut flags = self.0.flags.get();
flags.insert(Flags::IO_ERR | Flags::DSP_STOP);
self.0.flags.set(flags);
}
@ -334,12 +360,19 @@ impl State {
#[inline]
/// Wake read io task if it is not ready
///
/// Only wakes if back-pressure is enabled on read task
/// otherwise read is already awake.
pub fn dsp_read_more_data(&self, waker: &Waker) {
let mut flags = self.0.flags.get();
flags.remove(Flags::RD_READY);
self.0.flags.set(flags);
self.0.read_task.wake();
self.0.dispatch_task.register(waker);
if flags.contains(Flags::RD_BUF_FULL) {
log::trace!("read back-pressure is enabled, wake io task");
flags.remove(Flags::RD_BUF_FULL);
self.0.read_task.wake();
}
self.0.flags.set(flags);
}
#[inline]
@ -557,13 +590,12 @@ impl State {
let is_write_sleep = write_buf.is_empty();
// encode item and wake write task
let res = codec
.encode(item, &mut *write_buf)
.map(|_| write_buf.len() < HW);
if res.is_ok() && is_write_sleep {
codec.encode(item, &mut *write_buf).map(|_| {
if is_write_sleep {
self.0.write_task.wake();
}
res
write_buf.len() < HW
})
} else {
Ok(true)
}