diff --git a/ntex-util/src/channel/mpsc.rs b/ntex-util/src/channel/mpsc.rs index dc311a44..6b089227 100644 --- a/ntex-util/src/channel/mpsc.rs +++ b/ntex-util/src/channel/mpsc.rs @@ -1,7 +1,7 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. use std::{collections::VecDeque, fmt, pin::Pin, task::Context, task::Poll}; -use futures_core::Stream; +use futures_core::{FusedStream, Stream}; use futures_sink::Sink; use super::cell::{Cell, WeakCell}; @@ -206,6 +206,12 @@ impl Stream for Receiver { } } +impl FusedStream for Receiver { + fn is_terminated(&self) -> bool { + self.is_closed() + } +} + impl Drop for Receiver { fn drop(&mut self) { let shared = self.shared.get_mut(); @@ -313,10 +319,12 @@ mod tests { let (tx, rx) = channel::<()>(); assert!(!tx.is_closed()); assert!(!rx.is_closed()); + assert!(!rx.is_terminated()); tx.close(); assert!(tx.is_closed()); assert!(rx.is_closed()); + assert!(rx.is_terminated()); let (tx, rx) = channel::<()>(); rx.close(); @@ -325,7 +333,9 @@ mod tests { let (tx, rx) = channel::<()>(); drop(tx); assert!(rx.is_closed()); + assert!(rx.is_terminated()); let _tx = rx.sender(); assert!(!rx.is_closed()); + assert!(!rx.is_terminated()); } }