Experimental poll based runtime (#510)

This commit is contained in:
Nikolay Kim 2025-03-09 18:11:33 +05:00 committed by GitHub
parent 3e5211eb79
commit 4c1bc3249b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
46 changed files with 4016 additions and 30 deletions

View file

@ -1,6 +1,6 @@
use std::{cell::Cell, fmt, future::poll_fn, io, task::Context, task::Poll};
use std::{cell::Cell, fmt, future::poll_fn, io, task::ready, task::Context, task::Poll};
use ntex_bytes::{BufMut, BytesVec};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_util::{future::lazy, future::select, future::Either, time::sleep, time::Sleep};
use crate::{AsyncRead, AsyncWrite, Flags, IoRef, ReadStatus, WriteStatus};
@ -19,6 +19,13 @@ impl ReadContext {
Self(io.clone(), Cell::new(None))
}
#[doc(hidden)]
#[inline]
/// Io tag
pub fn context(&self) -> IoContext {
IoContext::new(&self.0)
}
#[inline]
/// Io tag
pub fn tag(&self) -> &'static str {
@ -342,3 +349,313 @@ impl WriteContextBuf {
}
}
}
/// Context for io read task
pub struct IoContext(IoRef);
impl fmt::Debug for IoContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IoContext").field("io", &self.0).finish()
}
}
impl IoContext {
pub(crate) fn new(io: &IoRef) -> Self {
Self(io.clone())
}
#[inline]
/// Io tag
pub fn tag(&self) -> &'static str {
self.0.tag()
}
#[doc(hidden)]
/// Io flags
pub fn flags(&self) -> crate::flags::Flags {
self.0.flags()
}
#[inline]
/// Check readiness for read operations
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
self.shutdown_filters();
self.0.filter().poll_read_ready(cx)
}
#[inline]
/// Check readiness for write operations
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
self.0.filter().poll_write_ready(cx)
}
#[inline]
/// Get io error
pub fn stopped(&self, e: Option<io::Error>) {
self.0 .0.io_stopped(e);
}
/// Wait when io get closed or preparing for close
pub async fn shutdown(&self, flush_buf: bool) {
let st = &self.0 .0;
let mut timeout = None;
poll_fn(|cx| {
let flags = self.0.flags();
if flags.intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) {
Poll::Ready(())
} else {
st.write_task.register(cx.waker());
if flags.contains(Flags::IO_STOPPING_FILTERS) {
if timeout.is_none() {
timeout = Some(sleep(st.disconnect_timeout.get()));
}
if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
return Poll::Ready(());
}
}
Poll::Pending
}
})
.await;
if flush_buf && !self.0.flags().contains(Flags::WR_PAUSED) {
st.insert_flags(Flags::WR_TASK_WAIT);
poll_fn(|cx| {
let flags = self.0.flags();
if flags.intersects(Flags::WR_PAUSED | Flags::IO_STOPPED) {
Poll::Ready(())
} else {
st.write_task.register(cx.waker());
if timeout.is_none() {
timeout = Some(sleep(st.disconnect_timeout.get()));
}
if timeout.as_ref().unwrap().poll_elapsed(cx).is_ready() {
Poll::Ready(())
} else {
Poll::Pending
}
}
})
.await;
}
}
/// Get read buffer
pub fn with_read_buf<F>(&self, f: F) -> Poll<()>
where
F: FnOnce(&mut BytesVec) -> Poll<io::Result<usize>>,
{
let inner = &self.0 .0;
let (hw, lw) = self.0.memory_pool().read_params().unpack();
let result = inner.buffer.with_read_source(&self.0, |buf| {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
f(buf)
});
// handle buffer changes
match result {
Poll::Ready(Ok(0)) => {
inner.io_stopped(None);
Poll::Ready(())
}
Poll::Ready(Ok(nbytes)) => {
let filter = self.0.filter();
let _ = filter
.process_read_buf(&self.0, &inner.buffer, 0, nbytes)
.and_then(|status| {
if status.nbytes > 0 {
// dest buffer has new data, wake up dispatcher
if inner.buffer.read_destination_size() >= hw {
log::trace!(
"{}: Io read buffer is too large {}, enable read back-pressure",
self.0.tag(),
nbytes
);
inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL);
} else {
inner.insert_flags(Flags::BUF_R_READY);
if nbytes >= hw {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
}
log::trace!(
"{}: New {} bytes available, wakeup dispatcher",
self.0.tag(),
nbytes
);
inner.dispatch_task.wake();
} else {
if nbytes >= hw {
// read task is paused because of read back-pressure
// but there is no new data in top most read buffer
// so we need to wake up read task to read more data
// otherwise read task would sleep forever
inner.read_task.wake();
}
if inner.flags.get().contains(Flags::RD_NOTIFY) {
// in case of "notify" we must wake up dispatch task
// if we read any data from source
inner.dispatch_task.wake();
}
}
// while reading, filter wrote some data
// in that case filters need to process write buffers
// and potentialy wake write task
if status.need_write {
filter.process_write_buf(&self.0, &inner.buffer, 0)
} else {
Ok(())
}
})
.map_err(|err| {
inner.dispatch_task.wake();
inner.io_stopped(Some(err));
inner.insert_flags(Flags::BUF_R_READY);
});
Poll::Pending
}
Poll::Ready(Err(e)) => {
inner.io_stopped(Some(e));
Poll::Ready(())
}
Poll::Pending => {
self.shutdown_filters();
Poll::Pending
}
}
}
/// Get write buffer
pub fn with_write_buf<F>(&self, f: F) -> Poll<()>
where
F: FnOnce(&BytesVec) -> Poll<io::Result<usize>>,
{
let inner = &self.0 .0;
let result = inner.buffer.with_write_destination(&self.0, |buf| {
let Some(buf) =
buf.and_then(|buf| if buf.is_empty() { None } else { Some(buf) })
else {
return Poll::Ready(Ok(0));
};
match ready!(f(buf)) {
Ok(0) => {
log::trace!("{}: Disconnected during flush", self.tag());
Poll::Ready(Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)))
}
Ok(n) => {
if n == buf.len() {
buf.clear();
Poll::Ready(Ok(0))
} else {
buf.advance(n);
Poll::Ready(Ok(buf.len()))
}
}
Err(e) => Poll::Ready(Err(e)),
}
});
let mut flags = inner.flags.get();
let result = match result {
Poll::Pending => {
flags.remove(Flags::WR_PAUSED);
Poll::Pending
}
Poll::Ready(Ok(0)) => {
// all data has been written
flags.insert(Flags::WR_PAUSED);
if flags.is_task_waiting_for_write() {
flags.task_waiting_for_write_is_done();
inner.write_task.wake();
}
if flags.is_waiting_for_write() {
flags.waiting_for_write_is_done();
inner.dispatch_task.wake();
}
Poll::Ready(())
}
Poll::Ready(Ok(len)) => {
// if write buffer is smaller than high watermark value, turn off back-pressure
if flags.contains(Flags::BUF_W_BACKPRESSURE)
&& len < inner.pool.get().write_params_high() << 1
{
flags.remove(Flags::BUF_W_BACKPRESSURE);
inner.dispatch_task.wake();
}
Poll::Pending
}
Poll::Ready(Err(e)) => {
self.0 .0.io_stopped(Some(e));
Poll::Ready(())
}
};
inner.flags.set(flags);
result
}
fn shutdown_filters(&self) {
let io = &self.0;
let st = &self.0 .0;
if st.flags.get().contains(Flags::IO_STOPPING_FILTERS) {
let flags = st.flags.get();
if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
let filter = io.filter();
match filter.shutdown(io, &st.buffer, 0) {
Ok(Poll::Ready(())) => {
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
Ok(Poll::Pending) => {
// check read buffer, if buffer is not consumed it is unlikely
// that filter will properly complete shutdown
if flags.contains(Flags::RD_PAUSED)
|| flags.contains(Flags::BUF_R_FULL | Flags::BUF_R_READY)
{
st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING);
}
}
Err(err) => {
st.io_stopped(Some(err));
}
}
if let Err(err) = filter.process_write_buf(io, &st.buffer, 0) {
st.io_stopped(Some(err));
}
}
}
}
}
impl Clone for IoContext {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}