Refactor io write task (#167)

This commit is contained in:
Nikolay Kim 2023-01-26 20:18:21 +06:00 committed by GitHub
parent c794439139
commit 38614715ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 418 additions and 411 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.2.2] - 2023-01-26
* Update io api usage
## [0.2.0] - 2023-01-04 ## [0.2.0] - 2023-01-04
* Release * Release

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-async-std" name = "ntex-async-std"
version = "0.2.1" version = "0.2.2"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "async-std intergration for ntex framework" description = "async-std intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-bytes = "0.1.19" ntex-bytes = "0.1.19"
ntex-io = "0.2.1" ntex-io = "0.2.4"
ntex-util = "0.2.0" ntex-util = "0.2.0"
async-oneshot = "0.5.0" async-oneshot = "0.5.0"
log = "0.4" log = "0.4"

View file

@ -140,10 +140,14 @@ impl Future for WriteTask {
} }
} }
// flush framed instance // flush io stream
match flush_io(&mut this.io.0, &this.state, cx) { let io = &mut this.io.0;
Poll::Pending | Poll::Ready(true) => Poll::Pending, match ready!(this.state.with_buf(|buf| flush_io(io, buf, cx))) {
Poll::Ready(false) => Poll::Ready(()), Ok(()) => Poll::Pending,
Err(e) => {
this.state.close(Some(e));
Poll::Ready(())
}
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { Poll::Ready(WriteStatus::Timeout(time)) => {
@ -182,28 +186,27 @@ impl Future for WriteTask {
match st { match st {
Shutdown::None => { Shutdown::None => {
// flush write buffer // flush write buffer
match flush_io(&mut this.io.0, &this.state, cx) { let io = &mut this.io.0;
Poll::Ready(true) => { match this.state.with_buf(|buf| flush_io(io, buf, cx)) {
if this Poll::Ready(Ok(())) => {
.io if let Err(e) =
.0 this.io.0.shutdown(std::net::Shutdown::Write)
.shutdown(std::net::Shutdown::Write)
.is_err()
{ {
this.state.close(None); this.state.close(Some(e));
return Poll::Ready(()); return Poll::Ready(());
} }
*st = Shutdown::Stopping(0); *st = Shutdown::Stopping(0);
continue; continue;
} }
Poll::Ready(false) => { Poll::Ready(Err(err)) => {
log::trace!( log::trace!(
"write task is closed with err during flush" "write task is closed with err during flush, {:?}",
err
); );
this.state.close(None); this.state.close(Some(err));
return Poll::Ready(()); return Poll::Ready(());
} }
_ => (), Poll::Pending => (),
} }
} }
Shutdown::Stopping(ref mut count) => { Shutdown::Stopping(ref mut count) => {
@ -255,77 +258,64 @@ impl Future for WriteTask {
/// Flush write buffer to underlying I/O stream. /// Flush write buffer to underlying I/O stream.
pub(super) fn flush_io<T: Read + Write + Unpin>( pub(super) fn flush_io<T: Read + Write + Unpin>(
io: &mut T, io: &mut T,
state: &WriteContext, buf: &mut Option<BytesVec>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<bool> { ) -> Poll<io::Result<()>> {
let mut buf = if let Some(buf) = state.get_write_buf() { if let Some(buf) = buf {
buf let len = buf.len();
} else {
return Poll::Ready(true);
};
let len = buf.len();
let pool = state.memory_pool();
if len != 0 { if len != 0 {
// log::trace!("flushing framed transport: {:?}", buf.len()); // log::trace!("flushing framed transport: {:?}", buf.len());
let mut written = 0; let mut written = 0;
while written < len { let result = loop {
match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Pending => break, Poll::Ready(Ok(n)) => {
Poll::Ready(Ok(n)) => { if n == 0 {
if n == 0 { log::trace!("Disconnected during flush, written {}", written);
log::trace!("Disconnected during flush, written {}", written); Poll::Ready(Err(io::Error::new(
pool.release_write_buf(buf); io::ErrorKind::WriteZero,
state.close(Some(io::Error::new( "failed to write frame to transport",
io::ErrorKind::WriteZero, )))
"failed to write frame to transport", } else {
))); written += n;
return Poll::Ready(false); if written == len {
} else { buf.clear();
written += n Poll::Ready(Ok(()))
} else {
continue;
}
}
}
Poll::Pending => {
// remove written data
buf.advance(written);
Poll::Pending
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
Poll::Ready(Err(e))
}
};
};
// log::trace!("flushed {} bytes", written);
// flush
return if written > 0 {
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
Poll::Ready(Err(e))
} }
} }
Poll::Ready(Err(e)) => { } else {
log::trace!("Error during flush: {}", e); result
pool.release_write_buf(buf); };
state.close(Some(e));
return Poll::Ready(false);
}
}
} }
log::trace!("flushed {} bytes", written);
// remove written data
let result = if written == len {
buf.clear();
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Ready(true)
} else {
buf.advance(written);
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Pending
};
// flush
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
state.close(Some(e));
Poll::Ready(false)
}
}
} else {
Poll::Ready(true)
} }
Poll::Ready(Ok(()))
} }
pub fn poll_read_buf<T: Read>( pub fn poll_read_buf<T: Read>(
@ -458,10 +448,14 @@ mod unixstream {
} }
} }
// flush framed instance // flush io stream
match flush_io(&mut this.io.0, &this.state, cx) { let io = &mut this.io.0;
Poll::Pending | Poll::Ready(true) => Poll::Pending, match ready!(this.state.with_buf(|buf| flush_io(io, buf, cx))) {
Poll::Ready(false) => Poll::Ready(()), Ok(()) => Poll::Pending,
Err(e) => {
this.state.close(Some(e));
Poll::Ready(())
}
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { Poll::Ready(WriteStatus::Timeout(time)) => {
@ -500,28 +494,27 @@ mod unixstream {
match st { match st {
Shutdown::None => { Shutdown::None => {
// flush write buffer // flush write buffer
match flush_io(&mut this.io.0, &this.state, cx) { let io = &mut this.io.0;
Poll::Ready(true) => { match this.state.with_buf(|buf| flush_io(io, buf, cx)) {
if this Poll::Ready(Ok(())) => {
.io if let Err(e) =
.0 this.io.0.shutdown(std::net::Shutdown::Write)
.shutdown(std::net::Shutdown::Write)
.is_err()
{ {
this.state.close(None); this.state.close(Some(e));
return Poll::Ready(()); return Poll::Ready(());
} }
*st = Shutdown::Stopping(0); *st = Shutdown::Stopping(0);
continue; continue;
} }
Poll::Ready(false) => { Poll::Ready(Err(err)) => {
log::trace!( log::trace!(
"write task is closed with err during flush" "write task is closed with err during flush, {:?}",
); err
this.state.close(None); );
this.state.close(Some(err));
return Poll::Ready(()); return Poll::Ready(());
} }
_ => (), Poll::Pending => (),
} }
} }
Shutdown::Stopping(ref mut count) => { Shutdown::Stopping(ref mut count) => {

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.2.2] - 2023-01-26
* Update io api usage
## [0.2.0] - 2023-01-04 ## [0.2.0] - 2023-01-04
* Release * Release

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-glommio" name = "ntex-glommio"
version = "0.2.1" version = "0.2.2"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework" description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-bytes = "0.1.19" ntex-bytes = "0.1.19"
ntex-io = "0.2.1" ntex-io = "0.2.4"
ntex-util = "0.2.0" ntex-util = "0.2.0"
async-oneshot = "0.5.0" async-oneshot = "0.5.0"
futures-lite = "1.12" futures-lite = "1.12"

View file

@ -148,10 +148,17 @@ impl Future for WriteTask {
} }
} }
// flush framed instance // flush io stream
match flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx) { match ready!(this.state.with_buf(|buf| flush_io(
Poll::Pending | Poll::Ready(true) => Poll::Pending, &mut *this.io.0.borrow_mut(),
Poll::Ready(false) => Poll::Ready(()), buf,
cx
))) {
Ok(()) => Poll::Pending,
Err(e) => {
this.state.close(Some(e));
Poll::Ready(())
}
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { Poll::Ready(WriteStatus::Timeout(time)) => {
@ -194,10 +201,9 @@ impl Future for WriteTask {
match st { match st {
Shutdown::None(ref mut fut) => { Shutdown::None(ref mut fut) => {
// flush write buffer // flush write buffer
let flush_result = let mut io = this.io.0.borrow_mut();
flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx); match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) {
match flush_result { Poll::Ready(Ok(())) => {
Poll::Ready(true) => {
if ready!(fut.poll(cx)).is_err() { if ready!(fut.poll(cx)).is_err() {
this.state.close(None); this.state.close(None);
return Poll::Ready(()); return Poll::Ready(());
@ -205,14 +211,15 @@ impl Future for WriteTask {
*st = Shutdown::Stopping(0); *st = Shutdown::Stopping(0);
continue; continue;
} }
Poll::Ready(false) => { Poll::Ready(Err(err)) => {
log::trace!( log::trace!(
"write task is closed with err during flush" "write task is closed with err during flush, {:?}",
err
); );
this.state.close(None); this.state.close(Some(err));
return Poll::Ready(()); return Poll::Ready(());
} }
_ => (), Poll::Pending => (),
} }
} }
Shutdown::Stopping(ref mut count) => { Shutdown::Stopping(ref mut count) => {
@ -266,77 +273,64 @@ impl Future for WriteTask {
/// Flush write buffer to underlying I/O stream. /// Flush write buffer to underlying I/O stream.
pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>( pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
io: &mut T, io: &mut T,
state: &WriteContext, buf: &mut Option<BytesVec>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<bool> { ) -> Poll<io::Result<()>> {
let mut buf = if let Some(buf) = state.get_write_buf() { if let Some(buf) = buf {
buf let len = buf.len();
} else {
return Poll::Ready(true);
};
let len = buf.len();
let pool = state.memory_pool();
if len != 0 { if len != 0 {
// log::trace!("flushing framed transport: {:?}", buf.len()); // log::trace!("flushing framed transport: {:?}", buf.len());
let mut written = 0; let mut written = 0;
while written < len { let result = loop {
match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Pending => break, Poll::Ready(Ok(n)) => {
Poll::Ready(Ok(n)) => { if n == 0 {
if n == 0 { log::trace!("Disconnected during flush, written {}", written);
log::trace!("Disconnected during flush, written {}", written); Poll::Ready(Err(io::Error::new(
pool.release_write_buf(buf); io::ErrorKind::WriteZero,
state.close(Some(io::Error::new( "failed to write frame to transport",
io::ErrorKind::WriteZero, )))
"failed to write frame to transport", } else {
))); written += n;
return Poll::Ready(false); if written == len {
} else { buf.clear();
written += n Poll::Ready(Ok(()))
} else {
continue;
}
}
}
Poll::Pending => {
// remove written data
buf.advance(written);
Poll::Pending
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
Poll::Ready(Err(e))
}
};
};
log::trace!("flushed {} bytes", written);
// flush
return if written > 0 {
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
Poll::Ready(Err(e))
} }
} }
Poll::Ready(Err(e)) => { } else {
log::trace!("Error during flush: {}", e); result
pool.release_write_buf(buf); };
state.close(Some(e));
return Poll::Ready(false);
}
}
} }
log::trace!("flushed {} bytes", written);
// remove written data
let result = if written == len {
buf.clear();
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Ready(true)
} else {
buf.advance(written);
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Pending
};
// flush
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
state.close(Some(e));
Poll::Ready(false)
}
}
} else {
Poll::Ready(true)
} }
Poll::Ready(Ok(()))
} }
pub fn poll_read_buf<T: AsyncRead>( pub fn poll_read_buf<T: AsyncRead>(
@ -456,10 +450,17 @@ impl Future for UnixWriteTask {
} }
} }
// flush framed instance // flush io stream
match flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx) { match ready!(this.state.with_buf(|buf| flush_io(
Poll::Pending | Poll::Ready(true) => Poll::Pending, &mut *this.io.0.borrow_mut(),
Poll::Ready(false) => Poll::Ready(()), buf,
cx
))) {
Ok(()) => Poll::Pending,
Err(e) => {
this.state.close(Some(e));
Poll::Ready(())
}
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { Poll::Ready(WriteStatus::Timeout(time)) => {
@ -502,10 +503,9 @@ impl Future for UnixWriteTask {
match st { match st {
Shutdown::None(ref mut fut) => { Shutdown::None(ref mut fut) => {
// flush write buffer // flush write buffer
let flush_result = let mut io = this.io.0.borrow_mut();
flush_io(&mut *this.io.0.borrow_mut(), &this.state, cx); match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) {
match flush_result { Poll::Ready(Ok(())) => {
Poll::Ready(true) => {
if ready!(fut.poll(cx)).is_err() { if ready!(fut.poll(cx)).is_err() {
this.state.close(None); this.state.close(None);
return Poll::Ready(()); return Poll::Ready(());
@ -513,14 +513,15 @@ impl Future for UnixWriteTask {
*st = Shutdown::Stopping(0); *st = Shutdown::Stopping(0);
continue; continue;
} }
Poll::Ready(false) => { Poll::Ready(Err(err)) => {
log::trace!( log::trace!(
"write task is closed with err during flush" "write task is closed with err during flush, {:?}",
err
); );
this.state.close(None); this.state.close(Some(err));
return Poll::Ready(()); return Poll::Ready(());
} }
_ => (), Poll::Pending => (),
} }
} }
Shutdown::Stopping(ref mut count) => { Shutdown::Stopping(ref mut count) => {

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.2.4] - 2023-01-26
* Refactor write task management
## [0.2.3] - 2023-01-25 ## [0.2.3] - 2023-01-25
* Optimize buffers layout * Optimize buffers layout

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.2.3" version = "0.2.4"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -151,18 +151,6 @@ impl Stack {
&mut self.get_last_level().1 &mut self.get_last_level().1
} }
pub(crate) fn last_write_buf_size(&mut self) -> usize {
self.get_last_level()
.1
.as_ref()
.map(|b| b.len())
.unwrap_or(0)
}
pub(crate) fn set_last_write_buf(&mut self, buf: BytesVec) {
self.get_last_level().1 = Some(buf);
}
pub(crate) fn release(&mut self, pool: PoolRef) { pub(crate) fn release(&mut self, pool: PoolRef) {
let items = match &mut self.buffers { let items = match &mut self.buffers {
Either::Left(b) => &mut b[..], Either::Left(b) => &mut b[..],

View file

@ -128,7 +128,12 @@ impl Filter for Base {
#[inline] #[inline]
fn process_write_buf(&self, _: &IoRef, s: &mut Stack, _: usize) -> io::Result<()> { fn process_write_buf(&self, _: &IoRef, s: &mut Stack, _: usize) -> io::Result<()> {
if let Some(buf) = s.last_write_buf() { if let Some(buf) = s.last_write_buf() {
if buf.len() >= self.0.memory_pool().write_params_high() { let len = buf.len();
if len > 0 && self.0.flags().contains(Flags::WR_PAUSED) {
self.0 .0.remove_flags(Flags::WR_PAUSED);
self.0 .0.write_task.wake();
}
if len >= self.0.memory_pool().write_params_high() {
self.0 .0.insert_flags(Flags::WR_BACKPRESSURE); self.0 .0.insert_flags(Flags::WR_BACKPRESSURE);
} }
} }

View file

@ -32,17 +32,19 @@ bitflags::bitflags! {
const RD_BUF_FULL = 0b0000_0000_0100_0000; const RD_BUF_FULL = 0b0000_0000_0100_0000;
/// wait write completion /// wait write completion
const WR_WAIT = 0b0000_0000_1000_0000; const WR_WAIT = 0b0000_0001_0000_0000;
/// write buffer is full /// write buffer is full
const WR_BACKPRESSURE = 0b0000_0001_0000_0000; const WR_BACKPRESSURE = 0b0000_0010_0000_0000;
/// write task paused
const WR_PAUSED = 0b0000_0100_0000_0000;
/// dispatcher is marked stopped /// dispatcher is marked stopped
const DSP_STOP = 0b0000_0010_0000_0000; const DSP_STOP = 0b0001_0000_0000_0000;
/// keep-alive timeout occured /// keep-alive timeout occured
const DSP_KEEPALIVE = 0b0000_0100_0000_0000; const DSP_KEEPALIVE = 0b0010_0000_0000_0000;
/// keep-alive timeout started /// keep-alive timeout started
const KEEPALIVE = 0b0001_0000_0000_0000; const KEEPALIVE = 0b0100_0000_0000_0000;
} }
} }
@ -556,6 +558,7 @@ impl<F> Io<F> {
} }
self.0 .0.read_task.wake(); self.0 .0.read_task.wake();
self.0 .0.write_task.wake();
self.0 .0.dispatch_task.register(cx.waker()); self.0 .0.dispatch_task.register(cx.waker());
Poll::Pending Poll::Pending
} }

View file

@ -145,9 +145,7 @@ impl IoRef {
let flags = self.0.flags.get(); let flags = self.0.flags.get();
if !flags.intersects(Flags::IO_STOPPING) { if !flags.intersects(Flags::IO_STOPPING) {
self.with_write_buf(|buf| { self.with_write_buf(|buf| buf.extend_from_slice(src))
buf.extend_from_slice(src);
})
} else { } else {
Ok(()) Ok(())
} }
@ -160,17 +158,12 @@ impl IoRef {
F: FnOnce(&mut BytesVec) -> R, F: FnOnce(&mut BytesVec) -> R,
{ {
let mut buffer = self.0.buffer.borrow_mut(); let mut buffer = self.0.buffer.borrow_mut();
let is_write_sleep = buffer.last_write_buf_size() == 0;
let result = f(buffer.first_write_buf(self)); let result = f(buffer.first_write_buf(self));
self.0 self.0
.filter .filter
.get() .get()
.process_write_buf(self, &mut buffer, 0)?; .process_write_buf(self, &mut buffer, 0)?;
if is_write_sleep && buffer.last_write_buf_size() != 0 {
self.0.write_task.wake();
}
Ok(result) Ok(result)
} }
@ -183,7 +176,6 @@ impl IoRef {
let mut b = self.0.buffer.borrow_mut(); let mut b = self.0.buffer.borrow_mut();
let result = b.write_buf(self, 0, f); let result = b.write_buf(self, 0, f);
self.0.filter.get().process_write_buf(self, &mut b, 0)?; self.0.filter.get().process_write_buf(self, &mut b, 0)?;
self.0.write_task.wake();
Ok(result) Ok(result)
} }

View file

@ -25,7 +25,6 @@ impl ReadContext {
{ {
let inner = &self.0 .0; let inner = &self.0 .0;
let mut stack = inner.buffer.borrow_mut(); let mut stack = inner.buffer.borrow_mut();
let is_write_sleep = stack.last_write_buf_size() == 0;
let mut buf = stack let mut buf = stack
.last_read_buf() .last_read_buf()
.take() .take()
@ -77,11 +76,7 @@ impl ReadContext {
// in that case filters need to process write buffers // in that case filters need to process write buffers
// and potentialy wake write task // and potentialy wake write task
if status.need_write { if status.need_write {
let result = filter.process_write_buf(&self.0, &mut stack, 0); filter.process_write_buf(&self.0, &mut stack, 0)
if is_write_sleep && stack.last_write_buf_size() != 0 {
inner.write_task.wake();
}
result
} else { } else {
Ok(()) Ok(())
} }
@ -134,42 +129,57 @@ impl WriteContext {
self.0.filter().poll_write_ready(cx) self.0.filter().poll_write_ready(cx)
} }
#[inline] /// Get read buffer
/// Get write buffer pub fn with_buf<F>(&self, f: F) -> Poll<io::Result<()>>
pub fn get_write_buf(&self) -> Option<BytesVec> { where
self.0 .0.buffer.borrow_mut().last_write_buf().take() F: FnOnce(&mut Option<BytesVec>) -> Poll<io::Result<()>>,
} {
let inner = &self.0 .0;
let mut stack = inner.buffer.borrow_mut();
let buf = stack.last_write_buf();
#[inline] // call provided callback
/// Release write buffer after io write operations let result = f(buf);
pub fn release_write_buf(&self, buf: BytesVec) -> Result<(), io::Error> {
let pool = self.0.memory_pool();
let mut flags = self.0.flags();
if buf.is_empty() { let mut len = 0;
pool.release_write_buf(buf); if let Some(b) = buf {
if flags.intersects(Flags::WR_WAIT | Flags::WR_BACKPRESSURE) { if b.is_empty() {
flags.remove(Flags::WR_WAIT | Flags::WR_BACKPRESSURE); inner.pool.get().release_write_buf(buf.take().unwrap());
self.0.set_flags(flags); } else {
self.0 .0.dispatch_task.wake(); len = b.len();
} }
} else {
// if write buffer is smaller than high watermark value, turn off back-pressure
if flags.contains(Flags::WR_BACKPRESSURE)
&& buf.len() < pool.write_params_high() << 1
{
flags.remove(Flags::WR_BACKPRESSURE);
self.0.set_flags(flags);
self.0 .0.dispatch_task.wake();
}
self.0 .0.buffer.borrow_mut().set_last_write_buf(buf);
} }
Ok(()) // if write buffer is smaller than high watermark value, turn off back-pressure
let mut flags = inner.flags.get();
let mut wake_dispatcher = false;
if flags.contains(Flags::WR_BACKPRESSURE)
&& len < inner.pool.get().write_params_high() << 1
{
flags.remove(Flags::WR_BACKPRESSURE);
wake_dispatcher = true;
}
if flags.contains(Flags::WR_WAIT) && len == 0 {
flags.remove(Flags::WR_WAIT);
wake_dispatcher = true;
}
match result {
Poll::Pending => flags.remove(Flags::WR_PAUSED),
Poll::Ready(Ok(())) => flags.insert(Flags::WR_PAUSED),
Poll::Ready(Err(_)) => {}
}
inner.flags.set(flags);
if wake_dispatcher {
inner.dispatch_task.wake();
}
result
} }
#[inline] #[inline]
/// Indicate that io task is stopped /// Indicate that write io task is stopped
pub fn close(&self, err: Option<io::Error>) { pub fn close(&self, err: Option<io::Error>) {
self.0 .0.io_stopped(err); self.0 .0.io_stopped(err);
} }
@ -180,8 +190,9 @@ fn shutdown_filters(io: &IoRef) {
let flags = st.flags.get(); let flags = st.flags.get();
if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) { if !flags.intersects(Flags::IO_STOPPED | Flags::IO_STOPPING) {
let filter = io.filter();
let mut buffer = st.buffer.borrow_mut(); let mut buffer = st.buffer.borrow_mut();
match io.filter().shutdown(io, &mut buffer, 0) { match filter.shutdown(io, &mut buffer, 0) {
Ok(Poll::Ready(())) => { Ok(Poll::Ready(())) => {
st.dispatch_task.wake(); st.dispatch_task.wake();
st.insert_flags(Flags::IO_STOPPING); st.insert_flags(Flags::IO_STOPPING);
@ -200,6 +211,8 @@ fn shutdown_filters(io: &IoRef) {
st.io_stopped(Some(err)); st.io_stopped(Some(err));
} }
} }
st.write_task.wake(); if let Err(err) = filter.process_write_buf(io, &mut buffer, 0) {
st.io_stopped(Some(err));
}
} }
} }

View file

@ -1,7 +1,7 @@
//! utilities and helpers for testing //! utilities and helpers for testing
use std::cell::{Cell, RefCell}; use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker}; use std::task::{ready, Context, Poll, Waker};
use std::{any, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc}; use std::{any, cmp, fmt, future::Future, io, mem, net, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut, Bytes, BytesVec}; use ntex_bytes::{Buf, BufMut, Bytes, BytesVec};
@ -480,9 +480,12 @@ impl Future for WriteTask {
match this.state.poll_ready(cx) { match this.state.poll_ready(cx) {
Poll::Ready(WriteStatus::Ready) => { Poll::Ready(WriteStatus::Ready) => {
// flush framed instance // flush framed instance
match flush_io(&this.io, &this.state, cx) { match ready!(flush_io(&this.io, &this.state, cx)) {
Poll::Pending | Poll::Ready(true) => Poll::Pending, Ok(()) => Poll::Pending,
Poll::Ready(false) => Poll::Ready(()), Err(e) => {
this.state.close(Some(e));
Poll::Ready(())
}
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { Poll::Ready(WriteStatus::Timeout(time)) => {
@ -527,18 +530,19 @@ impl Future for WriteTask {
Shutdown::None => { Shutdown::None => {
// flush write buffer // flush write buffer
match flush_io(&this.io, &this.state, cx) { match flush_io(&this.io, &this.state, cx) {
Poll::Ready(true) => { Poll::Ready(Ok(())) => {
*st = Shutdown::Flushed; *st = Shutdown::Flushed;
continue; continue;
} }
Poll::Ready(false) => { Poll::Ready(Err(err)) => {
log::trace!( log::trace!(
"write task is closed with err during flush" "write task is closed with err during flush {:?}",
err
); );
this.state.close(None); this.state.close(Some(err));
return Poll::Ready(()); return Poll::Ready(());
} }
_ => (), Poll::Pending => (),
} }
} }
Shutdown::Flushed => { Shutdown::Flushed => {
@ -596,58 +600,54 @@ pub(super) fn flush_io(
io: &IoTest, io: &IoTest,
state: &WriteContext, state: &WriteContext,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<bool> { ) -> Poll<io::Result<()>> {
let mut buf = if let Some(buf) = state.get_write_buf() { state.with_buf(|buf| {
buf if let Some(buf) = buf {
} else { let len = buf.len();
return Poll::Ready(true);
};
let len = buf.len();
if len != 0 { if len != 0 {
log::trace!("flushing framed transport: {}", len); log::trace!("flushing framed transport: {}", len);
let mut written = 0; let mut written = 0;
while written < len { let result = loop {
match io.poll_write_buf(cx, &buf[written..]) { break match io.poll_write_buf(cx, &buf[written..]) {
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
if n == 0 { if n == 0 {
log::trace!("disconnected during flush, written {}", written); log::trace!(
let _ = state.release_write_buf(buf); "disconnected during flush, written {}",
state.close(Some(io::Error::new( written
io::ErrorKind::WriteZero, );
"failed to write frame to transport", Poll::Ready(Err(io::Error::new(
))); io::ErrorKind::WriteZero,
return Poll::Ready(false); "failed to write frame to transport",
} else { )))
written += n } else {
} written += n;
} if written == len {
Poll::Pending => break, buf.clear();
Poll::Ready(Err(e)) => { Poll::Ready(Ok(()))
log::trace!("error during flush: {}", e); } else {
let _ = state.release_write_buf(buf); continue;
state.close(Some(e)); }
return Poll::Ready(false); }
} }
Poll::Pending => {
// remove written data
buf.advance(written);
Poll::Pending
}
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
Poll::Ready(Err(e))
}
};
};
log::trace!("flushed {} bytes", written);
return result;
} }
} }
log::trace!("flushed {} bytes", written); Poll::Ready(Ok(()))
})
// remove written data
if written == len {
buf.clear();
let _ = state.release_write_buf(buf);
Poll::Ready(true)
} else {
buf.advance(written);
let _ = state.release_write_buf(buf);
Poll::Pending
}
} else {
let _ = state.release_write_buf(buf);
Poll::Ready(true)
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.2.2] - 2023-01-26
* Update io api usage
## [0.2.0] - 2023-01-04 ## [0.2.0] - 2023-01-04
* Release * Release

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-tokio" name = "ntex-tokio"
version = "0.2.1" version = "0.2.2"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "tokio intergration for ntex framework" description = "tokio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
ntex-bytes = "0.1.19" ntex-bytes = "0.1.19"
ntex-io = "0.2.1" ntex-io = "0.2.4"
ntex-util = "0.2.0" ntex-util = "0.2.0"
log = "0.4" log = "0.4"
pin-project-lite = "0.2" pin-project-lite = "0.2"

View file

@ -144,10 +144,17 @@ impl Future for WriteTask {
} }
} }
// flush framed instance // flush io stream
match flush_io(&mut *this.io.borrow_mut(), &this.state, cx) { match ready!(this.state.with_buf(|buf| flush_io(
Poll::Pending | Poll::Ready(true) => Poll::Pending, &mut *this.io.borrow_mut(),
Poll::Ready(false) => Poll::Ready(()), buf,
cx
))) {
Ok(()) => Poll::Pending,
Err(e) => {
this.state.close(Some(e));
Poll::Ready(())
}
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { Poll::Ready(WriteStatus::Timeout(time)) => {
@ -194,19 +201,21 @@ impl Future for WriteTask {
match st { match st {
Shutdown::None => { Shutdown::None => {
// flush write buffer // flush write buffer
match flush_io(&mut *this.io.borrow_mut(), &this.state, cx) { let mut io = this.io.borrow_mut();
Poll::Ready(true) => { match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) {
Poll::Ready(Ok(())) => {
*st = Shutdown::Flushed; *st = Shutdown::Flushed;
continue; continue;
} }
Poll::Ready(false) => { Poll::Ready(Err(err)) => {
log::trace!( log::trace!(
"write task is closed with err during flush" "write task is closed with err during flush, {:?}",
err
); );
this.state.close(None); this.state.close(Some(err));
return Poll::Ready(()); return Poll::Ready(());
} }
_ => (), Poll::Pending => (),
} }
} }
Shutdown::Flushed => { Shutdown::Flushed => {
@ -272,80 +281,64 @@ impl Future for WriteTask {
/// Flush write buffer to underlying I/O stream. /// Flush write buffer to underlying I/O stream.
pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>( pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
io: &mut T, io: &mut T,
state: &WriteContext, buf: &mut Option<BytesVec>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<bool> { ) -> Poll<io::Result<()>> {
let mut buf = if let Some(buf) = state.get_write_buf() { if let Some(buf) = buf {
buf let len = buf.len();
} else {
return Poll::Ready(true);
};
let len = buf.len();
let pool = state.memory_pool();
if len != 0 { if len != 0 {
// log::trace!("flushing framed transport: {:?}", buf.len()); // log::trace!("flushing framed transport: {:?}", buf.len());
let mut written = 0; let mut written = 0;
while written < len { let result = loop {
match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) {
Poll::Pending => break, Poll::Ready(Ok(n)) => {
Poll::Ready(Ok(n)) => { if n == 0 {
if n == 0 { log::trace!("Disconnected during flush, written {}", written);
log::trace!("Disconnected during flush, written {}", written); Poll::Ready(Err(io::Error::new(
pool.release_write_buf(buf); io::ErrorKind::WriteZero,
state.close(Some(io::Error::new( "failed to write frame to transport",
io::ErrorKind::WriteZero, )))
"failed to write frame to transport", } else {
))); written += n;
return Poll::Ready(false); if written == len {
} else { buf.clear();
written += n Poll::Ready(Ok(()))
} else {
continue;
}
}
}
Poll::Pending => {
// remove written data
buf.advance(written);
Poll::Pending
}
Poll::Ready(Err(e)) => {
log::trace!("Error during flush: {}", e);
Poll::Ready(Err(e))
}
};
};
// log::trace!("flushed {} bytes", written);
// flush
return if written > 0 {
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
Poll::Ready(Err(e))
} }
} }
Poll::Ready(Err(e)) => { } else {
log::trace!("Error during flush: {}", e); result
pool.release_write_buf(buf); };
state.close(Some(e));
return Poll::Ready(false);
}
}
} }
// log::trace!("flushed {} bytes", written);
// remove written data
let result = if written == len {
buf.clear();
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Ready(true)
} else {
buf.advance(written);
if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
return Poll::Ready(false);
}
Poll::Pending
};
// flush
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
log::trace!("error during flush: {}", e);
state.close(Some(e));
Poll::Ready(false)
}
}
} else if let Err(e) = state.release_write_buf(buf) {
state.close(Some(e));
Poll::Ready(false)
} else {
Poll::Ready(true)
} }
Poll::Ready(Ok(()))
} }
pub struct TokioIoBoxed(IoBoxed); pub struct TokioIoBoxed(IoBoxed);
@ -546,10 +539,17 @@ mod unixstream {
} }
} }
// flush framed instance // flush io stream
match flush_io(&mut *this.io.borrow_mut(), &this.state, cx) { match ready!(this.state.with_buf(|buf| flush_io(
Poll::Pending | Poll::Ready(true) => Poll::Pending, &mut *this.io.borrow_mut(),
Poll::Ready(false) => Poll::Ready(()), buf,
cx
))) {
Ok(()) => Poll::Pending,
Err(e) => {
this.state.close(Some(e));
Poll::Ready(())
}
} }
} }
Poll::Ready(WriteStatus::Timeout(time)) => { Poll::Ready(WriteStatus::Timeout(time)) => {
@ -587,20 +587,22 @@ mod unixstream {
match st { match st {
Shutdown::None => { Shutdown::None => {
// flush write buffer // flush write buffer
match flush_io(&mut *this.io.borrow_mut(), &this.state, cx) let mut io = this.io.borrow_mut();
match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx))
{ {
Poll::Ready(true) => { Poll::Ready(Ok(())) => {
*st = Shutdown::Flushed; *st = Shutdown::Flushed;
continue; continue;
} }
Poll::Ready(false) => { Poll::Ready(Err(err)) => {
log::trace!( log::trace!(
"write task is closed with err during flush" "write task is closed with err during flush, {:?}",
err
); );
this.state.close(None); this.state.close(Some(err));
return Poll::Ready(()); return Poll::Ready(());
} }
_ => (), Poll::Pending => (),
} }
} }
Shutdown::Flushed => { Shutdown::Flushed => {

View file

@ -646,7 +646,7 @@ where
} }
} }
None => { None => {
trace!("response payload eof"); trace!("response payload eof {:?}", self.flags);
if let Err(err) = self.io.encode(Message::Chunk(None), &self.codec) { if let Err(err) = self.io.encode(Message::Chunk(None), &self.codec) {
self.error = Some(DispatchError::Encode(err)); self.error = Some(DispatchError::Encode(err));
Some(State::Stop) Some(State::Stop)
@ -785,7 +785,7 @@ mod tests {
use crate::http::{body, Request, ResponseHead, StatusCode}; use crate::http::{body, Request, ResponseHead, StatusCode};
use crate::io::{self as nio, Base}; use crate::io::{self as nio, Base};
use crate::service::{boxed, fn_service, IntoService}; use crate::service::{boxed, fn_service, IntoService};
use crate::util::{lazy, stream_recv, Bytes, BytesMut}; use crate::util::{lazy, poll_fn, stream_recv, Bytes, BytesMut};
use crate::{codec::Decoder, testing::Io, time::sleep, time::Millis, time::Seconds}; use crate::{codec::Decoder, testing::Io, time::sleep, time::Millis, time::Seconds};
const BUFFER_SIZE: usize = 32_768; const BUFFER_SIZE: usize = 32_768;
@ -847,7 +847,6 @@ mod tests {
decoder.decode(buf).unwrap().unwrap() decoder.decode(buf).unwrap().unwrap()
} }
#[cfg(feature = "tokio")]
#[crate::rt_test] #[crate::rt_test]
async fn test_on_request() { async fn test_on_request() {
let (client, server) = Io::create(); let (client, server) = Io::create();
@ -882,13 +881,12 @@ mod tests {
); );
sleep(Millis(50)).await; sleep(Millis(50)).await;
let _ = lazy(|cx| Pin::new(&mut h1).poll(cx)).await; let _ = lazy(|cx| Pin::new(&mut h1).poll(cx)).await;
sleep(Millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
sleep(Millis(50)).await; sleep(Millis(50)).await;
client.local_buffer(|buf| assert_eq!(&buf[..15], b"HTTP/1.0 200 OK")); client.local_buffer(|buf| assert_eq!(&buf[..15], b"HTTP/1.0 200 OK"));
client.close().await; client.close().await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
assert!(data.get()); assert!(data.get());
} }
@ -906,7 +904,7 @@ mod tests {
let _ = lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready(); let _ = lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready();
sleep(Millis(50)).await; sleep(Millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready()); assert!(poll_fn(|cx| Pin::new(&mut h1).poll(cx)).await.is_ok());
assert!(h1.inner.io.is_closed()); assert!(h1.inner.io.is_closed());
sleep(Millis(50)).await; sleep(Millis(50)).await;
@ -1233,13 +1231,9 @@ mod tests {
Err::<Response<()>, _>(io::Error::new(io::ErrorKind::Other, "error")) Err::<Response<()>, _>(io::Error::new(io::ErrorKind::Other, "error"))
}) })
}); });
sleep(Millis(50)).await;
// required because io shutdown is async oper // required because io shutdown is async oper
let _ = lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready(); assert!(poll_fn(|cx| Pin::new(&mut h1).poll(cx)).await.is_ok());
sleep(Millis(50)).await;
assert!(lazy(|cx| Pin::new(&mut h1).poll(cx)).await.is_ready());
sleep(Millis(50)).await;
assert!(h1.inner.io.is_closed()); assert!(h1.inner.io.is_closed());
let buf = client.local_buffer(|buf| buf.split()); let buf = client.local_buffer(|buf| buf.split());
assert_eq!(&buf[..28], b"HTTP/1.1 500 Internal Server"); assert_eq!(&buf[..28], b"HTTP/1.1 500 Internal Server");