This commit is contained in:
Nikolay Kim 2025-03-12 14:25:18 +05:00
parent facb30dab2
commit 8f45ad2db3
6 changed files with 90 additions and 4 deletions

View file

@ -1,8 +1,7 @@
use std::{cell::RefCell, fmt, io, mem, num::NonZeroU32, rc::Rc, task::Poll};
use io_uring::{opcode, squeue::Entry, types::Fd};
use ntex_neon::driver::op::Handler;
use ntex_neon::driver::{AsRawFd, DriverApi};
use ntex_neon::driver::{op::Handler, AsRawFd, DriverApi};
use ntex_neon::Runtime;
use ntex_util::channel::oneshot;
use slab::Slab;

View file

@ -1,5 +1,11 @@
# Changes
## [2.10.0] - 2025-03-12
* Add "Inplace" channel
* Expose "yield_to" helper
## [2.9.0] - 2025-01-15
* Add EitherService/EitherServiceFactory

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-util"
version = "2.9.0"
version = "2.10.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for ntex framework"
keywords = ["network", "framework", "async", "futures"]

View file

@ -0,0 +1,81 @@
//! A futures-aware bounded(1) channel.
use std::{cell::Cell, fmt, future::poll_fn, task::Context, task::Poll};
use crate::task::LocalWaker;
/// Creates a new futures-aware, channel.
pub fn channel<T>() -> Inplace<T> {
Inplace {
value: Cell::new(None),
rx_task: LocalWaker::new(),
}
}
/// A futures-aware bounded(1) channel.
pub struct Inplace<T> {
value: Cell<Option<T>>,
rx_task: LocalWaker,
}
// The channels do not ever project Pin to the inner T
impl<T> Unpin for Inplace<T> {}
impl<T> fmt::Debug for Inplace<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Inplace<T>")
}
}
impl<T> Inplace<T> {
/// Set a successful result.
///
/// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If previose value is not consumed
/// then `Err` is returned with the value provided.
pub fn send(&self, val: T) -> Result<(), T> {
if let Some(v) = self.value.take() {
self.value.set(Some(v));
Err(val)
} else {
self.value.set(Some(val));
self.rx_task.wake();
Ok(())
}
}
/// Wait until the oneshot is ready and return value
pub async fn recv(&self) -> T {
poll_fn(|cx| self.poll_recv(cx)).await
}
/// Polls the oneshot to determine if value is ready
pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<T> {
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = self.value.take() {
return Poll::Ready(val);
}
// Check if sender is dropped and return error if it is.
self.rx_task.register(cx.waker());
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::future::lazy;
#[ntex_macros::rt_test2]
async fn test_inplace() {
let ch = channel();
assert_eq!(lazy(|cx| ch.poll_recv(cx)).await, Poll::Pending);
assert!(ch.send(1).is_ok());
assert!(ch.send(2) == Err(2));
assert_eq!(lazy(|cx| ch.poll_recv(cx)).await, Poll::Ready(1));
assert!(ch.send(1).is_ok());
assert_eq!(ch.recv().await, 1);
}
}

View file

@ -2,6 +2,7 @@
mod cell;
pub mod condition;
pub mod inplace;
pub mod mpsc;
pub mod oneshot;
pub mod pool;

View file

@ -91,7 +91,6 @@ impl fmt::Debug for LocalWaker {
}
}
#[doc(hidden)]
/// Yields execution back to the current runtime.
pub async fn yield_to() {
use std::{future::Future, pin::Pin, task::Context, task::Poll};