Better io error handling (#482)

This commit is contained in:
Nikolay Kim 2024-12-05 14:02:59 +05:00 committed by GitHub
parent 2631e70a4b
commit 22ee7f2af2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 63 additions and 45 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [2.9.2] - 2024-12-05
* Better error handling
## [2.9.1] - 2024-12-04 ## [2.9.1] - 2024-12-04
* Check service readiness for every turn * Check service readiness for every turn

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "2.9.1" version = "2.9.2"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -80,6 +80,23 @@ impl IoState {
} }
} }
/// Get current io error
pub(super) fn error(&self) -> Option<io::Error> {
if let Some(err) = self.error.take() {
self.error
.set(Some(io::Error::new(err.kind(), format!("{}", err))));
Some(err)
} else {
None
}
}
/// Get current io result
pub(super) fn error_or_disconnected(&self) -> io::Error {
self.error()
.unwrap_or_else(|| io::Error::new(io::ErrorKind::NotConnected, "Disconnected"))
}
pub(super) fn io_stopped(&self, err: Option<io::Error>) { pub(super) fn io_stopped(&self, err: Option<io::Error>) {
if err.is_some() { if err.is_some() {
self.error.set(err); self.error.set(err);
@ -257,19 +274,6 @@ impl<F> Io<F> {
fn io_ref(&self) -> &IoRef { fn io_ref(&self) -> &IoRef {
unsafe { &*self.0.get() } unsafe { &*self.0.get() }
} }
/// Get current io error
fn error(&self) -> Option<io::Error> {
self.st().error.take()
}
/// Get current io error
fn error_or_disconnected(&self) -> io::Error {
self.st()
.error
.take()
.unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, "Disconnected"))
}
} }
impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> { impl<F: FilterLayer, T: Filter> Io<Layer<F, T>> {
@ -333,7 +337,7 @@ impl<F> Io<F> {
"Timeout", "Timeout",
))), ))),
Err(RecvError::Stop) => Err(Either::Right(io::Error::new( Err(RecvError::Stop) => Err(Either::Right(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::UnexpectedEof,
"Dispatcher stopped", "Dispatcher stopped",
))), ))),
Err(RecvError::WriteBackpressure) => { Err(RecvError::WriteBackpressure) => {
@ -423,7 +427,7 @@ impl<F> Io<F> {
let mut flags = st.flags.get(); let mut flags = st.flags.get();
if flags.is_stopped() { if flags.is_stopped() {
Poll::Ready(Err(self.error_or_disconnected())) Poll::Ready(Err(st.error_or_disconnected()))
} else { } else {
st.dispatch_task.register(cx.waker()); st.dispatch_task.register(cx.waker());
@ -511,7 +515,7 @@ impl<F> Io<F> {
let st = self.st(); let st = self.st();
let flags = st.flags.get(); let flags = st.flags.get();
if flags.is_stopped() { if flags.is_stopped() {
Err(RecvError::PeerGone(self.error())) Err(RecvError::PeerGone(st.error()))
} else if flags.contains(Flags::DSP_STOP) { } else if flags.contains(Flags::DSP_STOP) {
st.remove_flags(Flags::DSP_STOP); st.remove_flags(Flags::DSP_STOP);
Err(RecvError::Stop) Err(RecvError::Stop)
@ -545,12 +549,12 @@ impl<F> Io<F> {
/// otherwise wake up when size of write buffer is lower than /// otherwise wake up when size of write buffer is lower than
/// buffer max size. /// buffer max size.
pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll<io::Result<()>> { pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll<io::Result<()>> {
let st = self.st();
let flags = self.flags(); let flags = self.flags();
if flags.is_stopped() { if flags.is_stopped() {
Poll::Ready(Err(self.error_or_disconnected())) Poll::Ready(Err(st.error_or_disconnected()))
} else { } else {
let st = self.st();
let len = st.buffer.write_destination_size(); let len = st.buffer.write_destination_size();
if len > 0 { if len > 0 {
if full { if full {
@ -575,7 +579,7 @@ impl<F> Io<F> {
let flags = st.flags.get(); let flags = st.flags.get();
if flags.is_stopped() { if flags.is_stopped() {
if let Some(err) = self.error() { if let Some(err) = st.error() {
Poll::Ready(Err(err)) Poll::Ready(Err(err))
} else { } else {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@ -611,7 +615,7 @@ impl<F> Io<F> {
let st = self.st(); let st = self.st();
let flags = st.flags.get(); let flags = st.flags.get();
if flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { if flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
Poll::Ready(IoStatusUpdate::PeerGone(self.error())) Poll::Ready(IoStatusUpdate::PeerGone(st.error()))
} else if flags.contains(Flags::DSP_STOP) { } else if flags.contains(Flags::DSP_STOP) {
st.remove_flags(Flags::DSP_STOP); st.remove_flags(Flags::DSP_STOP);
Poll::Ready(IoStatusUpdate::Stop) Poll::Ready(IoStatusUpdate::Stop)

View file

@ -191,7 +191,7 @@ impl IoRef {
F: FnOnce(&mut BytesVec) -> R, F: FnOnce(&mut BytesVec) -> R,
{ {
if self.0.flags.get().contains(Flags::IO_STOPPED) { if self.0.flags.get().contains(Flags::IO_STOPPED) {
Err(io::Error::new(io::ErrorKind::Other, "Disconnected")) Err(self.0.error_or_disconnected())
} else { } else {
let result = self.0.buffer.with_write_source(self, f); let result = self.0.buffer.with_write_source(self, f);
self.0.filter().process_write_buf(self, &self.0.buffer, 0)?; self.0.filter().process_write_buf(self, &self.0.buffer, 0)?;

View file

@ -360,7 +360,7 @@ pub fn bind_addr<S: net::ToSocketAddrs>(
Err(e) Err(e)
} else { } else {
Err(io::Error::new( Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::InvalidInput,
"Cannot bind to address.", "Cannot bind to address.",
)) ))
} }

View file

@ -51,11 +51,11 @@ impl<T: Address> SslConnector<T> {
log::trace!("{}: SSL Handshake start for: {:?}", io.tag(), host); log::trace!("{}: SSL Handshake start for: {:?}", io.tag(), host);
match openssl.configure() { match openssl.configure() {
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e).into()), Err(e) => Err(io::Error::new(io::ErrorKind::InvalidInput, e).into()),
Ok(config) => { Ok(config) => {
let ssl = config let ssl = config
.into_ssl(&host) .into_ssl(&host)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let tag = io.tag(); let tag = io.tag();
match connect_io(io, ssl).await { match connect_io(io, ssl).await {
Ok(io) => { Ok(io) => {
@ -64,7 +64,10 @@ impl<T: Address> SslConnector<T> {
} }
Err(e) => { Err(e) => {
log::trace!("{}: SSL Handshake error: {:?}", tag, e); log::trace!("{}: SSL Handshake error: {:?}", tag, e);
Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)).into()) Err(
io::Error::new(io::ErrorKind::InvalidInput, format!("{}", e))
.into(),
)
} }
} }
} }

View file

@ -250,7 +250,9 @@ async fn handle_result<T, F>(
ssl::ErrorCode::WANT_READ => { ssl::ErrorCode::WANT_READ => {
let res = io.read_notify().await; let res = io.read_notify().await;
match res? { match res? {
None => Err(io::Error::new(io::ErrorKind::Other, "disconnected")), None => {
Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected"))
}
_ => Ok(None), _ => Ok(None),
} }
} }

View file

@ -187,14 +187,17 @@ async fn get_response(
err err
); );
pl.set_error( pl.set_error(
io::Error::new(io::ErrorKind::Other, err) io::Error::new(
io::ErrorKind::UnexpectedEof,
err,
)
.into(), .into(),
); );
} }
_ => { _ => {
pl.set_error( pl.set_error(
io::Error::new( io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Unsupported,
"unexpected h2 message", "unexpected h2 message",
) )
.into(), .into(),
@ -216,7 +219,7 @@ async fn get_response(
} }
} }
_ => Err(SendRequestError::Error(Box::new(io::Error::new( _ => Err(SendRequestError::Error(Box::new(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Unsupported,
"unexpected h2 message", "unexpected h2 message",
)))), )))),
} }

View file

@ -117,7 +117,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(Box::new(e)))), Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(Box::new(e)))),
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
return Poll::Ready(Some(Err(Box::new(io::Error::new( return Poll::Ready(Some(Err(Box::new(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Interrupted,
"Canceled", "Canceled",
))))); )))));
} }

View file

@ -217,7 +217,7 @@ pub enum BlockingError<E: fmt::Debug> {
impl From<crate::rt::JoinError> for PayloadError { impl From<crate::rt::JoinError> for PayloadError {
fn from(_: crate::rt::JoinError) -> Self { fn from(_: crate::rt::JoinError) -> Self {
PayloadError::Io(io::Error::new( PayloadError::Io(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Interrupted,
"Operation is canceled", "Operation is canceled",
)) ))
} }
@ -228,7 +228,7 @@ impl From<BlockingError<io::Error>> for PayloadError {
match err { match err {
BlockingError::Error(e) => PayloadError::Io(e), BlockingError::Error(e) => PayloadError::Io(e),
BlockingError::Canceled => PayloadError::Io(io::Error::new( BlockingError::Canceled => PayloadError::Io(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Interrupted,
"Operation is canceled", "Operation is canceled",
)), )),
} }

View file

@ -408,7 +408,9 @@ where
h2::MessageKind::Disconnect(err) => { h2::MessageKind::Disconnect(err) => {
log::debug!("Connection is disconnected {:?}", err); log::debug!("Connection is disconnected {:?}", err);
if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) { if let Some(mut sender) = self.streams.borrow_mut().remove(&stream.id()) {
sender.set_error(io::Error::new(io::ErrorKind::Other, err).into()); sender.set_error(
io::Error::new(io::ErrorKind::UnexpectedEof, err).into(),
);
} }
return Ok(()); return Ok(());
} }

View file

@ -257,7 +257,7 @@ where
server, server,
client: Client::build().finish(), client: Client::build().finish(),
} }
.set_client_timeout(Seconds(30), Millis(30_000)) .set_client_timeout(Seconds(45), Millis(45_000))
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -467,7 +467,7 @@ where
Err(e) Err(e)
} else { } else {
Err(io::Error::new( Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::InvalidInput,
"Cannot bind to address.", "Cannot bind to address.",
)) ))
} }

View file

@ -717,8 +717,8 @@ where
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default() Connector::default()
.lifetime(Seconds::ZERO) .lifetime(Seconds::ZERO)
.keep_alive(Seconds(30)) .keep_alive(Seconds(45))
.timeout(Millis(30_000)) .timeout(Millis(45_000))
.disconnect_timeout(Seconds(5)) .disconnect_timeout(Seconds(5))
.openssl(builder.build()) .openssl(builder.build())
.finish() .finish()
@ -727,14 +727,14 @@ where
{ {
Connector::default() Connector::default()
.lifetime(Seconds::ZERO) .lifetime(Seconds::ZERO)
.timeout(Millis(30_000)) .timeout(Millis(45_000))
.finish() .finish()
} }
}; };
Client::build() Client::build()
.connector(connector) .connector(connector)
.timeout(Seconds(30)) .timeout(Seconds(45))
.finish() .finish()
}; };

View file

@ -54,7 +54,7 @@ impl WsTransport {
Ok(()) Ok(())
} else { } else {
self.insert_flags(Flags::PROTO_ERR); self.insert_flags(Flags::PROTO_ERR);
Err(io::Error::new(io::ErrorKind::Other, err_message)) Err(io::Error::new(io::ErrorKind::InvalidData, err_message))
} }
} }
} }
@ -96,7 +96,7 @@ impl FilterLayer for WsTransport {
self.codec.decode_vec(&mut src).map_err(|e| { self.codec.decode_vec(&mut src).map_err(|e| {
log::trace!("Failed to decode ws codec frames: {:?}", e); log::trace!("Failed to decode ws codec frames: {:?}", e);
self.insert_flags(Flags::PROTO_ERR); self.insert_flags(Flags::PROTO_ERR);
io::Error::new(io::ErrorKind::Other, e) io::Error::new(io::ErrorKind::InvalidData, e)
})? { })? {
frame frame
} else { } else {
@ -123,14 +123,14 @@ impl FilterLayer for WsTransport {
Frame::Continuation(Item::FirstText(_)) => { Frame::Continuation(Item::FirstText(_)) => {
self.insert_flags(Flags::PROTO_ERR); self.insert_flags(Flags::PROTO_ERR);
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::InvalidData,
"WebSocket Text continuation frames are not supported", "WebSocket Text continuation frames are not supported",
)); ));
} }
Frame::Text(_) => { Frame::Text(_) => {
self.insert_flags(Flags::PROTO_ERR); self.insert_flags(Flags::PROTO_ERR);
return Err(io::Error::new( return Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::InvalidData,
"WebSockets Text frames are not supported", "WebSockets Text frames are not supported",
)); ));
} }