diff --git a/Cargo.toml b/Cargo.toml index e2edf0af..871d9de2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,9 +45,6 @@ ntex-util = { path = "ntex-util" } ntex-compio = { path = "ntex-compio" } ntex-tokio = { path = "ntex-tokio" } -#ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } -#ntex-neon = { path = "../dev/neon" } - [workspace.dependencies] async-task = "4.5.0" bitflags = "2" diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 75dc2cd8..9a4d6f94 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -723,6 +723,27 @@ impl IoContext { /// Get read buffer pub fn with_read_buf(&self, f: F) -> Poll<()> + where + F: FnOnce(&mut BytesVec) -> Poll>, + { + let result = self.with_read_buf_inner(f); + + // check read readiness + if result.is_pending() { + if let Some(waker) = self.0 .0.read_task.take() { + let mut cx = Context::from_waker(&waker); + + if let Poll::Ready(ReadStatus::Ready) = + self.0.filter().poll_read_ready(&mut cx) + { + return Poll::Pending; + } + } + } + result + } + + fn with_read_buf_inner(&self, f: F) -> Poll<()> where F: FnOnce(&mut BytesVec) -> Poll>, { @@ -817,8 +838,33 @@ impl IoContext { } } - /// Get write buffer pub fn with_write_buf(&self, f: F) -> Poll<()> + where + F: FnOnce(&BytesVec) -> Poll>, + { + let result = self.with_write_buf_inner(f); + + // check write readiness + if result.is_pending() { + let inner = &self.0 .0; + if let Some(waker) = inner.write_task.take() { + let ready = self + .0 + .filter() + .poll_write_ready(&mut Context::from_waker(&waker)); + if !matches!( + ready, + Poll::Ready(WriteStatus::Ready | WriteStatus::Shutdown) + ) { + return Poll::Ready(()); + } + } + } + result + } + + /// Get write buffer + fn with_write_buf_inner(&self, f: F) -> Poll<()> where F: FnOnce(&BytesVec) -> Poll>, {