preserve original error during ioo shutdown

This commit is contained in:
Nikolay Kim 2020-04-13 08:58:05 +06:00
parent 942e694f1a
commit f52f7e616c

View file

@ -183,7 +183,7 @@ enum FramedState<S: Service, U: Encoder + Decoder> {
Error(DispatcherError<S::Error, U>), Error(DispatcherError<S::Error, U>),
FlushAndStop, FlushAndStop,
Shutdown(Option<DispatcherError<S::Error, U>>), Shutdown(Option<DispatcherError<S::Error, U>>),
ShutdownIo(Delay), ShutdownIo(Delay, Option<Result<(), DispatcherError<S::Error, U>>>),
} }
#[derive(Copy, Clone, PartialEq, Eq, Debug)] #[derive(Copy, Clone, PartialEq, Eq, Debug)]
@ -389,28 +389,44 @@ where
} }
FramedState::Shutdown(ref mut err) => { FramedState::Shutdown(ref mut err) => {
return if self.service.poll_shutdown(cx, err.is_some()).is_ready() { return if self.service.poll_shutdown(cx, err.is_some()).is_ready() {
if let Some(err) = err.take() { let result = if let Some(err) = err.take() {
Poll::Ready(Err(err)) if let DispatcherError::Service(_) = err {
} else { Err(err)
let pending = self.framed.close(cx).is_pending();
if self.disconnect_timeout != 0 && pending {
self.state = FramedState::ShutdownIo(delay_for(
Duration::from_millis(self.disconnect_timeout),
));
continue;
} else { } else {
Poll::Ready(Ok(())) // no need for io shutdown because io error occured
return Poll::Ready(Err(err));
} }
} else {
Ok(())
};
// frame close, closes io WR side and waits for disconnect
// on read side. we need disconnect timeout, because it
// could hang forever.
let pending = self.framed.close(cx).is_pending();
if self.disconnect_timeout != 0 && pending {
self.state = FramedState::ShutdownIo(
delay_for(Duration::from_millis(
self.disconnect_timeout,
)),
Some(result),
);
continue;
} else {
Poll::Ready(result)
} }
} else { } else {
Poll::Pending Poll::Pending
} };
} }
FramedState::ShutdownIo(ref mut delay) => { FramedState::ShutdownIo(ref mut delay, ref mut err) => {
if let Poll::Ready(res) = self.framed.close(cx) { if let Poll::Ready(res) = self.framed.close(cx) {
return Poll::Ready( return match err.take() {
res.map_err(|e| DispatcherError::Encoder(e.into())), Some(Ok(_)) | None => Poll::Ready(
); res.map_err(|e| DispatcherError::Encoder(e.into())),
),
Some(Err(e)) => Poll::Ready(Err(e)),
};
} else { } else {
ready!(Pin::new(delay).poll(cx)); ready!(Pin::new(delay).poll(cx));
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));