Fix double registation for accept back-pressure

This commit is contained in:
Nikolay Kim 2021-02-18 17:54:41 +06:00
parent 00df8452f3
commit aefa224542
6 changed files with 26 additions and 9 deletions

1
.gitignore vendored
View file

@ -3,6 +3,7 @@ target/
guide/build/
/gh-pages
cobertura.xml
.DS_Store
*.so
*.out

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.0-b.12] - 2021-02-18
* Fix double registation for accept back-pressure
## [0.2.0-b.11] - 2021-02-02
* framed: fix wake write method dsp_restart_write_task

View file

@ -59,7 +59,6 @@ where
Poll::Pending
}
Err(err) => {
log::trace!("error during reading data: {:?}", err);
self.state.set_io_error(err);
Poll::Ready(())
}

View file

@ -125,7 +125,7 @@ impl State {
state.dispatch_task.wake();
}
pub(super) fn disconnect_timeout(&self) -> u16 {
pub(super) fn get_disconnect_timeout(&self) -> u16 {
self.0.disconnect_timeout.get()
}
@ -147,6 +147,13 @@ impl State {
self.0.flags.get()
}
#[inline]
/// Set disconnecto timeout
pub fn disconnect_timeout(self, timeout: u16) -> Self {
self.0.disconnect_timeout.set(timeout);
self
}
#[inline]
/// Set disconnecto timeout
pub fn set_disconnect_timeout(&self, timeout: u16) {

View file

@ -47,7 +47,7 @@ where
/// Shutdown io stream
pub fn shutdown(io: Rc<RefCell<T>>, state: State) -> Self {
let disconnect_timeout = state.disconnect_timeout() as u64;
let disconnect_timeout = state.get_disconnect_timeout() as u64;
let st = IoWriteState::Shutdown(
if disconnect_timeout != 0 {
Some(delay_for(Duration::from_millis(disconnect_timeout)))
@ -84,7 +84,7 @@ where
if this.state.is_io_shutdown() {
log::trace!("write task is instructed to shutdown");
let disconnect_timeout = this.state.disconnect_timeout() as u64;
let disconnect_timeout = this.state.get_disconnect_timeout() as u64;
this.st = IoWriteState::Shutdown(
if disconnect_timeout != 0 {
Some(delay_for(Duration::from_millis(disconnect_timeout)))
@ -202,11 +202,11 @@ where
T: AsyncRead + AsyncWrite + Unpin,
{
let len = buf.len();
log::trace!("flushing framed transport: {}", len);
if len != 0 {
let mut written = 0;
// log::trace!("flushing framed transport: {}", len);
let mut written = 0;
while written < len {
match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Pending => break,
@ -227,7 +227,7 @@ where
}
}
}
log::trace!("flushed {} bytes", written);
// log::trace!("flushed {} bytes", written);
// remove written data
if written == len {

View file

@ -346,6 +346,10 @@ impl Accept {
if !on {
self.backpressure = false;
for (token, info) in self.sockets.iter() {
if info.timeout.is_some() {
// socket will re-register itself after timeout
continue;
}
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
@ -361,8 +365,10 @@ impl Accept {
} else if on {
self.backpressure = true;
for (_, info) in self.sockets.iter() {
trace!("Enabling backpressure for {}", info.addr);
let _ = self.poll.deregister(&info.sock);
if info.timeout.is_none() {
trace!("Enabling backpressure for {}", info.addr);
let _ = self.poll.deregister(&info.sock);
}
}
}
}