cleanup framed write task

This commit is contained in:
Nikolay Kim 2021-02-20 10:40:14 +06:00
parent 40b0d5e4ab
commit a1296fc059
5 changed files with 33 additions and 27 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.2.0-b.13] - 2021-02-20
* http: Refactor date service
## [0.2.0-b.12] - 2021-02-18
* http: Fix KeepAlive::Os support for h1 dispatcher

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.2.0-b.12"
version = "0.2.0-b.13"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -47,15 +47,9 @@ where
Poll::Pending
} else {
let mut io = self.io.borrow_mut();
let result = self.state.with_read_buf(|buf| read(&mut *io, buf, cx));
match result {
Ok(None) => {
self.state.enable_read_backpressure();
self.state.update_read_task(true, cx.waker());
Poll::Pending
}
Ok(Some(updated)) => {
self.state.update_read_task(updated, cx.waker());
match self.state.with_read_buf(|buf| read(&mut *io, buf, cx)) {
Ok(res) => {
self.state.update_read_task(res, cx.waker());
Poll::Pending
}
Err(err) => {
@ -67,11 +61,18 @@ where
}
}
#[derive(Copy, Clone)]
pub(super) enum ReadResult {
Pending,
Updated,
BackPressure,
}
pub(super) fn read<T>(
io: &mut T,
buf: &mut BytesMut,
cx: &mut Context<'_>,
) -> Result<Option<bool>, Option<io::Error>>
) -> Result<ReadResult, Option<io::Error>>
where
T: AsyncRead + AsyncWrite + Unpin,
{
@ -82,7 +83,7 @@ where
}
// read all data from socket
let mut updated = false;
let mut result = ReadResult::Pending;
loop {
match Pin::new(&mut *io).poll_read_buf(cx, buf) {
Poll::Pending => break,
@ -93,10 +94,10 @@ where
} else {
if buf.len() > HW {
log::trace!("buffer is too large {}, pause", buf.len());
return Ok(None);
return Ok(ReadResult::BackPressure);
}
updated = true;
result = ReadResult::Updated;
}
}
Poll::Ready(Err(err)) => {
@ -106,5 +107,5 @@ where
}
}
Ok(Some(updated))
Ok(result)
}

View file

@ -7,6 +7,7 @@ use either::Either;
use futures::{future::poll_fn, ready};
use crate::codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
use crate::framed::read::ReadResult;
use crate::framed::write::flush;
use crate::task::LocalWaker;
@ -240,13 +241,6 @@ impl State {
self.insert_flags(Flags::WR_NOT_READY)
}
#[inline]
/// Enable read back-persurre
pub(crate) fn enable_read_backpressure(&self) {
log::trace!("enable read back-pressure");
self.insert_flags(Flags::RD_BUF_FULL)
}
#[inline]
/// Check if keep-alive timeout occured
pub fn is_keepalive(&self) -> bool {
@ -325,11 +319,19 @@ impl State {
self.0.write_task.register(waker);
}
pub(super) fn update_read_task(&self, updated: bool, waker: &Waker) {
if updated {
pub(super) fn update_read_task(&self, result: ReadResult, waker: &Waker) {
match result {
ReadResult::Updated => {
self.insert_flags(Flags::RD_READY);
self.0.dispatch_task.wake();
}
ReadResult::BackPressure => {
log::trace!("enable read back-pressure");
self.insert_flags(Flags::RD_READY | Flags::RD_BUF_FULL);
self.0.dispatch_task.wake();
}
ReadResult::Pending => {}
}
self.0.read_task.register(waker);
}

View file

@ -150,7 +150,6 @@ impl<S, X, U> DispatcherConfig<S, X, U> {
}
}
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH_HDR: usize = 39;
const DATE_VALUE_DEFAULT: [u8; DATE_VALUE_LENGTH_HDR] = [
b'd', b'a', b't', b'e', b':', b' ', b'0', b'0', b'0', b'0', b'0', b'0', b'0', b'0',