Stop timer before handling UPGRADE h1 requests (#270)

This commit is contained in:
Nikolay Kim 2023-12-15 02:17:59 +06:00 committed by GitHub
parent fb1d2a268d
commit a4f9802d6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 18 additions and 4 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.7.16] - 2023-12-15
* Stop timer before handling UPGRADE h1 requests
## [0.7.15] - 2023-12-14 ## [0.7.15] - 2023-12-14
* Better io tags handling * Better io tags handling

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.7.15" version = "0.7.16"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"

View file

@ -383,8 +383,9 @@ where
} }
// stop io tasks and call upgrade service // stop io tasks and call upgrade service
State::Upgrade(ref mut req) => { State::Upgrade(ref mut req) => {
let io = this.inner.io.take();
let req = req.take().unwrap(); let req = req.take().unwrap();
let io = this.inner.io.take();
io.stop_timer();
log::trace!( log::trace!(
"{}: Switching to upgrade service for {:?}", "{}: Switching to upgrade service for {:?}",
@ -485,6 +486,7 @@ where
fn service_upgrade(&mut self, mut req: Request) -> CallState<S, X> { fn service_upgrade(&mut self, mut req: Request) -> CallState<S, X> {
// Move io into request // Move io into request
let io: IoBoxed = self.io.take().into(); let io: IoBoxed = self.io.take().into();
self.io.stop_timer();
req.head_mut().io = CurrentIo::Io(Rc::new(( req.head_mut().io = CurrentIo::Io(Rc::new((
io.get_ref(), io.get_ref(),
RefCell::new(Some(Box::new((io, self.codec.clone())))), RefCell::new(Some(Box::new((io, self.codec.clone())))),
@ -551,7 +553,7 @@ where
if upgrade { if upgrade {
// Handle UPGRADE request // Handle UPGRADE request
log::trace!("{}: Prepare io for upgrade handler", self.io.tag(),); log::trace!("{}: Prepare io for upgrade handler", self.io.tag());
Poll::Ready(State::Upgrade(Some(req))) Poll::Ready(State::Upgrade(Some(req)))
} else { } else {
if req.upgrade() { if req.upgrade() {

View file

@ -5,6 +5,7 @@ use ntex::http::test::server as test_server;
use ntex::http::{body, h1, test, HttpService, Request, Response, StatusCode}; use ntex::http::{body, h1, test, HttpService, Request, Response, StatusCode};
use ntex::io::{DispatchItem, Dispatcher, Io}; use ntex::io::{DispatchItem, Dispatcher, Io};
use ntex::service::{fn_factory, Service, ServiceCtx}; use ntex::service::{fn_factory, Service, ServiceCtx};
use ntex::time::{sleep, Millis, Seconds};
use ntex::util::{BoxFuture, ByteString, Bytes, Ready}; use ntex::util::{BoxFuture, ByteString, Bytes, Ready};
use ntex::ws::{self, handshake, handshake_response}; use ntex::ws::{self, handshake, handshake_response};
@ -51,11 +52,15 @@ impl Service<(Request, Io, h1::Codec)> for WsService {
io.encode((res, body::BodySize::None).into(), &codec) io.encode((res, body::BodySize::None).into(), &codec)
.unwrap(); .unwrap();
let cfg = ntex_io::DispatcherConfig::default();
cfg.set_keepalive_timeout(Seconds(0));
Dispatcher::with_config( Dispatcher::with_config(
io.seal(), io.seal(),
ws::Codec::new(), ws::Codec::new(),
service, service,
&Default::default(), //&Default::default(),
&cfg,
) )
.await .await
.map_err(|_| panic!()) .map_err(|_| panic!())
@ -90,6 +95,9 @@ async fn test_simple() {
move || { move || {
let ws_service = ws_service.clone(); let ws_service = ws_service.clone();
HttpService::build() HttpService::build()
.keep_alive(1)
.headers_read_rate(Seconds(1), Seconds::ZERO, 16)
.payload_read_rate(Seconds(1), Seconds::ZERO, 16)
.upgrade(fn_factory(move || { .upgrade(fn_factory(move || {
Ready::Ok::<_, io::Error>(ws_service.clone()) Ready::Ok::<_, io::Error>(ws_service.clone())
})) }))