Add Io::take() helper method

This commit is contained in:
Nikolay Kim 2022-01-16 12:06:06 +06:00
parent a563aca46b
commit 429073f9ff
6 changed files with 57 additions and 25 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.1.4] - 2022-01-xx
* Add Io::take() method
## [0.1.3] - 2022-01-12 ## [0.1.3] - 2022-01-12
* Refactor Filter trait, fix read buffer processing * Refactor Filter trait, fix read buffer processing

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-io" name = "ntex-io"
version = "0.1.3" version = "0.1.4"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -296,6 +296,37 @@ impl<F> Io<F> {
pub fn set_disconnect_timeout(&self, timeout: Millis) { pub fn set_disconnect_timeout(&self, timeout: Millis) {
self.0 .0.disconnect_timeout.set(timeout); self.0 .0.disconnect_timeout.set(timeout);
} }
#[inline]
/// Clone current io object.
///
/// Current io object becomes closed.
pub fn take(&mut self) -> Self {
let inner = Rc::new(IoState {
pool: self.0 .0.pool.clone(),
flags: Cell::new(
Flags::DSP_STOP
| Flags::IO_STOPPED
| Flags::IO_STOPPING
| Flags::IO_STOPPING_FILTERS,
),
error: Cell::new(None),
disconnect_timeout: Cell::new(Millis::ONE_SEC),
dispatch_task: LocalWaker::new(),
read_task: LocalWaker::new(),
write_task: LocalWaker::new(),
read_buf: Cell::new(None),
write_buf: Cell::new(None),
filter: Cell::new(NullFilter::get()),
handle: Cell::new(None),
on_disconnect: RefCell::new(Vec::new()),
keepalive: Cell::new(None),
});
let state = mem::replace(&mut self.0, IoRef(inner));
let filter = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut()));
Self(state, filter)
}
} }
impl<F> Io<F> { impl<F> Io<F> {

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.5.10] - 2022-01-xx
* http: Use Io::take() method for http/1 dispatcher
## [0.5.9] - 2022-01-12 ## [0.5.9] - 2022-01-12
* Update ws::WsTransport * Update ws::WsTransport

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.5.9" version = "0.5.10"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -53,7 +53,7 @@ ntex-util = "0.1.9"
ntex-bytes = "0.1.9" ntex-bytes = "0.1.9"
ntex-tls = "0.1.2" ntex-tls = "0.1.2"
ntex-rt = "0.4.1" ntex-rt = "0.4.1"
ntex-io = "0.1.3" ntex-io = "0.1.4"
ntex-tokio = "0.1.2" ntex-tokio = "0.1.2"
ntex-async-std = { version = "0.1.0", optional = true } ntex-async-std = { version = "0.1.0", optional = true }

View file

@ -66,7 +66,7 @@ pin_project_lite::pin_project! {
} }
struct DispatcherInner<F, S, B, X, U> { struct DispatcherInner<F, S, B, X, U> {
io: Option<Io<F>>, io: Io<F>,
flags: Flags, flags: Flags,
codec: Codec, codec: Codec,
state: IoRef, state: IoRef,
@ -100,10 +100,10 @@ where
call: CallState::None, call: CallState::None,
st: State::ReadRequest, st: State::ReadRequest,
inner: DispatcherInner { inner: DispatcherInner {
io,
codec, codec,
state, state,
config, config,
io: Some(io),
flags: Flags::KEEPALIVE_REG, flags: Flags::KEEPALIVE_REG,
error: None, error: None,
payload: None, payload: None,
@ -231,7 +231,7 @@ where
log::trace!("trying to read http message"); log::trace!("trying to read http message");
// decode incoming bytes stream // decode incoming bytes stream
match this.inner.io().poll_recv(&this.inner.codec, cx) { match this.inner.io.poll_recv(&this.inner.codec, cx) {
Poll::Ready(Ok((mut req, pl))) => { Poll::Ready(Ok((mut req, pl))) => {
log::trace!( log::trace!(
"http message is received: {:?} and payload {:?}", "http message is received: {:?} and payload {:?}",
@ -265,7 +265,7 @@ where
// slow-request first request // slow-request first request
this.inner.flags.insert(Flags::STARTED); this.inner.flags.insert(Flags::STARTED);
this.inner.flags.remove(Flags::KEEPALIVE_REG); this.inner.flags.remove(Flags::KEEPALIVE_REG);
this.inner.io().remove_keepalive_timer(); this.inner.io.remove_keepalive_timer();
if upgrade { if upgrade {
// Handle UPGRADE request // Handle UPGRADE request
@ -294,8 +294,7 @@ where
} }
} }
Poll::Ready(Err(RecvError::WriteBackpressure)) => { Poll::Ready(Err(RecvError::WriteBackpressure)) => {
if let Err(err) = ready!(this.inner.io().poll_flush(cx, false)) if let Err(err) = ready!(this.inner.io.poll_flush(cx, false)) {
{
log::trace!("peer is gone with {:?}", err); log::trace!("peer is gone with {:?}", err);
*this.st = State::Stop; *this.st = State::Stop;
this.inner.error = Some(DispatchError::PeerGone(Some(err))); this.inner.error = Some(DispatchError::PeerGone(Some(err)));
@ -337,7 +336,7 @@ where
{ {
this.inner.flags.insert(Flags::KEEPALIVE_REG); this.inner.flags.insert(Flags::KEEPALIVE_REG);
this.inner this.inner
.io() .io
.start_keepalive_timer(this.inner.config.keep_alive); .start_keepalive_timer(this.inner.config.keep_alive);
} }
return Poll::Pending; return Poll::Pending;
@ -355,7 +354,7 @@ where
} }
// send response body // send response body
State::SendPayload { ref mut body } => { State::SendPayload { ref mut body } => {
if this.inner.io().is_closed() { if this.inner.io.is_closed() {
*this.st = State::Stop; *this.st = State::Stop;
} else { } else {
if let Poll::Ready(Err(err)) = this.inner.poll_request_payload(cx) { if let Poll::Ready(Err(err)) = this.inner.poll_request_payload(cx) {
@ -363,7 +362,7 @@ where
this.inner.flags.insert(Flags::SENDPAYLOAD_AND_STOP); this.inner.flags.insert(Flags::SENDPAYLOAD_AND_STOP);
} }
loop { loop {
let _ = ready!(this.inner.io().poll_flush(cx, false)); let _ = ready!(this.inner.io.poll_flush(cx, false));
let item = ready!(body.poll_next_chunk(cx)); let item = ready!(body.poll_next_chunk(cx));
if let Some(st) = this.inner.send_payload(item) { if let Some(st) = this.inner.send_payload(item) {
*this.st = st; *this.st = st;
@ -376,7 +375,7 @@ where
State::Upgrade(ref mut req) => { State::Upgrade(ref mut req) => {
log::trace!("switching to upgrade service"); log::trace!("switching to upgrade service");
let io = this.inner.io.take().unwrap(); let io = this.inner.io.take();
let req = req.take().unwrap(); let req = req.take().unwrap();
// Handle UPGRADE request // Handle UPGRADE request
@ -391,9 +390,7 @@ where
State::Stop => { State::Stop => {
this.inner.unregister_keepalive(); this.inner.unregister_keepalive();
return if let Err(e) = return if let Err(e) = ready!(this.inner.io.poll_shutdown(cx)) {
ready!(this.inner.io.as_ref().unwrap().poll_shutdown(cx))
{
// get io error // get io error
if let Some(e) = this.inner.error.take() { if let Some(e) = this.inner.error.take() {
Poll::Ready(Err(e)) Poll::Ready(Err(e))
@ -416,10 +413,6 @@ where
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
{ {
fn io(&self) -> &Io<T> {
self.io.as_ref().unwrap()
}
fn switch_to_read_request(&mut self) -> State<B> { fn switch_to_read_request(&mut self) -> State<B> {
// connection is not keep-alive, disconnect // connection is not keep-alive, disconnect
if !self.flags.contains(Flags::KEEPALIVE) || !self.codec.keepalive_enabled() { if !self.flags.contains(Flags::KEEPALIVE) || !self.codec.keepalive_enabled() {
@ -432,7 +425,7 @@ where
fn unregister_keepalive(&mut self) { fn unregister_keepalive(&mut self) {
if self.flags.contains(Flags::KEEPALIVE) { if self.flags.contains(Flags::KEEPALIVE) {
self.io().remove_keepalive_timer(); self.io.remove_keepalive_timer();
self.flags.remove(Flags::KEEPALIVE); self.flags.remove(Flags::KEEPALIVE);
} }
} }
@ -468,7 +461,7 @@ where
State::Stop State::Stop
} else { } else {
let result = self let result = self
.io() .io
.encode(Message::Item((msg, body.size())), &self.codec) .encode(Message::Item((msg, body.size())), &self.codec)
.map_err(|err| { .map_err(|err| {
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = self.payload.take() {
@ -505,7 +498,7 @@ where
match item { match item {
Some(Ok(item)) => { Some(Ok(item)) => {
trace!("got response chunk: {:?}", item.len()); trace!("got response chunk: {:?}", item.len());
match self.io().encode(Message::Chunk(Some(item)), &self.codec) { match self.io.encode(Message::Chunk(Some(item)), &self.codec) {
Ok(_) => None, Ok(_) => None,
Err(err) => { Err(err) => {
self.error = Some(DispatchError::Encode(err)); self.error = Some(DispatchError::Encode(err));
@ -515,7 +508,7 @@ where
} }
None => { None => {
trace!("response payload eof"); trace!("response payload eof");
if let Err(err) = self.io().encode(Message::Chunk(None), &self.codec) { if let Err(err) = self.io.encode(Message::Chunk(None), &self.codec) {
self.error = Some(DispatchError::Encode(err)); self.error = Some(DispatchError::Encode(err));
Some(State::Stop) Some(State::Stop)
} else if self.flags.contains(Flags::SENDPAYLOAD_AND_STOP) { } else if self.flags.contains(Flags::SENDPAYLOAD_AND_STOP) {
@ -547,7 +540,7 @@ where
}; };
match payload.1.poll_data_required(cx) { match payload.1.poll_data_required(cx) {
PayloadStatus::Read => { PayloadStatus::Read => {
let io = self.io.as_ref().unwrap(); let io = &self.io;
// read request payload // read request payload
let mut updated = false; let mut updated = false;