This commit is contained in:
Nikolay Kim 2025-03-09 14:59:37 +05:00
parent 41e9f45bf8
commit 57e087e20b
18 changed files with 711 additions and 759 deletions

View file

@ -64,6 +64,7 @@ futures-util = "0.3.29"
fxhash = "0.2"
libc = "0.2.164"
log = "0.4"
nohash-hasher = "0.2.0"
scoped-tls = "1.0.1"
slab = "0.4.9"
socket2 = "0.5.6"

View file

@ -1,5 +1,9 @@
# Changes
## [2.11.0] - 2025-03-10
* Add single io context
## [2.10.0] - 2025-02-26
* Impl Filter for Sealed #506

View file

@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.10.0"
version = "2.11.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]

View file

@ -1244,6 +1244,8 @@ mod tests {
sleep(Millis(50)).await;
if let DispatchItem::Item(msg) = msg {
Ok::<_, ()>(Some(msg.freeze()))
} else if let DispatchItem::Disconnect(_) = msg {
Ok::<_, ()>(None)
} else {
panic!()
}

View file

@ -29,7 +29,7 @@ pub use self::filter::{Base, Filter, Layer};
pub use self::framed::Framed;
pub use self::io::{Io, IoRef, OnDisconnect};
pub use self::seal::{IoBoxed, Sealed};
pub use self::tasks::{ReadContext, WriteContext, WriteContextBuf};
pub use self::tasks::{IoContext, ReadContext, WriteContext, WriteContextBuf};
pub use self::timer::TimerHandle;
pub use self::utils::{seal, Decoded};
@ -77,7 +77,7 @@ pub trait FilterLayer: fmt::Debug + 'static {
#[inline]
/// Check readiness for read operations
fn poll_read_ready(&self, waker: &mut Context<'_>) -> Poll<ReadStatus> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
Poll::Ready(ReadStatus::Ready)
}

View file

@ -19,26 +19,21 @@ impl ReadContext {
Self(io.clone(), Cell::new(None))
}
#[doc(hidden)]
#[inline]
/// Io tag
pub fn context(&self) -> IoContext {
IoContext::new(&self.0)
}
#[inline]
/// Io tag
pub fn tag(&self) -> &'static str {
self.0.tag()
}
#[doc(hidden)]
/// Io flags
pub fn flags(&self) -> crate::flags::Flags {
self.0.flags()
}
#[inline]
/// Check readiness for read operations
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
self.0.filter().poll_read_ready(cx)
}
/// Wait when io get closed or preparing for close
pub async fn wait_for_close(&self) {
async fn wait_for_close(&self) {
poll_fn(|cx| {
let flags = self.0.flags();
@ -55,111 +50,6 @@ impl ReadContext {
.await
}
#[inline]
/// Get io error
pub fn set_stopped(&self, e: Option<io::Error>) {
self.0 .0.io_stopped(e);
}
/// Get read buffer
pub fn with_buf<F>(&self, f: F) -> Poll<()>
where
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
{
let inner = &self.0 .0;
let (hw, lw) = self.0.memory_pool().read_params().unpack();
let result = inner.buffer.with_read_source(&self.0, |buf| {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
// call provided callback
f(buf)
});
// handle buffer changes
match result {
Poll::Ready(Ok(0)) => {
inner.io_stopped(None);
Poll::Ready(())
}
Poll::Ready(Ok(nbytes)) => {
let filter = self.0.filter();
let _ = filter
.process_read_buf(&self.0, &inner.buffer, 0, nbytes)
.and_then(|status| {
if status.nbytes > 0 {
// dest buffer has new data, wake up dispatcher
if inner.buffer.read_destination_size() >= hw {
log::trace!(
"{}: Io read buffer is too large {}, enable read back-pressure",
self.0.tag(),
nbytes
);
inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
} else {
inner.insert_flags(Flags::BUF_R_READY);
if nbytes >= hw {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
}
log::trace!(
"{}: New {} bytes available, wakeup dispatcher",
self.0.tag(),
nbytes
);
inner.dispatch_task.wake();
} else {
if nbytes >= hw {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
if inner.flags.get().contains(Flags::RD_NOTIFY) {
// in case of "notify" we must wake up dispatch task
// if we read any data from source
inner.dispatch_task.wake();
}
}
// while reading, filter wrote some data
// in that case filters need to process write buffers
// and potentialy wake write task
if status.need_write {
filter.process_write_buf(&self.0, &inner.buffer, 0)
} else {
Ok(())
}
})
.map_err(|err| {
inner.dispatch_task.wake();
inner.io_stopped(Some(err));
inner.insert_flags(Flags::BUF_R_READY);
});
Poll::Pending
}
Poll::Ready(Err(e)) => {
inner.io_stopped(Some(e));
Poll::Ready(())
}
Poll::Pending => {
if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
shutdown_filters(&self.0);
}
Poll::Pending
}
}
}
/// Handle read io operations
pub async fn handle<T>(&self, io: &mut T)
where
@ -283,70 +173,18 @@ impl ReadContext {
}
}
pub fn shutdown_filters(&self, cx: &mut Context<'_>) {
fn shutdown_filters(&self, cx: &mut Context<'_>) {
let st = &self.0 .0;
let filter = self.0.filter();
if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
let filter = self.0.filter();
match filter.shutdown(&self.0, &st.buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
let flags = st.flags.get();
// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
|| flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
{
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
} else {
// filter shutdown timeout
let timeout = self
.1
.take()
.unwrap_or_else(|| sleep(st.disconnect_timeout.get()));
if timeout.poll_elapsed(cx).is_ready() {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
} else {
self.1.set(Some(timeout));
}
}
}
Err(err) => {
st.io_stopped(Some(err));
}
}
if let Err(err) = filter.process_write_buf(&self.0, &st.buffer, 0) {
st.io_stopped(Some(err));
}
}
}
}
impl Clone for ReadContext {
fn clone(&self) -> Self {
Self(self.0.clone(), Cell::new(None))
}
}
fn shutdown_filters(io: &IoRef) {
let st = &io.0;
let flags = st.flags.get();
if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
let filter = io.filter();
match filter.shutdown(io, &st.buffer, 0) {
match filter.shutdown(&self.0, &st.buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
let flags = st.flags.get();
// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
@ -354,13 +192,25 @@ fn shutdown_filters(io: &IoRef) {
{
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
} else {
// filter shutdown timeout
let timeout = self
.1
.take()
.unwrap_or_else(|| sleep(st.disconnect_timeout.get()));
if timeout.poll_elapsed(cx).is_ready() {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
} else {
self.1.set(Some(timeout));
}
}
}
Err(err) => {
st.io_stopped(Some(err));
}
}
if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) {
if let Err(err) = filter.process_write_buf(&self.0, &st.buffer, 0) {
st.io_stopped(Some(err));
}
}
@ -393,12 +243,6 @@ impl WriteContext {
poll_fn(|cx| self.0.filter().poll_write_ready(cx)).await
}
#[inline]
/// Check readiness for write operations
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
self.0.filter().poll_write_ready(cx)
}
/// Indicate that write io task is stopped
fn close(&self, err: Option<io::Error>) {
self.0 .0.io_stopped(err);
@ -417,11 +261,143 @@ impl WriteContext {
.await
}
/// Wait when io get closed or preparing for close
pub async fn wait_for_shutdown(&self, flush_buf: bool) {
let st = &self.0 .0;
/// Handle write io operations
pub async fn handle<T>(&self, io: &mut T)
where
T: AsyncWrite,
{
let mut buf = WriteContextBuf {
io: self.0.clone(),
buf: None,
};
// filter shutdown timeout
loop {
match self.ready().await {
WriteStatus::Ready => {
// write io stream
match select(io.write(&mut buf), self.when_stopped()).await {
Either::Left(Ok(_)) => continue,
Either::Left(Err(e)) => self.close(Some(e)),
Either::Right(_) => return,
}
}
WriteStatus::Shutdown => {
log::trace!("{}: Write task is instructed to shutdown", self.tag());
let fut = async {
// write io stream
io.write(&mut buf).await?;
io.flush().await?;
io.shutdown().await?;
Ok(())
};
match select(sleep(self.0 .0.disconnect_timeout.get()), fut).await {
Either::Left(_) => self.close(None),
Either::Right(res) => self.close(res.err()),
}
}
WriteStatus::Terminate => {
log::trace!("{}: Write task is instructed to terminate", self.tag());
self.close(io.shutdown().await.err());
}
}
return;
}
}
}
impl WriteContextBuf {
pub fn set(&mut self, mut buf: BytesVec) {
if buf.is_empty() {
self.io.memory_pool().release_write_buf(buf);
} else if let Some(b) = self.buf.take() {
buf.extend_from_slice(&b);
self.io.memory_pool().release_write_buf(b);
self.buf = Some(buf);
} else if let Some(b) = self.io.0.buffer.set_write_destination(buf) {
// write buffer is already set
self.buf = Some(b);
}
// if write buffer is smaller than high watermark value, turn off back-pressure
let inner = &self.io.0;
let len = self.buf.as_ref().map(|b| b.len()).unwrap_or_default()
+ inner.buffer.write_destination_size();
let mut flags = inner.flags.get();
if len == 0 {
if flags.is_waiting_for_write() {
flags.waiting_for_write_is_done();
inner.dispatch_task.wake();
}
flags.insert(Flags::WR_PAUSED);
inner.flags.set(flags);
} else if flags.contains(Flags::BUF_W_BACKPRESSURE)
&& len < inner.pool.get().write_params_high() << 1
{
flags.remove(Flags::BUF_W_BACKPRESSURE);
inner.flags.set(flags);
inner.dispatch_task.wake();
}
}
pub fn take(&mut self) -> Option<BytesVec> {
if let Some(buf) = self.buf.take() {
Some(buf)
} else {
self.io.0.buffer.get_write_destination()
}
}
}
/// Context for io read task
pub struct IoContext(IoRef);
impl fmt::Debug for IoContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IoContext").field("io", &self.0).finish()
}
}
impl IoContext {
pub(crate) fn new(io: &IoRef) -> Self {
Self(io.clone())
}
#[inline]
/// Io tag
pub fn tag(&self) -> &'static str {
self.0.tag()
}
#[doc(hidden)]
/// Io flags
pub fn flags(&self) -> crate::flags::Flags {
self.0.flags()
}
#[inline]
/// Check readiness for read operations
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
self.shutdown_filters();
self.0.filter().poll_read_ready(cx)
}
#[inline]
/// Check readiness for write operations
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
self.0.filter().poll_write_ready(cx)
}
#[inline]
/// Get io error
pub fn stopped(&self, e: Option<io::Error>) {
self.0 .0.io_stopped(e);
}
/// Wait when io get closed or preparing for close
pub async fn shutdown(&self, flush_buf: bool) {
let st = &self.0 .0;
let mut timeout = None;
poll_fn(|cx| {
@ -471,83 +447,125 @@ impl WriteContext {
}
}
/// Handle write io operations
pub async fn handle<T>(&self, io: &mut T)
/// Get read buffer
pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
where
T: AsyncWrite,
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
{
let mut buf = WriteContextBuf {
io: self.0.clone(),
buf: None,
};
loop {
match self.ready().await {
WriteStatus::Ready => {
// write io stream
match select(io.write(&mut buf), self.when_stopped()).await {
Either::Left(Ok(_)) => continue,
Either::Left(Err(e)) => self.close(Some(e)),
Either::Right(_) => return,
}
}
WriteStatus::Shutdown => {
log::trace!("{}: Write task is instructed to shutdown", self.tag());
let fut = async {
// write io stream
io.write(&mut buf).await?;
io.flush().await?;
io.shutdown().await?;
Ok(())
};
match select(sleep(self.0 .0.disconnect_timeout.get()), fut).await {
Either::Left(_) => self.close(None),
Either::Right(res) => self.close(res.err()),
}
}
WriteStatus::Terminate => {
log::trace!("{}: Write task is instructed to terminate", self.tag());
self.close(io.shutdown().await.err());
}
let inner = &self.0 .0;
let (hw, lw) = self.0.memory_pool().read_params().unpack();
let result = inner.buffer.with_read_source(&self.0, |buf| {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
f(buf)
});
// handle buffer changes
match result {
Poll::Ready(Ok(0)) => {
inner.io_stopped(None);
Poll::Ready(())
}
Poll::Ready(Ok(nbytes)) => {
let filter = self.0.filter();
let _ = filter
.process_read_buf(&self.0, &inner.buffer, 0, nbytes)
.and_then(|status| {
if status.nbytes > 0 {
// dest buffer has new data, wake up dispatcher
if inner.buffer.read_destination_size() >= hw {
log::trace!(
"{}: Io read buffer is too large {}, enable read back-pressure",
self.0.tag(),
nbytes
);
inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
} else {
inner.insert_flags(Flags::BUF_R_READY);
if nbytes >= hw {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
}
log::trace!(
"{}: New {} bytes available, wakeup dispatcher",
self.0.tag(),
nbytes
);
inner.dispatch_task.wake();
} else {
if nbytes >= hw {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
if inner.flags.get().contains(Flags::RD_NOTIFY) {
// in case of "notify" we must wake up dispatch task
// if we read any data from source
inner.dispatch_task.wake();
}
}
// while reading, filter wrote some data
// in that case filters need to process write buffers
// and potentialy wake write task
if status.need_write {
filter.process_write_buf(&self.0, &inner.buffer, 0)
} else {
Ok(())
}
})
.map_err(|err| {
inner.dispatch_task.wake();
inner.io_stopped(Some(err));
inner.insert_flags(Flags::BUF_R_READY);
});
Poll::Pending
}
Poll::Ready(Err(e)) => {
inner.io_stopped(Some(e));
Poll::Ready(())
}
Poll::Pending => {
self.shutdown_filters();
Poll::Pending
}
return;
}
}
/// Get write buffer
pub fn with_buf<F>(&self, f: F) -> Poll<()>
pub fn with_write_buf<F>(&self, f: F) -> Poll<()>
where
F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
{
let inner = &self.0 .0;
// call provided callback
let result = inner.buffer.with_write_destination(&self.0, |buf| {
let buf = if let Some(buf) = buf {
buf
} else {
let Some(buf) =
buf.and_then(|buf| if buf.is_empty() { None } else { Some(buf) })
else {
return Poll::Ready(Ok(0));
};
if buf.is_empty() {
return Poll::Ready(Ok(0));
}
let result = ready!(f(buf));
match result {
match ready!(f(buf)) {
Ok(0) => {
log::trace!("{}: Disconnected during flush", self.tag());
Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)))
}
Ok(n) => {
if n == 0 {
log::trace!(
"{}: Disconnected during flush, written {}",
self.tag(),
n
);
Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)))
} else if n == buf.len() {
if n == buf.len() {
buf.clear();
Poll::Ready(Ok(0))
} else {
@ -566,33 +584,33 @@ impl WriteContext {
flags.remove(Flags::WR_PAUSED);
Poll::Pending
}
Poll::Ready(Ok(0)) => {
// all data has been written
flags.insert(Flags::WR_PAUSED);
if flags.is_task_waiting_for_write() {
flags.task_waiting_for_write_is_done();
inner.write_task.wake();
}
if flags.is_waiting_for_write() {
flags.waiting_for_write_is_done();
inner.dispatch_task.wake();
}
Poll::Ready(())
}
Poll::Ready(Ok(len)) => {
// if write buffer is smaller than high watermark value, turn off back-pressure
if len == 0 {
flags.insert(Flags::WR_PAUSED);
if flags.is_task_waiting_for_write() {
flags.task_waiting_for_write_is_done();
inner.write_task.wake();
}
if flags.is_waiting_for_write() {
flags.waiting_for_write_is_done();
inner.dispatch_task.wake();
}
Poll::Ready(())
} else if flags.contains(Flags::BUF_W_BACKPRESSURE)
if flags.contains(Flags::BUF_W_BACKPRESSURE)
&& len < inner.pool.get().write_params_high() << 1
{
flags.remove(Flags::BUF_W_BACKPRESSURE);
inner.dispatch_task.wake();
Poll::Pending
} else {
Poll::Pending
}
Poll::Pending
}
Poll::Ready(Err(e)) => {
self.close(Some(e));
self.0 .0.io_stopped(Some(e));
Poll::Ready(())
}
};
@ -600,54 +618,44 @@ impl WriteContext {
inner.flags.set(flags);
result
}
fn shutdown_filters(&self) {
let io = &self.0;
let st = &self.0 .0;
if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
let flags = st.flags.get();
if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
let filter = io.filter();
match filter.shutdown(io, &st.buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
|| flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
{
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
}
Err(err) => {
st.io_stopped(Some(err));
}
}
if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) {
st.io_stopped(Some(err));
}
}
}
}
}
impl Clone for WriteContext {
impl Clone for IoContext {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl WriteContextBuf {
pub fn set(&mut self, mut buf: BytesVec) {
if buf.is_empty() {
self.io.memory_pool().release_write_buf(buf);
} else if let Some(b) = self.buf.take() {
buf.extend_from_slice(&b);
self.io.memory_pool().release_write_buf(b);
self.buf = Some(buf);
} else if let Some(b) = self.io.0.buffer.set_write_destination(buf) {
// write buffer is already set
self.buf = Some(b);
}
// if write buffer is smaller than high watermark value, turn off back-pressure
let inner = &self.io.0;
let len = self.buf.as_ref().map(|b| b.len()).unwrap_or_default()
+ inner.buffer.write_destination_size();
let mut flags = inner.flags.get();
if len == 0 {
if flags.is_waiting_for_write() {
flags.waiting_for_write_is_done();
inner.dispatch_task.wake();
}
flags.insert(Flags::WR_PAUSED);
inner.flags.set(flags);
} else if flags.contains(Flags::BUF_W_BACKPRESSURE)
&& len < inner.pool.get().write_params_high() << 1
{
flags.remove(Flags::BUF_W_BACKPRESSURE);
inner.flags.set(flags);
inner.dispatch_task.wake();
}
}
pub fn take(&mut self) -> Option<BytesVec> {
if let Some(buf) = self.buf.take() {
Some(buf)
} else {
self.io.0.buffer.get_write_destination()
}
}
}

View file

@ -34,6 +34,7 @@ cfg-if = { workspace = true }
crossbeam-channel = { workspace = true }
socket2 = { workspace = true }
slab = { workspace = true }
nohash-hasher = { workspace = true }
# Windows specific dependencies
[target.'cfg(windows)'.dependencies]

View file

@ -1,15 +1,14 @@
#![allow(clippy::type_complexity)]
pub use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::{cell::Cell, cell::RefCell, collections::HashMap, io, rc::Rc, sync::Arc};
use std::{cell::Cell, cell::RefCell, io, rc::Rc, sync::Arc};
use std::{num::NonZeroUsize, os::fd::BorrowedFd, pin::Pin, task::Poll, time::Duration};
use crossbeam_queue::SegQueue;
use nohash_hasher::IntMap;
use polling::{Event, Events, Poller};
use crate::{
op::Handler, op::Interest, syscall, AsyncifyPool, Entry, Key, ProactorBuilder,
};
use crate::{op::Handler, op::Interest, AsyncifyPool, Entry, Key, ProactorBuilder};
pub(crate) mod op;
@ -132,14 +131,6 @@ enum Change {
},
}
// #[derive(Debug)]
// struct BatchChange {
// fd: RawFd,
// batch: usize,
// user_data: usize,
// interest: InterestChange,
// }
pub struct DriverApi {
batch: usize,
changes: Rc<RefCell<Vec<Change>>>,
@ -191,7 +182,7 @@ impl DriverApi {
pub(crate) struct Driver {
poll: Arc<Poller>,
events: RefCell<Events>,
registry: RefCell<HashMap<RawFd, FdItem>>,
registry: RefCell<IntMap<RawFd, FdItem>>,
pool: AsyncifyPool,
pool_completed: Arc<SegQueue<Entry>>,
hid: Cell<usize>,
@ -212,7 +203,7 @@ impl Driver {
Ok(Self {
poll: Arc::new(Poller::new()?),
events: RefCell::new(events),
registry: RefCell::new(HashMap::default()),
registry: RefCell::new(Default::default()),
pool: builder.create_or_get_thread_pool(),
pool_completed: Arc::new(SegQueue::new()),
hid: Cell::new(0),
@ -241,34 +232,6 @@ impl Driver {
Key::new(self.as_raw_fd(), op)
}
fn renew(
&self,
fd: BorrowedFd,
renew_event: Event,
registry: &mut HashMap<RawFd, FdItem>,
) -> io::Result<()> {
if !renew_event.readable && !renew_event.writable {
// crate::log(format!("DELETE - {:?}", fd.as_raw_fd()));
if let Some(item) = registry.remove(&fd.as_raw_fd()) {
if !item.flags.contains(Flags::NEW) {
self.poll.delete(fd)?;
}
}
} else {
if let Some(item) = registry.get(&fd.as_raw_fd()) {
if item.flags.contains(Flags::NEW) {
// crate::log(format!("ADD - {:?}", fd.as_raw_fd()));
unsafe { self.poll.add(&fd, renew_event)? };
return Ok(());
}
}
// crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event));
self.poll.modify(fd, renew_event)?;
}
Ok(())
}
pub fn attach(&self, _fd: RawFd) -> io::Result<()> {
Ok(())
}
@ -299,32 +262,33 @@ impl Driver {
}
let mut events = self.events.borrow_mut();
let res = self.poll.wait(&mut events, timeout);
res?;
self.poll.wait(&mut events, timeout)?;
if events.is_empty() && timeout != Some(Duration::ZERO) && timeout.is_some() {
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
}
// println!("POLL, events: {:?}", events.len());
if !events.is_empty() {
if events.is_empty() {
if timeout.is_some() && timeout != Some(Duration::ZERO) {
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
}
} else {
// println!("POLL, events: {:?}", events.len());
let mut registry = self.registry.borrow_mut();
let mut handlers = self.handlers.take().unwrap();
for event in events.iter() {
let user_data = event.key;
let fd = user_data as RawFd;
log::debug!(
"receive {} for {:?} {:?}",
user_data,
event,
registry.get_mut(&fd)
);
let fd = event.key as RawFd;
log::debug!("Event {:?} for {:?}", event, registry.get(&fd));
if let Some(item) = registry.get_mut(&fd) {
self.handle_batch_event(event, item, &mut handlers);
if event.readable {
if let Some(user_data) = item.user_data(Interest::Readable) {
handlers[item.batch].readable(user_data)
}
}
if event.writable {
if let Some(user_data) = item.user_data(Interest::Writable) {
handlers[item.batch].writable(user_data)
}
}
}
}
drop(registry);
self.handlers.set(Some(handlers));
}
@ -348,31 +312,12 @@ impl Driver {
Ok(())
}
fn handle_batch_event(
&self,
event: Event,
item: &mut FdItem,
handlers: &mut [Box<dyn Handler>],
) {
if event.readable {
if let Some(user_data) = item.user_data(Interest::Readable) {
handlers[item.batch].readable(user_data)
}
}
if event.writable {
if let Some(user_data) = item.user_data(Interest::Writable) {
handlers[item.batch].writable(user_data)
}
}
}
/// re-calc driver changes
unsafe fn apply_changes(&self) -> io::Result<()> {
let mut changes = self.changes.borrow_mut();
if changes.is_empty() {
return Ok(());
}
log::debug!("Apply driver changes, {:?}", changes.len());
let mut registry = self.registry.borrow_mut();
@ -412,20 +357,26 @@ impl Driver {
};
if let Some(fd) = fd {
let result = registry.get_mut(&fd).and_then(|item| {
if let Some(item) = registry.get_mut(&fd) {
if item.flags.contains(Flags::CHANGED) {
item.flags.remove(Flags::CHANGED);
Some((item.event(fd as usize), item.flags.contains(Flags::NEW)))
} else {
None
}
});
if let Some((event, new)) = result {
self.renew(BorrowedFd::borrow_raw(fd), event, &mut registry)?;
if new {
if let Some(item) = registry.get_mut(&fd) {
let new = item.flags.contains(Flags::NEW);
let renew_event = item.event(fd as usize);
if !renew_event.readable && !renew_event.writable {
// crate::log(format!("DELETE - {:?}", fd.as_raw_fd()));
registry.remove(&fd);
if !new {
self.poll.delete(BorrowedFd::borrow_raw(fd))?;
}
} else if new {
item.flags.remove(Flags::NEW);
// crate::log(format!("ADD - {:?}", fd.as_raw_fd()));
unsafe { self.poll.add(fd, renew_event)? };
} else {
// crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event));
self.poll.modify(BorrowedFd::borrow_raw(fd), renew_event)?;
}
}
}
@ -436,7 +387,6 @@ impl Driver {
}
fn push_blocking(&self, user_data: usize) {
// -> Poll<io::Result<usize>> {
let poll = self.poll.clone();
let completed = self.pool_completed.clone();
let mut closure = move || {
@ -449,30 +399,26 @@ impl Driver {
completed.push(Entry::new(user_data, res));
poll.notify().ok();
};
loop {
match self.pool.dispatch(closure) {
Ok(()) => return,
Err(e) => {
closure = e.0;
self.poll_blocking();
}
}
while let Err(e) = self.pool.dispatch(closure) {
closure = e.0;
self.poll_blocking();
}
}
fn poll_blocking(&self) -> bool {
if self.pool_completed.is_empty() {
return false;
}
while let Some(entry) = self.pool_completed.pop() {
unsafe {
entry.notify();
false
} else {
while let Some(entry) = self.pool_completed.pop() {
unsafe {
entry.notify();
}
}
true
}
true
}
/// Get notification handle
pub fn handle(&self) -> NotifyHandle {
NotifyHandle::new(self.poll.clone())
}
@ -506,7 +452,7 @@ impl NotifyHandle {
Self { poll }
}
/// Notify the inner driver.
/// Notify the driver
pub fn notify(&self) -> io::Result<()> {
self.poll.notify()
}

View file

@ -1,9 +1,10 @@
use std::{io, marker::Send, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll};
use super::{syscall, AsRawFd, Decision, OpCode};
use crate::op::*;
pub use crate::unix::op::*;
use super::{AsRawFd, Decision, OpCode};
use crate::{op::*, syscall};
impl<D, F> OpCode for Asyncify<F, D>
where
D: Send + 'static,

View file

@ -37,7 +37,7 @@ default-rt = ["ntex-rt/default-rt", "ntex-runtime", "ntex-iodriver", "slab", "so
ntex-service = "3.3"
ntex-bytes = "0.1"
ntex-http = "0.1"
ntex-io = "2.8"
ntex-io = "2.11"
ntex-rt = "0.4.25"
ntex-util = "2.5"

View file

@ -6,7 +6,7 @@ use ntex_runtime::Runtime;
use slab::Slab;
use ntex_bytes::BufMut;
use ntex_io::{ReadContext, WriteContext};
use ntex_io::IoContext;
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
@ -25,8 +25,7 @@ pub(crate) struct StreamCtl<T> {
struct TcpStreamItem<T> {
io: Option<T>,
fd: RawFd,
read: ReadContext,
write: WriteContext,
context: IoContext,
flags: Flags,
ref_count: usize,
}
@ -78,15 +77,9 @@ impl<T: AsRawFd + 'static> CompioOps<T> {
})
}
pub(crate) fn register(
&self,
io: T,
read: ReadContext,
write: WriteContext,
) -> StreamCtl<T> {
pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl<T> {
let item = TcpStreamItem {
read,
write,
context,
fd: io.as_raw_fd(),
io: Some(io),
flags: Flags::empty(),
@ -146,7 +139,7 @@ impl<T> Handler for CompioOpsBatcher<T> {
match change {
Change::Readable => {
let item = &mut streams[id];
let result = item.read.with_buf(|buf| {
let result = item.context.with_read_buf(|buf| {
let chunk = buf.chunk_mut();
let b = chunk.as_mut_ptr();
Poll::Ready(
@ -155,7 +148,12 @@ impl<T> Handler for CompioOpsBatcher<T> {
))
.inspect(|size| {
unsafe { buf.advance_mut(*size) };
log::debug!("FD: {:?}, BUF: {:?}", item.fd, buf);
log::debug!(
"FD: {:?}, SIZE: {:?}, BUF: {:?}",
item.fd,
size,
buf
);
}),
)
});
@ -167,7 +165,7 @@ impl<T> Handler for CompioOpsBatcher<T> {
}
Change::Writable => {
let item = &mut streams[id];
let result = item.write.with_buf(|buf| {
let result = item.context.with_write_buf(|buf| {
let slice = &buf[..];
syscall!(
break libc::write(item.fd, slice.as_ptr() as _, slice.len())
@ -181,7 +179,7 @@ impl<T> Handler for CompioOpsBatcher<T> {
}
Change::Error(err) => {
if let Some(item) = streams.get_mut(id) {
item.read.set_stopped(Some(err));
item.context.stopped(Some(err));
if !item.flags.contains(Flags::ERROR) {
item.flags.insert(Flags::ERROR);
item.flags.remove(Flags::RD | Flags::WR);
@ -211,9 +209,20 @@ impl<T> Handler for CompioOpsBatcher<T> {
}
}
pub(crate) trait Closable {
async fn close(self) -> io::Result<()>;
}
impl<T> StreamCtl<T> {
pub(crate) fn take_io(&self) -> Option<T> {
self.with(|streams| streams[self.id].io.take())
pub(crate) async fn close(self) -> io::Result<()>
where
T: Closable,
{
if let Some(io) = self.with(|streams| streams[self.id].io.take()) {
io.close().await
} else {
Ok(())
}
}
pub(crate) fn with_io<F, R>(&self, f: F) -> R
@ -267,7 +276,7 @@ impl<T> StreamCtl<T> {
if !item.flags.contains(Flags::WR) {
log::debug!("Resume io write ({}), {:?}", self.id, item.fd);
let result = item.write.with_buf(|buf| {
let result = item.context.with_write_buf(|buf| {
log::debug!("Writing io ({}), buf: {:?}", self.id, buf.len());
let slice = &buf[..];
@ -275,7 +284,11 @@ impl<T> StreamCtl<T> {
});
if result.is_pending() {
log::debug!("Write is pending ({}), {:?}", self.id, item.read.flags());
log::debug!(
"Write is pending ({}), {:?}",
self.id,
item.context.flags()
);
item.flags.insert(Flags::WR);
self.inner

View file

@ -1,28 +1,30 @@
use std::{any, future::poll_fn, io, task::Poll};
use ntex_io::{
types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus,
};
use ntex_runtime::{net::TcpStream, net::UnixStream, spawn};
use super::driver::{CompioOps, StreamCtl};
use super::driver::{Closable, CompioOps, StreamCtl};
impl IoStream for super::TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0;
let ctl = CompioOps::current().register(io, read.clone(), write.clone());
let context = read.context();
let ctl = CompioOps::current().register(io, context.clone());
let ctl2 = ctl.clone();
spawn(async move { run(ctl, read, write).await }).detach();
spawn(async move { run(ctl, context).await }).detach();
Some(Box::new(HandleWrapper(ctl2)))
}
}
impl IoStream for super::UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
fn start(self, read: ReadContext, _: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0;
let ctl = CompioOps::current().register(io, read.clone(), write.clone());
spawn(async move { run(ctl, read, write).await }).detach();
let context = read.context();
let ctl = CompioOps::current().register(io, context.clone());
spawn(async move { run(ctl, context).await }).detach();
None
}
@ -42,10 +44,6 @@ impl Handle for HandleWrapper {
}
}
trait Closable {
async fn close(self) -> io::Result<()>;
}
impl Closable for TcpStream {
async fn close(self) -> io::Result<()> {
TcpStream::close(self).await
@ -64,24 +62,10 @@ enum Status {
Terminate,
}
async fn run<T: Closable>(ctl: StreamCtl<T>, read: ReadContext, write: WriteContext) {
async fn run<T: Closable>(ctl: StreamCtl<T>, context: IoContext) {
// Handle io read readiness
let st = poll_fn(|cx| {
read.shutdown_filters(cx);
let read_st = read.poll_ready(cx);
let write_st = write.poll_ready(cx);
//println!("\n\n");
//println!(
// "IO2 read-st {:?}, write-st: {:?}, flags: {:?}",
// read_st,
// write_st,
// read.io().flags()
//);
//println!("\n\n");
//let read = match read.poll_ready(cx) {
let read = match read_st {
let read = match context.poll_read_ready(cx) {
Poll::Ready(ReadStatus::Ready) => {
ctl.resume_read();
Poll::Pending
@ -93,7 +77,7 @@ async fn run<T: Closable>(ctl: StreamCtl<T>, read: ReadContext, write: WriteCont
}
};
let write = match write_st {
let write = match context.poll_write_ready(cx) {
Poll::Ready(WriteStatus::Ready) => {
ctl.resume_write();
Poll::Pending
@ -114,15 +98,10 @@ async fn run<T: Closable>(ctl: StreamCtl<T>, read: ReadContext, write: WriteCont
.await;
ctl.resume_write();
if st == Status::Shutdown {
write.wait_for_shutdown(true).await;
} else {
write.wait_for_shutdown(false).await;
}
context.shutdown(st == Status::Shutdown).await;
ctl.pause_all();
let io = ctl.take_io().unwrap();
let result = io.close().await;
let result = ctl.close().await;
read.set_stopped(result.err());
context.stopped(result.err());
}

View file

@ -289,12 +289,12 @@ mod default_rt {
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
pub fn spawn<F>(f: F) -> Task<F::Output>
where
F: Future + 'static,
{
let ptr = crate::CB.with(|cb| (cb.borrow().0)());
let fut = ntex_runtime::spawn(async move {
let task = ntex_runtime::spawn(async move {
if let Some(ptr) = ptr {
let mut f = std::pin::pin!(f);
let result = poll_fn(|ctx| {
@ -311,7 +311,7 @@ mod default_rt {
}
});
JoinHandle { fut: Some(fut) }
Task { task: Some(task) }
}
/// Executes a future on the current thread. This does not create a new Arbiter
@ -322,7 +322,7 @@ mod default_rt {
///
/// This function panics if ntex system is not running.
#[inline]
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
where
F: FnOnce() -> R + 'static,
R: Future + 'static,
@ -330,6 +330,35 @@ mod default_rt {
spawn(async move { f().await })
}
/// A spawned task.
pub struct Task<T> {
task: Option<ntex_runtime::Task<T>>,
}
impl<T> Task<T> {
pub fn is_finished(&self) -> bool {
if let Some(hnd) = &self.task {
hnd.is_finished()
} else {
true
}
}
}
impl<T> Drop for Task<T> {
fn drop(&mut self) {
self.task.take().unwrap().detach();
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(self.task.as_mut().unwrap()).poll(cx)
}
}
#[derive(Debug, Copy, Clone)]
pub struct JoinError;

View file

@ -33,7 +33,6 @@ ntex-iodriver = "0.1"
async-task = { workspace = true }
cfg-if = { workspace = true }
crossbeam-queue = { workspace = true }
futures-util = { workspace = true }
scoped-tls = { workspace = true }
fxhash = { workspace = true }
log = { workspace = true }

View file

@ -1,16 +1,13 @@
#![allow(clippy::type_complexity)]
use std::any::{Any, TypeId};
use std::collections::{HashMap, VecDeque};
use std::future::{ready, Future};
use std::task::Context;
use std::{
cell::Cell, cell::RefCell, io, panic::AssertUnwindSafe, sync::Arc, thread,
cell::Cell, cell::RefCell, future::Future, io, sync::Arc, task::Context, thread,
time::Duration,
};
use async_task::{Runnable, Task};
use crossbeam_queue::SegQueue;
use futures_util::{future::Either, FutureExt};
use ntex_iodriver::{
op::Asyncify, AsRawFd, Key, NotifyHandle, OpCode, Proactor, ProactorBuilder, PushEntry,
RawFd,
@ -51,6 +48,238 @@ impl RemoteHandle {
}
}
/// The async runtime for ntex. It is a thread local runtime, and cannot be
/// sent to other threads.
pub struct Runtime {
driver: Proactor,
runnables: Arc<RunnableQueue>,
event_interval: usize,
data: RefCell<HashMap<TypeId, Box<dyn Any>, fxhash::FxBuildHasher>>,
}
impl Runtime {
/// Create [`Runtime`] with default config.
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
/// Create a builder for [`Runtime`].
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}
#[allow(clippy::arc_with_non_send_sync)]
fn with_builder(builder: &RuntimeBuilder) -> io::Result<Self> {
Ok(Self {
driver: builder.proactor_builder.build()?,
runnables: Arc::new(RunnableQueue::new()),
event_interval: builder.event_interval,
data: RefCell::new(HashMap::default()),
})
}
/// Perform a function on the current runtime.
///
/// ## Panics
///
/// This method will panic if there are no running [`Runtime`].
pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
#[cold]
fn not_in_ntex_runtime() -> ! {
panic!("not in a ntex runtime")
}
if CURRENT_RUNTIME.is_set() {
CURRENT_RUNTIME.with(f)
} else {
not_in_ntex_runtime()
}
}
/// Get current driver
pub fn driver(&self) -> &Proactor {
&self.driver
}
/// Get handle for current runtime
pub fn handle(&self) -> RemoteHandle {
RemoteHandle {
handle: self.driver.handle(),
runnables: self.runnables.clone(),
}
}
/// Attach a raw file descriptor/handle/socket to the runtime.
///
/// You only need this when authoring your own high-level APIs. High-level
/// resources in this crate are attached automatically.
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
self.driver.attach(fd)
}
/// Block on the future till it completes.
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
CURRENT_RUNTIME.set(self, || {
let mut result = None;
unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach();
self.runnables.run(self.event_interval);
loop {
if let Some(result) = result.take() {
return result;
}
self.poll_with_driver(self.runnables.has_tasks(), || {
self.runnables.run(self.event_interval);
});
}
})
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// Spawning a task enables the task to execute concurrently to other tasks.
/// There is no guarantee that a spawned task will execute to completion.
pub fn spawn<F: Future + 'static>(&self, future: F) -> Task<F::Output> {
unsafe { self.spawn_unchecked(future) }
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// # Safety
///
/// The caller should ensure the captured lifetime is long enough.
pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> Task<F::Output> {
let runnables = self.runnables.clone();
let handle = self.driver.handle();
let schedule = move |runnable| {
runnables.schedule(runnable, &handle);
};
let (runnable, task) = async_task::spawn_unchecked(future, schedule);
runnable.schedule();
task
}
/// Spawns a blocking task in a new thread, and wait for it.
///
/// The task will not be cancelled even if the future is dropped.
pub fn spawn_blocking<F, R>(&self, f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let op = Asyncify::new(move || {
let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
(Ok(0), res)
});
// It is safe to use `submit` here because the task is spawned immediately.
unsafe {
let fut = self.submit_with_flags(op);
self.spawn_unchecked(async move { fut.await.0 .1.into_inner() })
}
}
fn submit_raw<T: OpCode + 'static>(
&self,
op: T,
) -> PushEntry<Key<T>, (io::Result<usize>, T)> {
self.driver.push(op)
}
fn submit_with_flags<T: OpCode + 'static>(
&self,
op: T,
) -> impl Future<Output = ((io::Result<usize>, T), u32)> {
let fut = self.submit_raw(op);
async move {
match fut {
PushEntry::Pending(user_data) => OpFuture::new(user_data).await,
PushEntry::Ready(res) => {
// submit_flags won't be ready immediately, if ready, it must be error without
// flags
(res, 0)
}
}
}
}
pub(crate) fn cancel_op<T: OpCode>(&self, op: Key<T>) {
self.driver.cancel(op);
}
pub(crate) fn poll_task<T: OpCode>(
&self,
cx: &mut Context,
op: Key<T>,
) -> PushEntry<Key<T>, ((io::Result<usize>, T), u32)> {
self.driver.pop(op).map_pending(|mut k| {
self.driver.update_waker(&mut k, cx.waker().clone());
k
})
}
fn poll_with_driver<F: FnOnce()>(&self, has_tasks: bool, f: F) {
let timeout = if has_tasks {
Some(Duration::ZERO)
} else {
None
};
if let Err(e) = self.driver.poll(timeout, f) {
match e.kind() {
io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
log::debug!("expected error: {e}");
}
_ => panic!("{e:?}"),
}
}
}
/// Insert a type into this runtime.
pub fn insert<T: 'static>(&self, val: T) {
self.data
.borrow_mut()
.insert(TypeId::of::<T>(), Box::new(val));
}
/// Check if container contains entry
pub fn contains<T: 'static>(&self) -> bool {
self.data.borrow().contains_key(&TypeId::of::<T>())
}
/// Get a reference to a type previously inserted on this runtime.
pub fn get<T>(&self) -> Option<T>
where
T: Clone + 'static,
{
self.data
.borrow()
.get(&TypeId::of::<T>())
.and_then(|boxed| boxed.downcast_ref().cloned())
}
}
impl AsRawFd for Runtime {
fn as_raw_fd(&self) -> RawFd {
self.driver.as_raw_fd()
}
}
impl Drop for Runtime {
fn drop(&mut self) {
CURRENT_RUNTIME.set(self, || {
while self.runnables.sync_runnables.pop().is_some() {}
loop {
let runnable = self.runnables.local_runnables.borrow_mut().pop_front();
if runnable.is_none() {
break;
}
}
})
}
}
struct RunnableQueue {
id: thread::ThreadId,
idle: Cell<bool>,
@ -108,266 +337,6 @@ impl RunnableQueue {
}
}
/// The async runtime for ntex. It is a thread local runtime, and cannot be
/// sent to other threads.
pub struct Runtime {
driver: Proactor,
runnables: Arc<RunnableQueue>,
event_interval: usize,
data: RefCell<HashMap<TypeId, Box<dyn Any>, fxhash::FxBuildHasher>>,
}
impl Runtime {
/// Create [`Runtime`] with default config.
pub fn new() -> io::Result<Self> {
Self::builder().build()
}
/// Create a builder for [`Runtime`].
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}
#[allow(clippy::arc_with_non_send_sync)]
fn with_builder(builder: &RuntimeBuilder) -> io::Result<Self> {
Ok(Self {
driver: builder.proactor_builder.build()?,
runnables: Arc::new(RunnableQueue::new()),
event_interval: builder.event_interval,
data: RefCell::new(HashMap::default()),
})
}
/// Try to perform a function on the current runtime, and if no runtime is
/// running, return the function back.
pub fn try_with_current<T, F: FnOnce(&Self) -> T>(f: F) -> Result<T, F> {
if CURRENT_RUNTIME.is_set() {
Ok(CURRENT_RUNTIME.with(f))
} else {
Err(f)
}
}
/// Perform a function on the current runtime.
///
/// ## Panics
///
/// This method will panic if there are no running [`Runtime`].
pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
#[cold]
fn not_in_ntex_runtime() -> ! {
panic!("not in a ntex runtime")
}
if CURRENT_RUNTIME.is_set() {
CURRENT_RUNTIME.with(f)
} else {
not_in_ntex_runtime()
}
}
/// Get current driver
pub fn driver(&self) -> &Proactor {
&self.driver
}
/// Get handle for current runtime
pub fn handle(&self) -> RemoteHandle {
RemoteHandle {
handle: self.driver.handle(),
runnables: self.runnables.clone(),
}
}
/// Set this runtime as current runtime, and perform a function in the
/// current scope.
pub fn enter<T, F: FnOnce() -> T>(&self, f: F) -> T {
CURRENT_RUNTIME.set(self, f)
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// # Safety
///
/// The caller should ensure the captured lifetime long enough.
pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> Task<F::Output> {
let runnables = self.runnables.clone();
let handle = self.driver.handle();
let schedule = move |runnable| {
runnables.schedule(runnable, &handle);
};
let (runnable, task) = async_task::spawn_unchecked(future, schedule);
runnable.schedule();
task
}
/// Low level API to control the runtime.
///
/// Run the scheduled tasks.
///
/// The return value indicates whether there are still tasks in the queue.
pub fn run(&self) -> bool {
self.runnables.run(self.event_interval);
self.runnables.has_tasks()
}
/// Block on the future till it completes.
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
CURRENT_RUNTIME.set(self, || {
let mut result = None;
unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach();
self.runnables.run(self.event_interval);
loop {
if let Some(result) = result.take() {
return result;
}
self.poll_with_driver(self.runnables.has_tasks(), || {
self.runnables.run(self.event_interval);
});
}
})
}
/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// Spawning a task enables the task to execute concurrently to other tasks.
/// There is no guarantee that a spawned task will execute to completion.
pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
unsafe { self.spawn_unchecked(AssertUnwindSafe(future).catch_unwind()) }
}
/// Spawns a blocking task in a new thread, and wait for it.
///
/// The task will not be cancelled even if the future is dropped.
pub fn spawn_blocking<T: Send + 'static>(
&self,
f: impl (FnOnce() -> T) + Send + 'static,
) -> JoinHandle<T> {
let op = Asyncify::new(move || {
let res = std::panic::catch_unwind(AssertUnwindSafe(f));
(Ok(0), res)
});
// It is safe and sound to use `submit` here because the task is spawned
// immediately.
unsafe {
self.spawn_unchecked(
self.submit_with_flags(op)
.map(|(res, _)| res)
.map(|res| res.1.into_inner()),
)
}
}
/// Attach a raw file descriptor/handle/socket to the runtime.
///
/// You only need this when authoring your own high-level APIs. High-level
/// resources in this crate are attached automatically.
pub fn attach(&self, fd: RawFd) -> io::Result<()> {
self.driver.attach(fd)
}
fn submit_raw<T: OpCode + 'static>(
&self,
op: T,
) -> PushEntry<Key<T>, (io::Result<usize>, T)> {
self.driver.push(op)
}
fn submit_with_flags<T: OpCode + 'static>(
&self,
op: T,
) -> impl Future<Output = ((io::Result<usize>, T), u32)> {
match self.submit_raw(op) {
PushEntry::Pending(user_data) => Either::Left(OpFuture::new(user_data)),
PushEntry::Ready(res) => {
// submit_flags won't be ready immediately, if ready, it must be error without
// flags
Either::Right(ready((res, 0)))
}
}
}
pub(crate) fn cancel_op<T: OpCode>(&self, op: Key<T>) {
self.driver.cancel(op);
}
pub(crate) fn poll_task<T: OpCode>(
&self,
cx: &mut Context,
op: Key<T>,
) -> PushEntry<Key<T>, ((io::Result<usize>, T), u32)> {
self.driver.pop(op).map_pending(|mut k| {
self.driver.update_waker(&mut k, cx.waker().clone());
k
})
}
fn poll_with_driver<F: FnOnce()>(&self, has_tasks: bool, f: F) {
let timeout = if has_tasks {
Some(Duration::ZERO)
} else {
None
};
match self.driver.poll(timeout, f) {
Ok(()) => {}
Err(e) => match e.kind() {
io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => {
log::debug!("expected error: {e}");
}
_ => {
panic!("{e:?}")
}
},
}
}
/// Insert a type into this runtime.
pub fn insert<T: 'static>(&self, val: T) {
self.data
.borrow_mut()
.insert(TypeId::of::<T>(), Box::new(val));
}
/// Check if container contains entry
pub fn contains<T: 'static>(&self) -> bool {
self.data.borrow().contains_key(&TypeId::of::<T>())
}
/// Get a reference to a type previously inserted on this runtime.
pub fn get<T>(&self) -> Option<T>
where
T: Clone + 'static,
{
self.data
.borrow()
.get(&TypeId::of::<T>())
.and_then(|boxed| boxed.downcast_ref().cloned())
}
}
impl Drop for Runtime {
fn drop(&mut self) {
self.enter(|| {
while self.runnables.sync_runnables.pop().is_some() {}
loop {
let runnable = self.runnables.local_runnables.borrow_mut().pop_front();
if runnable.is_none() {
break;
}
}
})
}
}
impl AsRawFd for Runtime {
fn as_raw_fd(&self) -> RawFd {
self.driver.as_raw_fd()
}
}
/// Builder for [`Runtime`].
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
@ -420,7 +389,7 @@ impl RuntimeBuilder {
///
/// This method doesn't create runtime. It tries to obtain the current runtime
/// by [`Runtime::with_current`].
pub fn spawn<F: Future + 'static>(future: F) -> JoinHandle<F::Output> {
pub fn spawn<F: Future + 'static>(future: F) -> Task<F::Output> {
Runtime::with_current(|r| r.spawn(future))
}

View file

@ -73,9 +73,9 @@ ntex-util = "2.8"
ntex-bytes = "0.1.27"
ntex-server = "2.7"
ntex-h2 = "1.8.1"
ntex-rt = "0.4.22"
ntex-io = "2.9"
ntex-net = "2.4"
ntex-rt = "0.4.25"
ntex-io = "2.11"
ntex-net = "2.5"
ntex-tls = "2.3"
base64 = "0.22"

View file

@ -220,7 +220,7 @@ async fn test_connection_reuse() {
)))
});
let client = Client::build().timeout(Seconds(10)).finish();
let client = Client::build().timeout(Seconds(30)).finish();
// req 1
let request = client.get(srv.url("/")).send();
@ -255,7 +255,7 @@ async fn test_connection_force_close() {
)))
});
let client = Client::build().timeout(Seconds(10)).finish();
let client = Client::build().timeout(Seconds(30)).finish();
// req 1
let request = client.get(srv.url("/")).force_close().send();
@ -263,7 +263,7 @@ async fn test_connection_force_close() {
assert!(response.status().is_success());
// req 2
let client = Client::build().timeout(Seconds(10)).finish();
let client = Client::build().timeout(Seconds(30)).finish();
let req = client.post(srv.url("/")).force_close();
let response = req.send().await.unwrap();
assert!(response.status().is_success());
@ -291,7 +291,7 @@ async fn test_connection_server_close() {
)))
});
let client = Client::build().timeout(Seconds(10)).finish();
let client = Client::build().timeout(Seconds(30)).finish();
// req 1
let request = client.get(srv.url("/")).send();
@ -814,7 +814,7 @@ async fn client_read_until_eof() {
// client request
let req = Client::build()
.timeout(Seconds(5))
.timeout(Seconds(30))
.finish()
.get(format!("http://{}/", addr).as_str());
let mut response = req.send().await.unwrap();

View file

@ -868,7 +868,7 @@ async fn test_reading_deflate_encoding_large_random_rustls() {
// client request
let req = srv
.post("/")
.timeout(Millis(10_000))
.timeout(Millis(30_000))
.header(CONTENT_ENCODING, "deflate")
.send_stream(TestBody::new(Bytes::from(enc), 1024));
@ -909,7 +909,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h1() {
// client request
let req = srv
.post("/")
.timeout(Millis(10_000))
.timeout(Millis(30_000))
.header(CONTENT_ENCODING, "deflate")
.send_stream(TestBody::new(Bytes::from(enc), 1024));
@ -950,7 +950,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h2() {
// client request
let req = srv
.post("/")
.timeout(Millis(10_000))
.timeout(Millis(30_000))
.header(CONTENT_ENCODING, "deflate")
.send_stream(TestBody::new(Bytes::from(enc), 1024));