Refactor ntex-io (#164)

* Refactor Io and Filter types
This commit is contained in:
Nikolay Kim 2023-01-23 14:42:00 +06:00 committed by GitHub
parent 8cbd8758a5
commit 83d05d81ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
55 changed files with 1615 additions and 1495 deletions

View file

@ -1,4 +1,4 @@
use std::{any, future::Future, io, pin::Pin, task::Context, task::Poll};
use std::{any, cell::RefCell, future::Future, io, pin::Pin, task::Context, task::Poll};
use async_std::io::{Read, Write};
use ntex_bytes::{Buf, BufMut, BytesVec};
@ -30,35 +30,31 @@ impl Handle for TcpStream {
/// Read io task
struct ReadTask {
io: TcpStream,
io: RefCell<TcpStream>,
state: ReadContext,
}
impl ReadTask {
/// Create new read io task
fn new(io: TcpStream, state: ReadContext) -> Self {
Self { io, state }
Self {
state,
io: RefCell::new(io),
}
}
}
impl Future for ReadTask {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_ref();
loop {
this.state.with_buf(|buf, hw, lw| {
match ready!(this.state.poll_ready(cx)) {
ReadStatus::Ready => {
let pool = this.state.memory_pool();
let mut buf = this.state.get_read_buf();
let io = &mut this.io;
let (hw, lw) = pool.read_params().unpack();
// read data from socket
let mut new_bytes = 0;
let mut close = false;
let mut pending = false;
let mut io = self.io.borrow_mut();
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
@ -66,52 +62,31 @@ impl Future for ReadTask {
buf.reserve(hw - remaining);
}
match poll_read_buf(Pin::new(&mut io.0), cx, &mut buf) {
Poll::Pending => {
pending = true;
break;
}
return match poll_read_buf(Pin::new(&mut io.0), cx, buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("async-std stream is disconnected");
close = true;
Poll::Ready(Ok(()))
} else if buf.len() < hw {
continue;
} else {
new_bytes += n;
if new_bytes <= hw {
continue;
}
Poll::Pending
}
break;
}
Poll::Ready(Err(err)) => {
log::trace!("read task failed on io {:?}", err);
this.state.release_read_buf(buf, new_bytes);
this.state.close(Some(err));
return Poll::Ready(());
log::trace!("async-std read task failed on io {:?}", err);
Poll::Ready(Err(err))
}
}
};
}
if new_bytes == 0 && close {
this.state.close(None);
return Poll::Ready(());
}
this.state.release_read_buf(buf, new_bytes);
return if close {
this.state.close(None);
Poll::Ready(())
} else if pending {
Poll::Pending
} else {
continue;
};
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
return Poll::Ready(());
Poll::Ready(Ok(()))
}
}
}
})
}
}
@ -358,10 +333,6 @@ pub fn poll_read_buf<T: Read>(
cx: &mut Context<'_>,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let dst = unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [u8]) };
let n = ready!(io.poll_read(cx, dst))?;
@ -389,35 +360,31 @@ mod unixstream {
/// Read io task
struct ReadTask {
io: UnixStream,
io: RefCell<UnixStream>,
state: ReadContext,
}
impl ReadTask {
/// Create new read io task
fn new(io: UnixStream, state: ReadContext) -> Self {
Self { io, state }
Self {
state,
io: RefCell::new(io),
}
}
}
impl Future for ReadTask {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_ref();
loop {
this.state.with_buf(|buf, hw, lw| {
match ready!(this.state.poll_ready(cx)) {
ReadStatus::Ready => {
let pool = this.state.memory_pool();
let mut buf = this.state.get_read_buf();
let io = &mut this.io;
let (hw, lw) = pool.read_params().unpack();
// read data from socket
let mut new_bytes = 0;
let mut close = false;
let mut pending = false;
let mut io = this.io.borrow_mut();
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
@ -425,52 +392,31 @@ mod unixstream {
buf.reserve(hw - remaining);
}
match poll_read_buf(Pin::new(&mut io.0), cx, &mut buf) {
Poll::Pending => {
pending = true;
break;
}
return match poll_read_buf(Pin::new(&mut io.0), cx, buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("async-std stream is disconnected");
close = true;
Poll::Ready(Ok(()))
} else if buf.len() < hw {
continue;
} else {
new_bytes += n;
if new_bytes <= hw {
continue;
}
Poll::Pending
}
break;
}
Poll::Ready(Err(err)) => {
log::trace!("read task failed on io {:?}", err);
this.state.release_read_buf(buf, new_bytes);
this.state.close(Some(err));
return Poll::Ready(());
Poll::Ready(Err(err))
}
}
};
}
if new_bytes == 0 && close {
this.state.close(None);
return Poll::Ready(());
}
this.state.release_read_buf(buf, new_bytes);
return if close {
this.state.close(None);
Poll::Ready(())
} else if pending {
Poll::Pending
} else {
continue;
};
}
ReadStatus::Terminate => {
log::trace!("read task is instructed to shutdown");
return Poll::Ready(());
Poll::Ready(Ok(()))
}
}
}
})
}
}