diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index a85fd21c..bdcc1500 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.4] - 2022-01-xx + +* Add Io::take() method + ## [0.1.3] - 2022-01-12 * Refactor Filter trait, fix read buffer processing diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 517378a1..cc1f1535 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.1.3" +version = "0.1.4" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index f63dac1a..50134fdd 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -296,6 +296,37 @@ impl Io { pub fn set_disconnect_timeout(&self, timeout: Millis) { self.0 .0.disconnect_timeout.set(timeout); } + + #[inline] + /// Clone current io object. + /// + /// Current io object becomes closed. + pub fn take(&mut self) -> Self { + let inner = Rc::new(IoState { + pool: self.0 .0.pool.clone(), + flags: Cell::new( + Flags::DSP_STOP + | Flags::IO_STOPPED + | Flags::IO_STOPPING + | Flags::IO_STOPPING_FILTERS, + ), + error: Cell::new(None), + disconnect_timeout: Cell::new(Millis::ONE_SEC), + dispatch_task: LocalWaker::new(), + read_task: LocalWaker::new(), + write_task: LocalWaker::new(), + read_buf: Cell::new(None), + write_buf: Cell::new(None), + filter: Cell::new(NullFilter::get()), + handle: Cell::new(None), + on_disconnect: RefCell::new(Vec::new()), + keepalive: Cell::new(None), + }); + + let state = mem::replace(&mut self.0, IoRef(inner)); + let filter = mem::replace(&mut self.1, FilterItem::Ptr(ptr::null_mut())); + Self(state, filter) + } } impl Io { diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index f2236880..8b52c599 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.5.10] - 2022-01-xx + +* http: Use Io::take() method for http/1 dispatcher + ## [0.5.9] - 2022-01-12 * Update ws::WsTransport diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 805f72ee..6cbb437a 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.5.9" +version = "0.5.10" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -53,7 +53,7 @@ ntex-util = "0.1.9" ntex-bytes = "0.1.9" ntex-tls = "0.1.2" ntex-rt = "0.4.1" -ntex-io = "0.1.3" +ntex-io = "0.1.4" ntex-tokio = "0.1.2" ntex-async-std = { version = "0.1.0", optional = true } diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 1414ea2d..7ac50b85 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -66,7 +66,7 @@ pin_project_lite::pin_project! { } struct DispatcherInner { - io: Option>, + io: Io, flags: Flags, codec: Codec, state: IoRef, @@ -100,10 +100,10 @@ where call: CallState::None, st: State::ReadRequest, inner: DispatcherInner { + io, codec, state, config, - io: Some(io), flags: Flags::KEEPALIVE_REG, error: None, payload: None, @@ -231,7 +231,7 @@ where log::trace!("trying to read http message"); // decode incoming bytes stream - match this.inner.io().poll_recv(&this.inner.codec, cx) { + match this.inner.io.poll_recv(&this.inner.codec, cx) { Poll::Ready(Ok((mut req, pl))) => { log::trace!( "http message is received: {:?} and payload {:?}", @@ -265,7 +265,7 @@ where // slow-request first request this.inner.flags.insert(Flags::STARTED); this.inner.flags.remove(Flags::KEEPALIVE_REG); - this.inner.io().remove_keepalive_timer(); + this.inner.io.remove_keepalive_timer(); if upgrade { // Handle UPGRADE request @@ -294,8 +294,7 @@ where } } Poll::Ready(Err(RecvError::WriteBackpressure)) => { - if let Err(err) = ready!(this.inner.io().poll_flush(cx, false)) - { + if let Err(err) = ready!(this.inner.io.poll_flush(cx, false)) { log::trace!("peer is gone with {:?}", err); *this.st = State::Stop; this.inner.error = Some(DispatchError::PeerGone(Some(err))); @@ -337,7 +336,7 @@ where { this.inner.flags.insert(Flags::KEEPALIVE_REG); this.inner - .io() + .io .start_keepalive_timer(this.inner.config.keep_alive); } return Poll::Pending; @@ -355,7 +354,7 @@ where } // send response body State::SendPayload { ref mut body } => { - if this.inner.io().is_closed() { + if this.inner.io.is_closed() { *this.st = State::Stop; } else { if let Poll::Ready(Err(err)) = this.inner.poll_request_payload(cx) { @@ -363,7 +362,7 @@ where this.inner.flags.insert(Flags::SENDPAYLOAD_AND_STOP); } loop { - let _ = ready!(this.inner.io().poll_flush(cx, false)); + let _ = ready!(this.inner.io.poll_flush(cx, false)); let item = ready!(body.poll_next_chunk(cx)); if let Some(st) = this.inner.send_payload(item) { *this.st = st; @@ -376,7 +375,7 @@ where State::Upgrade(ref mut req) => { log::trace!("switching to upgrade service"); - let io = this.inner.io.take().unwrap(); + let io = this.inner.io.take(); let req = req.take().unwrap(); // Handle UPGRADE request @@ -391,9 +390,7 @@ where State::Stop => { this.inner.unregister_keepalive(); - return if let Err(e) = - ready!(this.inner.io.as_ref().unwrap().poll_shutdown(cx)) - { + return if let Err(e) = ready!(this.inner.io.poll_shutdown(cx)) { // get io error if let Some(e) = this.inner.error.take() { Poll::Ready(Err(e)) @@ -416,10 +413,6 @@ where S::Response: Into>, B: MessageBody, { - fn io(&self) -> &Io { - self.io.as_ref().unwrap() - } - fn switch_to_read_request(&mut self) -> State { // connection is not keep-alive, disconnect if !self.flags.contains(Flags::KEEPALIVE) || !self.codec.keepalive_enabled() { @@ -432,7 +425,7 @@ where fn unregister_keepalive(&mut self) { if self.flags.contains(Flags::KEEPALIVE) { - self.io().remove_keepalive_timer(); + self.io.remove_keepalive_timer(); self.flags.remove(Flags::KEEPALIVE); } } @@ -468,7 +461,7 @@ where State::Stop } else { let result = self - .io() + .io .encode(Message::Item((msg, body.size())), &self.codec) .map_err(|err| { if let Some(mut payload) = self.payload.take() { @@ -505,7 +498,7 @@ where match item { Some(Ok(item)) => { trace!("got response chunk: {:?}", item.len()); - match self.io().encode(Message::Chunk(Some(item)), &self.codec) { + match self.io.encode(Message::Chunk(Some(item)), &self.codec) { Ok(_) => None, Err(err) => { self.error = Some(DispatchError::Encode(err)); @@ -515,7 +508,7 @@ where } None => { trace!("response payload eof"); - if let Err(err) = self.io().encode(Message::Chunk(None), &self.codec) { + if let Err(err) = self.io.encode(Message::Chunk(None), &self.codec) { self.error = Some(DispatchError::Encode(err)); Some(State::Stop) } else if self.flags.contains(Flags::SENDPAYLOAD_AND_STOP) { @@ -547,7 +540,7 @@ where }; match payload.1.poll_data_required(cx) { PayloadStatus::Read => { - let io = self.io.as_ref().unwrap(); + let io = &self.io; // read request payload let mut updated = false;