add Receiver::poll_recv() method

This commit is contained in:
Nikolay Kim 2021-12-21 17:11:10 +06:00
parent c9271144aa
commit 38258c5019
4 changed files with 23 additions and 12 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.1.4] - 2021-12-21
* mpsc: add Receiver::poll_recv() method
## [0.1.3] - 2021-12-18
* move ntex::channel::mpsc

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "0.1.3"
version = "0.1.4"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]

View file

@ -164,17 +164,11 @@ impl<T> Receiver<T> {
pub fn is_closed(&self) -> bool {
self.shared.strong_count() == 1 || !self.shared.get_ref().has_receiver
}
}
impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
/// Attempt to pull out the next value of this receiver, registering
/// the current task for wakeup if the value is not yet available,
/// and returning None if the stream is exhausted.
pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let shared = self.shared.get_mut();
if let Some(msg) = shared.buffer.pop_front() {
@ -194,6 +188,19 @@ impl<T> Stream for Receiver<T> {
}
}
impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.poll_recv(cx)
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let shared = self.shared.get_mut();

View file

@ -43,7 +43,7 @@ ntex-codec = "0.6.0"
ntex-router = "0.5.1"
ntex-service = "0.2.1"
ntex-macros = "0.1.3"
ntex-util = "0.1.3"
ntex-util = "0.1.4"
ntex-bytes = "0.1.8"
ntex-tls = "0.1.0-b.2"
ntex-io = "0.1.0-b.3"