diff --git a/Cargo.toml b/Cargo.toml index e6e8e22f..7461076d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ futures-util = "0.3.29" fxhash = "0.2" libc = "0.2.164" log = "0.4" +nohash-hasher = "0.2.0" scoped-tls = "1.0.1" slab = "0.4.9" socket2 = "0.5.6" diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 508c0ada..ff7201c9 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.11.0] - 2025-03-10 + +* Add single io context + ## [2.10.0] - 2025-02-26 * Impl Filter for Sealed #506 diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index d8ab4eed..40d4ed20 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "2.10.0" +version = "2.11.0" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 9efab367..4c03d312 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -1244,6 +1244,8 @@ mod tests { sleep(Millis(50)).await; if let DispatchItem::Item(msg) = msg { Ok::<_, ()>(Some(msg.freeze())) + } else if let DispatchItem::Disconnect(_) = msg { + Ok::<_, ()>(None) } else { panic!() } diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 2cf628da..6d4b6bdd 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -29,7 +29,7 @@ pub use self::filter::{Base, Filter, Layer}; pub use self::framed::Framed; pub use self::io::{Io, IoRef, OnDisconnect}; pub use self::seal::{IoBoxed, Sealed}; -pub use self::tasks::{ReadContext, WriteContext, WriteContextBuf}; +pub use self::tasks::{IoContext, ReadContext, WriteContext, WriteContextBuf}; pub use self::timer::TimerHandle; pub use self::utils::{seal, Decoded}; @@ -77,7 +77,7 @@ pub trait FilterLayer: fmt::Debug + 'static { #[inline] /// Check readiness for read operations - fn poll_read_ready(&self, waker: &mut Context<'_>) -> Poll { + fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll { Poll::Ready(ReadStatus::Ready) } diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 6447c139..ae67263f 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -19,26 +19,21 @@ impl ReadContext { Self(io.clone(), Cell::new(None)) } + #[doc(hidden)] + #[inline] + /// Io tag + pub fn context(&self) -> IoContext { + IoContext::new(&self.0) + } + #[inline] /// Io tag pub fn tag(&self) -> &'static str { self.0.tag() } - #[doc(hidden)] - /// Io flags - pub fn flags(&self) -> crate::flags::Flags { - self.0.flags() - } - - #[inline] - /// Check readiness for read operations - pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll { - self.0.filter().poll_read_ready(cx) - } - /// Wait when io get closed or preparing for close - pub async fn wait_for_close(&self) { + async fn wait_for_close(&self) { poll_fn(|cx| { let flags = self.0.flags(); @@ -55,111 +50,6 @@ impl ReadContext { .await } - #[inline] - /// Get io error - pub fn set_stopped(&self, e: Option) { - self.0 .0.io_stopped(e); - } - - /// Get read buffer - pub fn with_buf(&self, f: F) -> Poll<()> - where - F: FnOnce(&mut BytesVec) -> Poll>, - { - let inner = &self.0 .0; - let (hw, lw) = self.0.memory_pool().read_params().unpack(); - let result = inner.buffer.with_read_source(&self.0, |buf| { - // make sure we've got room - let remaining = buf.remaining_mut(); - if remaining < lw { - buf.reserve(hw - remaining); - } - - // call provided callback - f(buf) - }); - - // handle buffer changes - match result { - Poll::Ready(Ok(0)) => { - inner.io_stopped(None); - Poll::Ready(()) - } - Poll::Ready(Ok(nbytes)) => { - let filter = self.0.filter(); - let _ = filter - .process_read_buf(&self.0, &inner.buffer, 0, nbytes) - .and_then(|status| { - if status.nbytes > 0 { - // dest buffer has new data, wake up dispatcher - if inner.buffer.read_destination_size() >= hw { - log::trace!( - "{}: Io read buffer is too large {}, enable read back-pressure", - self.0.tag(), - nbytes - ); - inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL); - } else { - inner.insert_flags(Flags::BUF_R_READY); - - if nbytes >= hw { - // read task is paused because of read back-pressure - // but there is no new data in top most read buffer - // so we need to wake up read task to read more data - // otherwise read task would sleep forever - inner.read_task.wake(); - } - } - log::trace!( - "{}: New {} bytes available, wakeup dispatcher", - self.0.tag(), - nbytes - ); - inner.dispatch_task.wake(); - } else { - if nbytes >= hw { - // read task is paused because of read back-pressure - // but there is no new data in top most read buffer - // so we need to wake up read task to read more data - // otherwise read task would sleep forever - inner.read_task.wake(); - } - if inner.flags.get().contains(Flags::RD_NOTIFY) { - // in case of "notify" we must wake up dispatch task - // if we read any data from source - inner.dispatch_task.wake(); - } - } - - // while reading, filter wrote some data - // in that case filters need to process write buffers - // and potentialy wake write task - if status.need_write { - filter.process_write_buf(&self.0, &inner.buffer, 0) - } else { - Ok(()) - } - }) - .map_err(|err| { - inner.dispatch_task.wake(); - inner.io_stopped(Some(err)); - inner.insert_flags(Flags::BUF_R_READY); - }); - Poll::Pending - } - Poll::Ready(Err(e)) => { - inner.io_stopped(Some(e)); - Poll::Ready(()) - } - Poll::Pending => { - if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) { - shutdown_filters(&self.0); - } - Poll::Pending - } - } - } - /// Handle read io operations pub async fn handle(&self, io: &mut T) where @@ -283,70 +173,18 @@ impl ReadContext { } } - pub fn shutdown_filters(&self, cx: &mut Context<'_>) { + fn shutdown_filters(&self, cx: &mut Context<'_>) { let st = &self.0 .0; + let filter = self.0.filter(); - if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) { - let filter = self.0.filter(); - - match filter.shutdown(&self.0, &st.buffer, 0) { - Ok(Poll::Ready(())) => { - st.dispatch_task.wake(); - st.insert_flags(Flags::IO_STOPPING); - } - Ok(Poll::Pending) => { - let flags = st.flags.get(); - - // check read buffer, if buffer is not consumed it is unlikely - // that filter will properly complete shutdown - if flags.contains(Flags::RD_PAUSED) - || flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY) - { - st.dispatch_task.wake(); - st.insert_flags(Flags::IO_STOPPING); - } else { - // filter shutdown timeout - let timeout = self - .1 - .take() - .unwrap_or_else(|| sleep(st.disconnect_timeout.get())); - if timeout.poll_elapsed(cx).is_ready() { - st.dispatch_task.wake(); - st.insert_flags(Flags::IO_STOPPING); - } else { - self.1.set(Some(timeout)); - } - } - } - Err(err) => { - st.io_stopped(Some(err)); - } - } - if let Err(err) = filter.process_write_buf(&self.0, &st.buffer, 0) { - st.io_stopped(Some(err)); - } - } - } -} - -impl Clone for ReadContext { - fn clone(&self) -> Self { - Self(self.0.clone(), Cell::new(None)) - } -} - -fn shutdown_filters(io: &IoRef) { - let st = &io.0; - let flags = st.flags.get(); - - if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { - let filter = io.filter(); - match filter.shutdown(io, &st.buffer, 0) { + match filter.shutdown(&self.0, &st.buffer, 0) { Ok(Poll::Ready(())) => { st.dispatch_task.wake(); st.insert_flags(Flags::IO_STOPPING); } Ok(Poll::Pending) => { + let flags = st.flags.get(); + // check read buffer, if buffer is not consumed it is unlikely // that filter will properly complete shutdown if flags.contains(Flags::RD_PAUSED) @@ -354,13 +192,25 @@ fn shutdown_filters(io: &IoRef) { { st.dispatch_task.wake(); st.insert_flags(Flags::IO_STOPPING); + } else { + // filter shutdown timeout + let timeout = self + .1 + .take() + .unwrap_or_else(|| sleep(st.disconnect_timeout.get())); + if timeout.poll_elapsed(cx).is_ready() { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + } else { + self.1.set(Some(timeout)); + } } } Err(err) => { st.io_stopped(Some(err)); } } - if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) { + if let Err(err) = filter.process_write_buf(&self.0, &st.buffer, 0) { st.io_stopped(Some(err)); } } @@ -393,12 +243,6 @@ impl WriteContext { poll_fn(|cx| self.0.filter().poll_write_ready(cx)).await } - #[inline] - /// Check readiness for write operations - pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll { - self.0.filter().poll_write_ready(cx) - } - /// Indicate that write io task is stopped fn close(&self, err: Option) { self.0 .0.io_stopped(err); @@ -417,11 +261,143 @@ impl WriteContext { .await } - /// Wait when io get closed or preparing for close - pub async fn wait_for_shutdown(&self, flush_buf: bool) { - let st = &self.0 .0; + /// Handle write io operations + pub async fn handle(&self, io: &mut T) + where + T: AsyncWrite, + { + let mut buf = WriteContextBuf { + io: self.0.clone(), + buf: None, + }; - // filter shutdown timeout + loop { + match self.ready().await { + WriteStatus::Ready => { + // write io stream + match select(io.write(&mut buf), self.when_stopped()).await { + Either::Left(Ok(_)) => continue, + Either::Left(Err(e)) => self.close(Some(e)), + Either::Right(_) => return, + } + } + WriteStatus::Shutdown => { + log::trace!("{}: Write task is instructed to shutdown", self.tag()); + + let fut = async { + // write io stream + io.write(&mut buf).await?; + io.flush().await?; + io.shutdown().await?; + Ok(()) + }; + match select(sleep(self.0 .0.disconnect_timeout.get()), fut).await { + Either::Left(_) => self.close(None), + Either::Right(res) => self.close(res.err()), + } + } + WriteStatus::Terminate => { + log::trace!("{}: Write task is instructed to terminate", self.tag()); + self.close(io.shutdown().await.err()); + } + } + return; + } + } +} + +impl WriteContextBuf { + pub fn set(&mut self, mut buf: BytesVec) { + if buf.is_empty() { + self.io.memory_pool().release_write_buf(buf); + } else if let Some(b) = self.buf.take() { + buf.extend_from_slice(&b); + self.io.memory_pool().release_write_buf(b); + self.buf = Some(buf); + } else if let Some(b) = self.io.0.buffer.set_write_destination(buf) { + // write buffer is already set + self.buf = Some(b); + } + + // if write buffer is smaller than high watermark value, turn off back-pressure + let inner = &self.io.0; + let len = self.buf.as_ref().map(|b| b.len()).unwrap_or_default() + + inner.buffer.write_destination_size(); + let mut flags = inner.flags.get(); + + if len == 0 { + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + flags.insert(Flags::WR_PAUSED); + inner.flags.set(flags); + } else if flags.contains(Flags::BUF_W_BACKPRESSURE) + && len < inner.pool.get().write_params_high() << 1 + { + flags.remove(Flags::BUF_W_BACKPRESSURE); + inner.flags.set(flags); + inner.dispatch_task.wake(); + } + } + + pub fn take(&mut self) -> Option { + if let Some(buf) = self.buf.take() { + Some(buf) + } else { + self.io.0.buffer.get_write_destination() + } + } +} + +/// Context for io read task +pub struct IoContext(IoRef); + +impl fmt::Debug for IoContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("IoContext").field("io", &self.0).finish() + } +} + +impl IoContext { + pub(crate) fn new(io: &IoRef) -> Self { + Self(io.clone()) + } + + #[inline] + /// Io tag + pub fn tag(&self) -> &'static str { + self.0.tag() + } + + #[doc(hidden)] + /// Io flags + pub fn flags(&self) -> crate::flags::Flags { + self.0.flags() + } + + #[inline] + /// Check readiness for read operations + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll { + self.shutdown_filters(); + self.0.filter().poll_read_ready(cx) + } + + #[inline] + /// Check readiness for write operations + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll { + self.0.filter().poll_write_ready(cx) + } + + #[inline] + /// Get io error + pub fn stopped(&self, e: Option) { + self.0 .0.io_stopped(e); + } + + /// Wait when io get closed or preparing for close + pub async fn shutdown(&self, flush_buf: bool) { + let st = &self.0 .0; let mut timeout = None; poll_fn(|cx| { @@ -471,83 +447,125 @@ impl WriteContext { } } - /// Handle write io operations - pub async fn handle(&self, io: &mut T) + /// Get read buffer + pub fn with_read_buf(&self, f: F) -> Poll<()> where - T: AsyncWrite, + F: FnOnce(&mut BytesVec) -> Poll>, { - let mut buf = WriteContextBuf { - io: self.0.clone(), - buf: None, - }; - - loop { - match self.ready().await { - WriteStatus::Ready => { - // write io stream - match select(io.write(&mut buf), self.when_stopped()).await { - Either::Left(Ok(_)) => continue, - Either::Left(Err(e)) => self.close(Some(e)), - Either::Right(_) => return, - } - } - WriteStatus::Shutdown => { - log::trace!("{}: Write task is instructed to shutdown", self.tag()); - - let fut = async { - // write io stream - io.write(&mut buf).await?; - io.flush().await?; - io.shutdown().await?; - Ok(()) - }; - match select(sleep(self.0 .0.disconnect_timeout.get()), fut).await { - Either::Left(_) => self.close(None), - Either::Right(res) => self.close(res.err()), - } - } - WriteStatus::Terminate => { - log::trace!("{}: Write task is instructed to terminate", self.tag()); - self.close(io.shutdown().await.err()); - } + let inner = &self.0 .0; + let (hw, lw) = self.0.memory_pool().read_params().unpack(); + let result = inner.buffer.with_read_source(&self.0, |buf| { + // make sure we've got room + let remaining = buf.remaining_mut(); + if remaining < lw { + buf.reserve(hw - remaining); + } + + f(buf) + }); + + // handle buffer changes + match result { + Poll::Ready(Ok(0)) => { + inner.io_stopped(None); + Poll::Ready(()) + } + Poll::Ready(Ok(nbytes)) => { + let filter = self.0.filter(); + let _ = filter + .process_read_buf(&self.0, &inner.buffer, 0, nbytes) + .and_then(|status| { + if status.nbytes > 0 { + // dest buffer has new data, wake up dispatcher + if inner.buffer.read_destination_size() >= hw { + log::trace!( + "{}: Io read buffer is too large {}, enable read back-pressure", + self.0.tag(), + nbytes + ); + inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL); + } else { + inner.insert_flags(Flags::BUF_R_READY); + + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + } + log::trace!( + "{}: New {} bytes available, wakeup dispatcher", + self.0.tag(), + nbytes + ); + inner.dispatch_task.wake(); + } else { + if nbytes >= hw { + // read task is paused because of read back-pressure + // but there is no new data in top most read buffer + // so we need to wake up read task to read more data + // otherwise read task would sleep forever + inner.read_task.wake(); + } + if inner.flags.get().contains(Flags::RD_NOTIFY) { + // in case of "notify" we must wake up dispatch task + // if we read any data from source + inner.dispatch_task.wake(); + } + } + + // while reading, filter wrote some data + // in that case filters need to process write buffers + // and potentialy wake write task + if status.need_write { + filter.process_write_buf(&self.0, &inner.buffer, 0) + } else { + Ok(()) + } + }) + .map_err(|err| { + inner.dispatch_task.wake(); + inner.io_stopped(Some(err)); + inner.insert_flags(Flags::BUF_R_READY); + }); + Poll::Pending + } + Poll::Ready(Err(e)) => { + inner.io_stopped(Some(e)); + Poll::Ready(()) + } + Poll::Pending => { + self.shutdown_filters(); + Poll::Pending } - return; } } /// Get write buffer - pub fn with_buf(&self, f: F) -> Poll<()> + pub fn with_write_buf(&self, f: F) -> Poll<()> where F: FnOnce(&BytesVec) -> Poll>, { let inner = &self.0 .0; - - // call provided callback let result = inner.buffer.with_write_destination(&self.0, |buf| { - let buf = if let Some(buf) = buf { - buf - } else { + let Some(buf) = + buf.and_then(|buf| if buf.is_empty() { None } else { Some(buf) }) + else { return Poll::Ready(Ok(0)); }; - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - let result = ready!(f(buf)); - - match result { + match ready!(f(buf)) { + Ok(0) => { + log::trace!("{}: Disconnected during flush", self.tag()); + Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))) + } Ok(n) => { - if n == 0 { - log::trace!( - "{}: Disconnected during flush, written {}", - self.tag(), - n - ); - Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))) - } else if n == buf.len() { + if n == buf.len() { buf.clear(); Poll::Ready(Ok(0)) } else { @@ -566,33 +584,33 @@ impl WriteContext { flags.remove(Flags::WR_PAUSED); Poll::Pending } + Poll::Ready(Ok(0)) => { + // all data has been written + flags.insert(Flags::WR_PAUSED); + + if flags.is_task_waiting_for_write() { + flags.task_waiting_for_write_is_done(); + inner.write_task.wake(); + } + + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + Poll::Ready(()) + } Poll::Ready(Ok(len)) => { // if write buffer is smaller than high watermark value, turn off back-pressure - if len == 0 { - flags.insert(Flags::WR_PAUSED); - - if flags.is_task_waiting_for_write() { - flags.task_waiting_for_write_is_done(); - inner.write_task.wake(); - } - - if flags.is_waiting_for_write() { - flags.waiting_for_write_is_done(); - inner.dispatch_task.wake(); - } - Poll::Ready(()) - } else if flags.contains(Flags::BUF_W_BACKPRESSURE) + if flags.contains(Flags::BUF_W_BACKPRESSURE) && len < inner.pool.get().write_params_high() << 1 { flags.remove(Flags::BUF_W_BACKPRESSURE); inner.dispatch_task.wake(); - Poll::Pending - } else { - Poll::Pending } + Poll::Pending } Poll::Ready(Err(e)) => { - self.close(Some(e)); + self.0 .0.io_stopped(Some(e)); Poll::Ready(()) } }; @@ -600,54 +618,44 @@ impl WriteContext { inner.flags.set(flags); result } + + fn shutdown_filters(&self) { + let io = &self.0; + let st = &self.0 .0; + if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) { + let flags = st.flags.get(); + + if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { + let filter = io.filter(); + match filter.shutdown(io, &st.buffer, 0) { + Ok(Poll::Ready(())) => { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + } + Ok(Poll::Pending) => { + // check read buffer, if buffer is not consumed it is unlikely + // that filter will properly complete shutdown + if flags.contains(Flags::RD_PAUSED) + || flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY) + { + st.dispatch_task.wake(); + st.insert_flags(Flags::IO_STOPPING); + } + } + Err(err) => { + st.io_stopped(Some(err)); + } + } + if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) { + st.io_stopped(Some(err)); + } + } + } + } } -impl Clone for WriteContext { +impl Clone for IoContext { fn clone(&self) -> Self { Self(self.0.clone()) } } - -impl WriteContextBuf { - pub fn set(&mut self, mut buf: BytesVec) { - if buf.is_empty() { - self.io.memory_pool().release_write_buf(buf); - } else if let Some(b) = self.buf.take() { - buf.extend_from_slice(&b); - self.io.memory_pool().release_write_buf(b); - self.buf = Some(buf); - } else if let Some(b) = self.io.0.buffer.set_write_destination(buf) { - // write buffer is already set - self.buf = Some(b); - } - - // if write buffer is smaller than high watermark value, turn off back-pressure - let inner = &self.io.0; - let len = self.buf.as_ref().map(|b| b.len()).unwrap_or_default() - + inner.buffer.write_destination_size(); - let mut flags = inner.flags.get(); - - if len == 0 { - if flags.is_waiting_for_write() { - flags.waiting_for_write_is_done(); - inner.dispatch_task.wake(); - } - flags.insert(Flags::WR_PAUSED); - inner.flags.set(flags); - } else if flags.contains(Flags::BUF_W_BACKPRESSURE) - && len < inner.pool.get().write_params_high() << 1 - { - flags.remove(Flags::BUF_W_BACKPRESSURE); - inner.flags.set(flags); - inner.dispatch_task.wake(); - } - } - - pub fn take(&mut self) -> Option { - if let Some(buf) = self.buf.take() { - Some(buf) - } else { - self.io.0.buffer.get_write_destination() - } - } -} diff --git a/ntex-iodriver/Cargo.toml b/ntex-iodriver/Cargo.toml index 6fd9d8ff..2bd61049 100644 --- a/ntex-iodriver/Cargo.toml +++ b/ntex-iodriver/Cargo.toml @@ -34,6 +34,7 @@ cfg-if = { workspace = true } crossbeam-channel = { workspace = true } socket2 = { workspace = true } slab = { workspace = true } +nohash-hasher = { workspace = true } # Windows specific dependencies [target.'cfg(windows)'.dependencies] diff --git a/ntex-iodriver/src/poll/mod.rs b/ntex-iodriver/src/poll/mod.rs index a68c6a29..7305e7be 100644 --- a/ntex-iodriver/src/poll/mod.rs +++ b/ntex-iodriver/src/poll/mod.rs @@ -1,15 +1,14 @@ #![allow(clippy::type_complexity)] pub use std::os::fd::{AsRawFd, OwnedFd, RawFd}; -use std::{cell::Cell, cell::RefCell, collections::HashMap, io, rc::Rc, sync::Arc}; +use std::{cell::Cell, cell::RefCell, io, rc::Rc, sync::Arc}; use std::{num::NonZeroUsize, os::fd::BorrowedFd, pin::Pin, task::Poll, time::Duration}; use crossbeam_queue::SegQueue; +use nohash_hasher::IntMap; use polling::{Event, Events, Poller}; -use crate::{ - op::Handler, op::Interest, syscall, AsyncifyPool, Entry, Key, ProactorBuilder, -}; +use crate::{op::Handler, op::Interest, AsyncifyPool, Entry, Key, ProactorBuilder}; pub(crate) mod op; @@ -132,14 +131,6 @@ enum Change { }, } -// #[derive(Debug)] -// struct BatchChange { -// fd: RawFd, -// batch: usize, -// user_data: usize, -// interest: InterestChange, -// } - pub struct DriverApi { batch: usize, changes: Rc>>, @@ -191,7 +182,7 @@ impl DriverApi { pub(crate) struct Driver { poll: Arc, events: RefCell, - registry: RefCell>, + registry: RefCell>, pool: AsyncifyPool, pool_completed: Arc>, hid: Cell, @@ -212,7 +203,7 @@ impl Driver { Ok(Self { poll: Arc::new(Poller::new()?), events: RefCell::new(events), - registry: RefCell::new(HashMap::default()), + registry: RefCell::new(Default::default()), pool: builder.create_or_get_thread_pool(), pool_completed: Arc::new(SegQueue::new()), hid: Cell::new(0), @@ -241,34 +232,6 @@ impl Driver { Key::new(self.as_raw_fd(), op) } - fn renew( - &self, - fd: BorrowedFd, - renew_event: Event, - registry: &mut HashMap, - ) -> io::Result<()> { - if !renew_event.readable && !renew_event.writable { - // crate::log(format!("DELETE - {:?}", fd.as_raw_fd())); - - if let Some(item) = registry.remove(&fd.as_raw_fd()) { - if !item.flags.contains(Flags::NEW) { - self.poll.delete(fd)?; - } - } - } else { - if let Some(item) = registry.get(&fd.as_raw_fd()) { - if item.flags.contains(Flags::NEW) { - // crate::log(format!("ADD - {:?}", fd.as_raw_fd())); - unsafe { self.poll.add(&fd, renew_event)? }; - return Ok(()); - } - } - // crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event)); - self.poll.modify(fd, renew_event)?; - } - Ok(()) - } - pub fn attach(&self, _fd: RawFd) -> io::Result<()> { Ok(()) } @@ -299,32 +262,33 @@ impl Driver { } let mut events = self.events.borrow_mut(); - let res = self.poll.wait(&mut events, timeout); - res?; + self.poll.wait(&mut events, timeout)?; - if events.is_empty() && timeout != Some(Duration::ZERO) && timeout.is_some() { - return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); - } - // println!("POLL, events: {:?}", events.len()); - - if !events.is_empty() { + if events.is_empty() { + if timeout.is_some() && timeout != Some(Duration::ZERO) { + return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); + } + } else { + // println!("POLL, events: {:?}", events.len()); let mut registry = self.registry.borrow_mut(); let mut handlers = self.handlers.take().unwrap(); for event in events.iter() { - let user_data = event.key; - let fd = user_data as RawFd; - log::debug!( - "receive {} for {:?} {:?}", - user_data, - event, - registry.get_mut(&fd) - ); + let fd = event.key as RawFd; + log::debug!("Event {:?} for {:?}", event, registry.get(&fd)); if let Some(item) = registry.get_mut(&fd) { - self.handle_batch_event(event, item, &mut handlers); + if event.readable { + if let Some(user_data) = item.user_data(Interest::Readable) { + handlers[item.batch].readable(user_data) + } + } + if event.writable { + if let Some(user_data) = item.user_data(Interest::Writable) { + handlers[item.batch].writable(user_data) + } + } } } - drop(registry); self.handlers.set(Some(handlers)); } @@ -348,31 +312,12 @@ impl Driver { Ok(()) } - fn handle_batch_event( - &self, - event: Event, - item: &mut FdItem, - handlers: &mut [Box], - ) { - if event.readable { - if let Some(user_data) = item.user_data(Interest::Readable) { - handlers[item.batch].readable(user_data) - } - } - if event.writable { - if let Some(user_data) = item.user_data(Interest::Writable) { - handlers[item.batch].writable(user_data) - } - } - } - /// re-calc driver changes unsafe fn apply_changes(&self) -> io::Result<()> { let mut changes = self.changes.borrow_mut(); if changes.is_empty() { return Ok(()); } - log::debug!("Apply driver changes, {:?}", changes.len()); let mut registry = self.registry.borrow_mut(); @@ -412,20 +357,26 @@ impl Driver { }; if let Some(fd) = fd { - let result = registry.get_mut(&fd).and_then(|item| { + if let Some(item) = registry.get_mut(&fd) { if item.flags.contains(Flags::CHANGED) { item.flags.remove(Flags::CHANGED); - Some((item.event(fd as usize), item.flags.contains(Flags::NEW))) - } else { - None - } - }); - if let Some((event, new)) = result { - self.renew(BorrowedFd::borrow_raw(fd), event, &mut registry)?; - if new { - if let Some(item) = registry.get_mut(&fd) { + let new = item.flags.contains(Flags::NEW); + let renew_event = item.event(fd as usize); + + if !renew_event.readable && !renew_event.writable { + // crate::log(format!("DELETE - {:?}", fd.as_raw_fd())); + registry.remove(&fd); + if !new { + self.poll.delete(BorrowedFd::borrow_raw(fd))?; + } + } else if new { item.flags.remove(Flags::NEW); + // crate::log(format!("ADD - {:?}", fd.as_raw_fd())); + unsafe { self.poll.add(fd, renew_event)? }; + } else { + // crate::log(format!("MODIFY - {:?} {:?}", fd.as_raw_fd(), renew_event)); + self.poll.modify(BorrowedFd::borrow_raw(fd), renew_event)?; } } } @@ -436,7 +387,6 @@ impl Driver { } fn push_blocking(&self, user_data: usize) { - // -> Poll> { let poll = self.poll.clone(); let completed = self.pool_completed.clone(); let mut closure = move || { @@ -449,30 +399,26 @@ impl Driver { completed.push(Entry::new(user_data, res)); poll.notify().ok(); }; - loop { - match self.pool.dispatch(closure) { - Ok(()) => return, - Err(e) => { - closure = e.0; - self.poll_blocking(); - } - } + while let Err(e) = self.pool.dispatch(closure) { + closure = e.0; + self.poll_blocking(); } } fn poll_blocking(&self) -> bool { if self.pool_completed.is_empty() { - return false; - } - - while let Some(entry) = self.pool_completed.pop() { - unsafe { - entry.notify(); + false + } else { + while let Some(entry) = self.pool_completed.pop() { + unsafe { + entry.notify(); + } } + true } - true } + /// Get notification handle pub fn handle(&self) -> NotifyHandle { NotifyHandle::new(self.poll.clone()) } @@ -506,7 +452,7 @@ impl NotifyHandle { Self { poll } } - /// Notify the inner driver. + /// Notify the driver pub fn notify(&self) -> io::Result<()> { self.poll.notify() } diff --git a/ntex-iodriver/src/poll/op.rs b/ntex-iodriver/src/poll/op.rs index 4a546643..e848b5de 100644 --- a/ntex-iodriver/src/poll/op.rs +++ b/ntex-iodriver/src/poll/op.rs @@ -1,9 +1,10 @@ use std::{io, marker::Send, os::fd::FromRawFd, os::fd::RawFd, pin::Pin, task::Poll}; -use super::{syscall, AsRawFd, Decision, OpCode}; -use crate::op::*; pub use crate::unix::op::*; +use super::{AsRawFd, Decision, OpCode}; +use crate::{op::*, syscall}; + impl OpCode for Asyncify where D: Send + 'static, diff --git a/ntex-net/Cargo.toml b/ntex-net/Cargo.toml index 28b84cc8..112d84bc 100644 --- a/ntex-net/Cargo.toml +++ b/ntex-net/Cargo.toml @@ -37,7 +37,7 @@ default-rt = ["ntex-rt/default-rt", "ntex-runtime", "ntex-iodriver", "slab", "so ntex-service = "3.3" ntex-bytes = "0.1" ntex-http = "0.1" -ntex-io = "2.8" +ntex-io = "2.11" ntex-rt = "0.4.25" ntex-util = "2.5" diff --git a/ntex-net/src/rt/driver.rs b/ntex-net/src/rt/driver.rs index c2cd24e1..00461480 100644 --- a/ntex-net/src/rt/driver.rs +++ b/ntex-net/src/rt/driver.rs @@ -6,7 +6,7 @@ use ntex_runtime::Runtime; use slab::Slab; use ntex_bytes::BufMut; -use ntex_io::{ReadContext, WriteContext}; +use ntex_io::IoContext; bitflags::bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] @@ -25,8 +25,7 @@ pub(crate) struct StreamCtl { struct TcpStreamItem { io: Option, fd: RawFd, - read: ReadContext, - write: WriteContext, + context: IoContext, flags: Flags, ref_count: usize, } @@ -78,15 +77,9 @@ impl CompioOps { }) } - pub(crate) fn register( - &self, - io: T, - read: ReadContext, - write: WriteContext, - ) -> StreamCtl { + pub(crate) fn register(&self, io: T, context: IoContext) -> StreamCtl { let item = TcpStreamItem { - read, - write, + context, fd: io.as_raw_fd(), io: Some(io), flags: Flags::empty(), @@ -146,7 +139,7 @@ impl Handler for CompioOpsBatcher { match change { Change::Readable => { let item = &mut streams[id]; - let result = item.read.with_buf(|buf| { + let result = item.context.with_read_buf(|buf| { let chunk = buf.chunk_mut(); let b = chunk.as_mut_ptr(); Poll::Ready( @@ -155,7 +148,12 @@ impl Handler for CompioOpsBatcher { )) .inspect(|size| { unsafe { buf.advance_mut(*size) }; - log::debug!("FD: {:?}, BUF: {:?}", item.fd, buf); + log::debug!( + "FD: {:?}, SIZE: {:?}, BUF: {:?}", + item.fd, + size, + buf + ); }), ) }); @@ -167,7 +165,7 @@ impl Handler for CompioOpsBatcher { } Change::Writable => { let item = &mut streams[id]; - let result = item.write.with_buf(|buf| { + let result = item.context.with_write_buf(|buf| { let slice = &buf[..]; syscall!( break libc::write(item.fd, slice.as_ptr() as _, slice.len()) @@ -181,7 +179,7 @@ impl Handler for CompioOpsBatcher { } Change::Error(err) => { if let Some(item) = streams.get_mut(id) { - item.read.set_stopped(Some(err)); + item.context.stopped(Some(err)); if !item.flags.contains(Flags::ERROR) { item.flags.insert(Flags::ERROR); item.flags.remove(Flags::RD | Flags::WR); @@ -211,9 +209,20 @@ impl Handler for CompioOpsBatcher { } } +pub(crate) trait Closable { + async fn close(self) -> io::Result<()>; +} + impl StreamCtl { - pub(crate) fn take_io(&self) -> Option { - self.with(|streams| streams[self.id].io.take()) + pub(crate) async fn close(self) -> io::Result<()> + where + T: Closable, + { + if let Some(io) = self.with(|streams| streams[self.id].io.take()) { + io.close().await + } else { + Ok(()) + } } pub(crate) fn with_io(&self, f: F) -> R @@ -267,7 +276,7 @@ impl StreamCtl { if !item.flags.contains(Flags::WR) { log::debug!("Resume io write ({}), {:?}", self.id, item.fd); - let result = item.write.with_buf(|buf| { + let result = item.context.with_write_buf(|buf| { log::debug!("Writing io ({}), buf: {:?}", self.id, buf.len()); let slice = &buf[..]; @@ -275,7 +284,11 @@ impl StreamCtl { }); if result.is_pending() { - log::debug!("Write is pending ({}), {:?}", self.id, item.read.flags()); + log::debug!( + "Write is pending ({}), {:?}", + self.id, + item.context.flags() + ); item.flags.insert(Flags::WR); self.inner diff --git a/ntex-net/src/rt/io.rs b/ntex-net/src/rt/io.rs index 138f2644..75b8a003 100644 --- a/ntex-net/src/rt/io.rs +++ b/ntex-net/src/rt/io.rs @@ -1,28 +1,30 @@ use std::{any, future::poll_fn, io, task::Poll}; use ntex_io::{ - types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, + types, Handle, IoContext, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus, }; use ntex_runtime::{net::TcpStream, net::UnixStream, spawn}; -use super::driver::{CompioOps, StreamCtl}; +use super::driver::{Closable, CompioOps, StreamCtl}; impl IoStream for super::TcpStream { - fn start(self, read: ReadContext, write: WriteContext) -> Option> { + fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; - let ctl = CompioOps::current().register(io, read.clone(), write.clone()); + let context = read.context(); + let ctl = CompioOps::current().register(io, context.clone()); let ctl2 = ctl.clone(); - spawn(async move { run(ctl, read, write).await }).detach(); + spawn(async move { run(ctl, context).await }).detach(); Some(Box::new(HandleWrapper(ctl2))) } } impl IoStream for super::UnixStream { - fn start(self, read: ReadContext, write: WriteContext) -> Option> { + fn start(self, read: ReadContext, _: WriteContext) -> Option> { let io = self.0; - let ctl = CompioOps::current().register(io, read.clone(), write.clone()); - spawn(async move { run(ctl, read, write).await }).detach(); + let context = read.context(); + let ctl = CompioOps::current().register(io, context.clone()); + spawn(async move { run(ctl, context).await }).detach(); None } @@ -42,10 +44,6 @@ impl Handle for HandleWrapper { } } -trait Closable { - async fn close(self) -> io::Result<()>; -} - impl Closable for TcpStream { async fn close(self) -> io::Result<()> { TcpStream::close(self).await @@ -64,24 +62,10 @@ enum Status { Terminate, } -async fn run(ctl: StreamCtl, read: ReadContext, write: WriteContext) { +async fn run(ctl: StreamCtl, context: IoContext) { // Handle io read readiness let st = poll_fn(|cx| { - read.shutdown_filters(cx); - - let read_st = read.poll_ready(cx); - let write_st = write.poll_ready(cx); - //println!("\n\n"); - //println!( - // "IO2 read-st {:?}, write-st: {:?}, flags: {:?}", - // read_st, - // write_st, - // read.io().flags() - //); - //println!("\n\n"); - - //let read = match read.poll_ready(cx) { - let read = match read_st { + let read = match context.poll_read_ready(cx) { Poll::Ready(ReadStatus::Ready) => { ctl.resume_read(); Poll::Pending @@ -93,7 +77,7 @@ async fn run(ctl: StreamCtl, read: ReadContext, write: WriteCont } }; - let write = match write_st { + let write = match context.poll_write_ready(cx) { Poll::Ready(WriteStatus::Ready) => { ctl.resume_write(); Poll::Pending @@ -114,15 +98,10 @@ async fn run(ctl: StreamCtl, read: ReadContext, write: WriteCont .await; ctl.resume_write(); - if st == Status::Shutdown { - write.wait_for_shutdown(true).await; - } else { - write.wait_for_shutdown(false).await; - } + context.shutdown(st == Status::Shutdown).await; ctl.pause_all(); - let io = ctl.take_io().unwrap(); - let result = io.close().await; + let result = ctl.close().await; - read.set_stopped(result.err()); + context.stopped(result.err()); } diff --git a/ntex-rt/src/lib.rs b/ntex-rt/src/lib.rs index 4f4071d2..b2fc2e3c 100644 --- a/ntex-rt/src/lib.rs +++ b/ntex-rt/src/lib.rs @@ -289,12 +289,12 @@ mod default_rt { /// /// This function panics if ntex system is not running. #[inline] - pub fn spawn(f: F) -> JoinHandle + pub fn spawn(f: F) -> Task where F: Future + 'static, { let ptr = crate::CB.with(|cb| (cb.borrow().0)()); - let fut = ntex_runtime::spawn(async move { + let task = ntex_runtime::spawn(async move { if let Some(ptr) = ptr { let mut f = std::pin::pin!(f); let result = poll_fn(|ctx| { @@ -311,7 +311,7 @@ mod default_rt { } }); - JoinHandle { fut: Some(fut) } + Task { task: Some(task) } } /// Executes a future on the current thread. This does not create a new Arbiter @@ -322,7 +322,7 @@ mod default_rt { /// /// This function panics if ntex system is not running. #[inline] - pub fn spawn_fn(f: F) -> JoinHandle + pub fn spawn_fn(f: F) -> Task where F: FnOnce() -> R + 'static, R: Future + 'static, @@ -330,6 +330,35 @@ mod default_rt { spawn(async move { f().await }) } + /// A spawned task. + pub struct Task { + task: Option>, + } + + impl Task { + pub fn is_finished(&self) -> bool { + if let Some(hnd) = &self.task { + hnd.is_finished() + } else { + true + } + } + } + + impl Drop for Task { + fn drop(&mut self) { + self.task.take().unwrap().detach(); + } + } + + impl Future for Task { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(self.task.as_mut().unwrap()).poll(cx) + } + } + #[derive(Debug, Copy, Clone)] pub struct JoinError; diff --git a/ntex-runtime/Cargo.toml b/ntex-runtime/Cargo.toml index bcdd601d..a9b5c32d 100644 --- a/ntex-runtime/Cargo.toml +++ b/ntex-runtime/Cargo.toml @@ -33,7 +33,6 @@ ntex-iodriver = "0.1" async-task = { workspace = true } cfg-if = { workspace = true } crossbeam-queue = { workspace = true } -futures-util = { workspace = true } scoped-tls = { workspace = true } fxhash = { workspace = true } log = { workspace = true } diff --git a/ntex-runtime/src/rt.rs b/ntex-runtime/src/rt.rs index 604163c1..f36fd01e 100644 --- a/ntex-runtime/src/rt.rs +++ b/ntex-runtime/src/rt.rs @@ -1,16 +1,13 @@ #![allow(clippy::type_complexity)] use std::any::{Any, TypeId}; use std::collections::{HashMap, VecDeque}; -use std::future::{ready, Future}; -use std::task::Context; use std::{ - cell::Cell, cell::RefCell, io, panic::AssertUnwindSafe, sync::Arc, thread, + cell::Cell, cell::RefCell, future::Future, io, sync::Arc, task::Context, thread, time::Duration, }; use async_task::{Runnable, Task}; use crossbeam_queue::SegQueue; -use futures_util::{future::Either, FutureExt}; use ntex_iodriver::{ op::Asyncify, AsRawFd, Key, NotifyHandle, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd, @@ -51,6 +48,238 @@ impl RemoteHandle { } } +/// The async runtime for ntex. It is a thread local runtime, and cannot be +/// sent to other threads. +pub struct Runtime { + driver: Proactor, + runnables: Arc, + event_interval: usize, + data: RefCell, fxhash::FxBuildHasher>>, +} + +impl Runtime { + /// Create [`Runtime`] with default config. + pub fn new() -> io::Result { + Self::builder().build() + } + + /// Create a builder for [`Runtime`]. + pub fn builder() -> RuntimeBuilder { + RuntimeBuilder::new() + } + + #[allow(clippy::arc_with_non_send_sync)] + fn with_builder(builder: &RuntimeBuilder) -> io::Result { + Ok(Self { + driver: builder.proactor_builder.build()?, + runnables: Arc::new(RunnableQueue::new()), + event_interval: builder.event_interval, + data: RefCell::new(HashMap::default()), + }) + } + + /// Perform a function on the current runtime. + /// + /// ## Panics + /// + /// This method will panic if there are no running [`Runtime`]. + pub fn with_current T>(f: F) -> T { + #[cold] + fn not_in_ntex_runtime() -> ! { + panic!("not in a ntex runtime") + } + + if CURRENT_RUNTIME.is_set() { + CURRENT_RUNTIME.with(f) + } else { + not_in_ntex_runtime() + } + } + + /// Get current driver + pub fn driver(&self) -> &Proactor { + &self.driver + } + + /// Get handle for current runtime + pub fn handle(&self) -> RemoteHandle { + RemoteHandle { + handle: self.driver.handle(), + runnables: self.runnables.clone(), + } + } + + /// Attach a raw file descriptor/handle/socket to the runtime. + /// + /// You only need this when authoring your own high-level APIs. High-level + /// resources in this crate are attached automatically. + pub fn attach(&self, fd: RawFd) -> io::Result<()> { + self.driver.attach(fd) + } + + /// Block on the future till it completes. + pub fn block_on(&self, future: F) -> F::Output { + CURRENT_RUNTIME.set(self, || { + let mut result = None; + unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach(); + + self.runnables.run(self.event_interval); + loop { + if let Some(result) = result.take() { + return result; + } + + self.poll_with_driver(self.runnables.has_tasks(), || { + self.runnables.run(self.event_interval); + }); + } + }) + } + + /// Spawns a new asynchronous task, returning a [`Task`] for it. + /// + /// Spawning a task enables the task to execute concurrently to other tasks. + /// There is no guarantee that a spawned task will execute to completion. + pub fn spawn(&self, future: F) -> Task { + unsafe { self.spawn_unchecked(future) } + } + + /// Spawns a new asynchronous task, returning a [`Task`] for it. + /// + /// # Safety + /// + /// The caller should ensure the captured lifetime is long enough. + pub unsafe fn spawn_unchecked(&self, future: F) -> Task { + let runnables = self.runnables.clone(); + let handle = self.driver.handle(); + let schedule = move |runnable| { + runnables.schedule(runnable, &handle); + }; + let (runnable, task) = async_task::spawn_unchecked(future, schedule); + runnable.schedule(); + task + } + + /// Spawns a blocking task in a new thread, and wait for it. + /// + /// The task will not be cancelled even if the future is dropped. + pub fn spawn_blocking(&self, f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let op = Asyncify::new(move || { + let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)); + (Ok(0), res) + }); + // It is safe to use `submit` here because the task is spawned immediately. + unsafe { + let fut = self.submit_with_flags(op); + self.spawn_unchecked(async move { fut.await.0 .1.into_inner() }) + } + } + + fn submit_raw( + &self, + op: T, + ) -> PushEntry, (io::Result, T)> { + self.driver.push(op) + } + + fn submit_with_flags( + &self, + op: T, + ) -> impl Future, T), u32)> { + let fut = self.submit_raw(op); + + async move { + match fut { + PushEntry::Pending(user_data) => OpFuture::new(user_data).await, + PushEntry::Ready(res) => { + // submit_flags won't be ready immediately, if ready, it must be error without + // flags + (res, 0) + } + } + } + } + + pub(crate) fn cancel_op(&self, op: Key) { + self.driver.cancel(op); + } + + pub(crate) fn poll_task( + &self, + cx: &mut Context, + op: Key, + ) -> PushEntry, ((io::Result, T), u32)> { + self.driver.pop(op).map_pending(|mut k| { + self.driver.update_waker(&mut k, cx.waker().clone()); + k + }) + } + + fn poll_with_driver(&self, has_tasks: bool, f: F) { + let timeout = if has_tasks { + Some(Duration::ZERO) + } else { + None + }; + + if let Err(e) = self.driver.poll(timeout, f) { + match e.kind() { + io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => { + log::debug!("expected error: {e}"); + } + _ => panic!("{e:?}"), + } + } + } + + /// Insert a type into this runtime. + pub fn insert(&self, val: T) { + self.data + .borrow_mut() + .insert(TypeId::of::(), Box::new(val)); + } + + /// Check if container contains entry + pub fn contains(&self) -> bool { + self.data.borrow().contains_key(&TypeId::of::()) + } + + /// Get a reference to a type previously inserted on this runtime. + pub fn get(&self) -> Option + where + T: Clone + 'static, + { + self.data + .borrow() + .get(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_ref().cloned()) + } +} + +impl AsRawFd for Runtime { + fn as_raw_fd(&self) -> RawFd { + self.driver.as_raw_fd() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + CURRENT_RUNTIME.set(self, || { + while self.runnables.sync_runnables.pop().is_some() {} + loop { + let runnable = self.runnables.local_runnables.borrow_mut().pop_front(); + if runnable.is_none() { + break; + } + } + }) + } +} + struct RunnableQueue { id: thread::ThreadId, idle: Cell, @@ -108,266 +337,6 @@ impl RunnableQueue { } } -/// The async runtime for ntex. It is a thread local runtime, and cannot be -/// sent to other threads. -pub struct Runtime { - driver: Proactor, - runnables: Arc, - event_interval: usize, - data: RefCell, fxhash::FxBuildHasher>>, -} - -impl Runtime { - /// Create [`Runtime`] with default config. - pub fn new() -> io::Result { - Self::builder().build() - } - - /// Create a builder for [`Runtime`]. - pub fn builder() -> RuntimeBuilder { - RuntimeBuilder::new() - } - - #[allow(clippy::arc_with_non_send_sync)] - fn with_builder(builder: &RuntimeBuilder) -> io::Result { - Ok(Self { - driver: builder.proactor_builder.build()?, - runnables: Arc::new(RunnableQueue::new()), - event_interval: builder.event_interval, - data: RefCell::new(HashMap::default()), - }) - } - - /// Try to perform a function on the current runtime, and if no runtime is - /// running, return the function back. - pub fn try_with_current T>(f: F) -> Result { - if CURRENT_RUNTIME.is_set() { - Ok(CURRENT_RUNTIME.with(f)) - } else { - Err(f) - } - } - - /// Perform a function on the current runtime. - /// - /// ## Panics - /// - /// This method will panic if there are no running [`Runtime`]. - pub fn with_current T>(f: F) -> T { - #[cold] - fn not_in_ntex_runtime() -> ! { - panic!("not in a ntex runtime") - } - - if CURRENT_RUNTIME.is_set() { - CURRENT_RUNTIME.with(f) - } else { - not_in_ntex_runtime() - } - } - - /// Get current driver - pub fn driver(&self) -> &Proactor { - &self.driver - } - - /// Get handle for current runtime - pub fn handle(&self) -> RemoteHandle { - RemoteHandle { - handle: self.driver.handle(), - runnables: self.runnables.clone(), - } - } - - /// Set this runtime as current runtime, and perform a function in the - /// current scope. - pub fn enter T>(&self, f: F) -> T { - CURRENT_RUNTIME.set(self, f) - } - - /// Spawns a new asynchronous task, returning a [`Task`] for it. - /// - /// # Safety - /// - /// The caller should ensure the captured lifetime long enough. - pub unsafe fn spawn_unchecked(&self, future: F) -> Task { - let runnables = self.runnables.clone(); - let handle = self.driver.handle(); - let schedule = move |runnable| { - runnables.schedule(runnable, &handle); - }; - let (runnable, task) = async_task::spawn_unchecked(future, schedule); - runnable.schedule(); - task - } - - /// Low level API to control the runtime. - /// - /// Run the scheduled tasks. - /// - /// The return value indicates whether there are still tasks in the queue. - pub fn run(&self) -> bool { - self.runnables.run(self.event_interval); - self.runnables.has_tasks() - } - - /// Block on the future till it completes. - pub fn block_on(&self, future: F) -> F::Output { - CURRENT_RUNTIME.set(self, || { - let mut result = None; - unsafe { self.spawn_unchecked(async { result = Some(future.await) }) }.detach(); - - self.runnables.run(self.event_interval); - loop { - if let Some(result) = result.take() { - return result; - } - - self.poll_with_driver(self.runnables.has_tasks(), || { - self.runnables.run(self.event_interval); - }); - } - }) - } - - /// Spawns a new asynchronous task, returning a [`Task`] for it. - /// - /// Spawning a task enables the task to execute concurrently to other tasks. - /// There is no guarantee that a spawned task will execute to completion. - pub fn spawn(&self, future: F) -> JoinHandle { - unsafe { self.spawn_unchecked(AssertUnwindSafe(future).catch_unwind()) } - } - - /// Spawns a blocking task in a new thread, and wait for it. - /// - /// The task will not be cancelled even if the future is dropped. - pub fn spawn_blocking( - &self, - f: impl (FnOnce() -> T) + Send + 'static, - ) -> JoinHandle { - let op = Asyncify::new(move || { - let res = std::panic::catch_unwind(AssertUnwindSafe(f)); - (Ok(0), res) - }); - // It is safe and sound to use `submit` here because the task is spawned - // immediately. - unsafe { - self.spawn_unchecked( - self.submit_with_flags(op) - .map(|(res, _)| res) - .map(|res| res.1.into_inner()), - ) - } - } - - /// Attach a raw file descriptor/handle/socket to the runtime. - /// - /// You only need this when authoring your own high-level APIs. High-level - /// resources in this crate are attached automatically. - pub fn attach(&self, fd: RawFd) -> io::Result<()> { - self.driver.attach(fd) - } - - fn submit_raw( - &self, - op: T, - ) -> PushEntry, (io::Result, T)> { - self.driver.push(op) - } - - fn submit_with_flags( - &self, - op: T, - ) -> impl Future, T), u32)> { - match self.submit_raw(op) { - PushEntry::Pending(user_data) => Either::Left(OpFuture::new(user_data)), - PushEntry::Ready(res) => { - // submit_flags won't be ready immediately, if ready, it must be error without - // flags - Either::Right(ready((res, 0))) - } - } - } - - pub(crate) fn cancel_op(&self, op: Key) { - self.driver.cancel(op); - } - - pub(crate) fn poll_task( - &self, - cx: &mut Context, - op: Key, - ) -> PushEntry, ((io::Result, T), u32)> { - self.driver.pop(op).map_pending(|mut k| { - self.driver.update_waker(&mut k, cx.waker().clone()); - k - }) - } - - fn poll_with_driver(&self, has_tasks: bool, f: F) { - let timeout = if has_tasks { - Some(Duration::ZERO) - } else { - None - }; - - match self.driver.poll(timeout, f) { - Ok(()) => {} - Err(e) => match e.kind() { - io::ErrorKind::TimedOut | io::ErrorKind::Interrupted => { - log::debug!("expected error: {e}"); - } - _ => { - panic!("{e:?}") - } - }, - } - } - - /// Insert a type into this runtime. - pub fn insert(&self, val: T) { - self.data - .borrow_mut() - .insert(TypeId::of::(), Box::new(val)); - } - - /// Check if container contains entry - pub fn contains(&self) -> bool { - self.data.borrow().contains_key(&TypeId::of::()) - } - - /// Get a reference to a type previously inserted on this runtime. - pub fn get(&self) -> Option - where - T: Clone + 'static, - { - self.data - .borrow() - .get(&TypeId::of::()) - .and_then(|boxed| boxed.downcast_ref().cloned()) - } -} - -impl Drop for Runtime { - fn drop(&mut self) { - self.enter(|| { - while self.runnables.sync_runnables.pop().is_some() {} - loop { - let runnable = self.runnables.local_runnables.borrow_mut().pop_front(); - if runnable.is_none() { - break; - } - } - }) - } -} - -impl AsRawFd for Runtime { - fn as_raw_fd(&self) -> RawFd { - self.driver.as_raw_fd() - } -} - /// Builder for [`Runtime`]. #[derive(Debug, Clone)] pub struct RuntimeBuilder { @@ -420,7 +389,7 @@ impl RuntimeBuilder { /// /// This method doesn't create runtime. It tries to obtain the current runtime /// by [`Runtime::with_current`]. -pub fn spawn(future: F) -> JoinHandle { +pub fn spawn(future: F) -> Task { Runtime::with_current(|r| r.spawn(future)) } diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 323c101f..e13d0a14 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -73,9 +73,9 @@ ntex-util = "2.8" ntex-bytes = "0.1.27" ntex-server = "2.7" ntex-h2 = "1.8.1" -ntex-rt = "0.4.22" -ntex-io = "2.9" -ntex-net = "2.4" +ntex-rt = "0.4.25" +ntex-io = "2.11" +ntex-net = "2.5" ntex-tls = "2.3" base64 = "0.22" diff --git a/ntex/tests/http_awc_client.rs b/ntex/tests/http_awc_client.rs index eb18624b..3c953c58 100644 --- a/ntex/tests/http_awc_client.rs +++ b/ntex/tests/http_awc_client.rs @@ -220,7 +220,7 @@ async fn test_connection_reuse() { ))) }); - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); // req 1 let request = client.get(srv.url("/")).send(); @@ -255,7 +255,7 @@ async fn test_connection_force_close() { ))) }); - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); // req 1 let request = client.get(srv.url("/")).force_close().send(); @@ -263,7 +263,7 @@ async fn test_connection_force_close() { assert!(response.status().is_success()); // req 2 - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); let req = client.post(srv.url("/")).force_close(); let response = req.send().await.unwrap(); assert!(response.status().is_success()); @@ -291,7 +291,7 @@ async fn test_connection_server_close() { ))) }); - let client = Client::build().timeout(Seconds(10)).finish(); + let client = Client::build().timeout(Seconds(30)).finish(); // req 1 let request = client.get(srv.url("/")).send(); @@ -814,7 +814,7 @@ async fn client_read_until_eof() { // client request let req = Client::build() - .timeout(Seconds(5)) + .timeout(Seconds(30)) .finish() .get(format!("http://{}/", addr).as_str()); let mut response = req.send().await.unwrap(); diff --git a/ntex/tests/web_server.rs b/ntex/tests/web_server.rs index cbb7956d..b7bf1b75 100644 --- a/ntex/tests/web_server.rs +++ b/ntex/tests/web_server.rs @@ -868,7 +868,7 @@ async fn test_reading_deflate_encoding_large_random_rustls() { // client request let req = srv .post("/") - .timeout(Millis(10_000)) + .timeout(Millis(30_000)) .header(CONTENT_ENCODING, "deflate") .send_stream(TestBody::new(Bytes::from(enc), 1024)); @@ -909,7 +909,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h1() { // client request let req = srv .post("/") - .timeout(Millis(10_000)) + .timeout(Millis(30_000)) .header(CONTENT_ENCODING, "deflate") .send_stream(TestBody::new(Bytes::from(enc), 1024)); @@ -950,7 +950,7 @@ async fn test_reading_deflate_encoding_large_random_rustls_h2() { // client request let req = srv .post("/") - .timeout(Millis(10_000)) + .timeout(Millis(30_000)) .header(CONTENT_ENCODING, "deflate") .send_stream(TestBody::new(Bytes::from(enc), 1024));