Do not process data in Dispatcher from read buffer after disconnect

This commit is contained in:
Nikolay Kim 2023-11-21 00:56:09 +06:00
parent 9db4f21d71
commit 096aae2732
2 changed files with 53 additions and 42 deletions

View file

@ -4,6 +4,8 @@
* Remove slow frame timer if service is not ready
* Do not process data in Dispatcher from read buffer after disconnect
## [0.3.8] - 2023-11-17
* Remove useless logs

View file

@ -324,49 +324,58 @@ where
let item = match ready!(slf.poll_service(cx)) {
PollService::Ready => {
// decode incoming bytes if buffer is ready
match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) {
Ok(decoded) => {
slf.update_timer(&decoded);
if let Some(el) = decoded.item {
DispatchItem::Item(el)
} else {
return Poll::Pending;
if slf.shared.io.is_closed() {
log::trace!("io has been closed, stop dispatcher");
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(None)
} else {
match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx)
{
Ok(decoded) => {
slf.update_timer(&decoded);
if let Some(el) = decoded.item {
DispatchItem::Item(el)
} else {
return Poll::Pending;
}
}
}
Err(RecvError::KeepAlive) => {
log::trace!("keep-alive error, stopping dispatcher");
slf.st = DispatcherState::Stop;
if slf.flags.contains(Flags::READ_TIMEOUT) {
DispatchItem::ReadTimeout
} else {
DispatchItem::KeepAliveTimeout
Err(RecvError::KeepAlive) => {
log::trace!(
"keep-alive error, stopping dispatcher"
);
slf.st = DispatcherState::Stop;
if slf.flags.contains(Flags::READ_TIMEOUT) {
DispatchItem::ReadTimeout
} else {
DispatchItem::KeepAliveTimeout
}
}
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
slf.st = DispatcherState::Stop;
continue;
}
Err(RecvError::WriteBackpressure) => {
// instruct write task to notify dispatcher when data is flushed
slf.st = DispatcherState::Backpressure;
DispatchItem::WBackPressureEnabled
}
Err(RecvError::Decoder(err)) => {
log::trace!(
"decoder error, stopping dispatcher: {:?}",
err
);
slf.st = DispatcherState::Stop;
DispatchItem::DecoderError(err)
}
Err(RecvError::PeerGone(err)) => {
log::trace!(
"peer is gone, stopping dispatcher: {:?}",
err
);
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(err)
}
}
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
slf.st = DispatcherState::Stop;
continue;
}
Err(RecvError::WriteBackpressure) => {
// instruct write task to notify dispatcher when data is flushed
slf.st = DispatcherState::Backpressure;
DispatchItem::WBackPressureEnabled
}
Err(RecvError::Decoder(err)) => {
log::trace!(
"decoder error, stopping dispatcher: {:?}",
err
);
slf.st = DispatcherState::Stop;
DispatchItem::DecoderError(err)
}
Err(RecvError::PeerGone(err)) => {
log::trace!(
"peer is gone, stopping dispatcher: {:?}",
err
);
slf.st = DispatcherState::Stop;
DispatchItem::Disconnect(err)
}
}
}
@ -492,8 +501,8 @@ where
// remove all timers
if self.flags.contains(Flags::READ_TIMEOUT) {
self.flags.remove(Flags::READ_TIMEOUT);
self.shared.io.stop_timer();
}
self.shared.io.stop_timer();
match ready!(self.shared.io.poll_read_pause(cx)) {
IoStatusUpdate::KeepAlive => {