From aa5f6e4b559b3b2a6a9d1c1b4e7ccd704d6af19f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 18 Dec 2021 00:49:27 +0600 Subject: [PATCH] Replace mio with polling for accept loop (#71) * replace mio with poller for accept loop --- ntex-bytes/Cargo.toml | 4 +- ntex-io/src/state.rs | 1 + ntex-macros/Cargo.toml | 2 +- ntex-tls/src/counter.rs | 1 + ntex-tls/src/lib.rs | 3 +- ntex-util/Cargo.toml | 8 +- ntex/CHANGES.md | 2 + ntex/Cargo.toml | 6 +- ntex/examples/hello-world.rs | 2 +- ntex/src/http/h1/dispatcher.rs | 4 +- ntex/src/http/test.rs | 6 +- ntex/src/server/accept.rs | 351 +++++++++++++------------- ntex/src/server/socket.rs | 168 ++++++------ ntex/src/server/worker.rs | 4 +- ntex/src/web/test.rs | 10 +- ntex/tests/http_awc_openssl_client.rs | 9 +- ntex/tests/http_server.rs | 8 +- 17 files changed, 291 insertions(+), 298 deletions(-) diff --git a/ntex-bytes/Cargo.toml b/ntex-bytes/Cargo.toml index ace9477f..59ba37e2 100644 --- a/ntex-bytes/Cargo.toml +++ b/ntex-bytes/Cargo.toml @@ -15,9 +15,9 @@ edition = "2018" bitflags = "1.3" bytes = "1.0.0" serde = "1.0.0" -futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] } +futures-core = { version = "0.3", default-features = false, features = ["alloc"] } [dev-dependencies] serde_test = "1.0" serde_json = "1.0" -ntex = "0.4.10" +ntex = "0.5.0-b.0" diff --git a/ntex-io/src/state.rs b/ntex-io/src/state.rs index 9144a961..f731785f 100644 --- a/ntex-io/src/state.rs +++ b/ntex-io/src/state.rs @@ -894,6 +894,7 @@ impl<'a> ReadRef<'a> { self.0.flags.set(flags); self.0.read_task.wake(); } else if flags.contains(Flags::RD_READY) { + log::trace!("waking up io read task"); flags.remove(Flags::RD_READY); self.0.flags.set(flags); self.0.read_task.wake(); diff --git a/ntex-macros/Cargo.toml b/ntex-macros/Cargo.toml index 0670f86f..0c87c214 100644 --- a/ntex-macros/Cargo.toml +++ b/ntex-macros/Cargo.toml @@ -17,4 +17,4 @@ proc-macro2 = "^1" [dev-dependencies] ntex = "0.5.0-b.0" -futures = "0.3.13" +futures = "0.3" diff --git a/ntex-tls/src/counter.rs b/ntex-tls/src/counter.rs index 7dd4c48d..a82f2ff1 100644 --- a/ntex-tls/src/counter.rs +++ b/ntex-tls/src/counter.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use std::{cell::Cell, rc::Rc, task}; use ntex_util::task::LocalWaker; diff --git a/ntex-tls/src/lib.rs b/ntex-tls/src/lib.rs index 5588f546..135b454a 100644 --- a/ntex-tls/src/lib.rs +++ b/ntex-tls/src/lib.rs @@ -1,7 +1,6 @@ //! An implementations of SSL streams for ntex ecosystem use std::sync::atomic::{AtomicUsize, Ordering}; -mod counter; pub mod types; #[cfg(feature = "openssl")] @@ -10,6 +9,8 @@ pub mod openssl; #[cfg(feature = "rustls")] pub mod rustls; +mod counter; + /// Sets the maximum per-worker concurrent ssl connection establish process. /// /// All listeners will stop accepting connections when this limit is diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index 9b10ecdd..ec251ab7 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -20,12 +20,12 @@ bitflags = "1.2" log = "0.4" slab = "0.4" futures-timer = "3.0.2" -futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] } -futures-sink = { version = "0.3.17", default-features = false, features = ["alloc"] } +futures-core = { version = "0.3", default-features = false, features = ["alloc"] } +futures-sink = { version = "0.3", default-features = false, features = ["alloc"] } pin-project-lite = "0.2.6" [dev-dependencies] -ntex = "0.4.10" +ntex = "0.5.0-b.0" ntex-rt = "0.3.2" ntex-macros = "0.1.3" -futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index a3464bbc..a293cf50 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -6,6 +6,8 @@ * Move ntex::time to ntex-util crate +* Replace mio with poller for accept loop + ## [0.4.13] - 2021-12-07 * server: Rename .apply/.apply_async to .on_worker_start() diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 2ce09fc6..17dabe74 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -57,12 +57,12 @@ base64 = "0.13" bitflags = "1.3" derive_more = "0.99.14" fxhash = "0.2.1" -futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] } -futures-sink = { version = "0.3.17", default-features = false, features = ["alloc"] } +futures-core = { version = "0.3", default-features = false, features = ["alloc"] } +futures-sink = { version = "0.3", default-features = false, features = ["alloc"] } log = "0.4" -mio = "0.7.11" num_cpus = "1.13" nanorand = { version = "0.6.1", default-features = false, features = ["std", "wyrand"] } +polling = "2.2.0" pin-project-lite = "0.2" regex = { version = "1.5.4", default-features = false, features = ["std"] } sha-1 = "0.9" diff --git a/ntex/examples/hello-world.rs b/ntex/examples/hello-world.rs index b8361dac..89f727e2 100644 --- a/ntex/examples/hello-world.rs +++ b/ntex/examples/hello-world.rs @@ -7,7 +7,7 @@ use ntex::{server::Server, time::Seconds, util::Ready}; #[ntex::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "hello_world=info"); + env::set_var("RUST_LOG", "ntex=trace,hello_world=info"); env_logger::init(); Server::build() diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 9a88229a..80763ee3 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -259,6 +259,8 @@ where } } State::ReadRequest => { + log::trace!("trying to read http message"); + // stop dispatcher if this.inner.state.is_dispatcher_stopped() { log::trace!("dispatcher is instructed to stop"); @@ -358,7 +360,7 @@ where } } Ok(None) => { - log::trace!("not enough data to decode next frame, register dispatch task"); + log::trace!("not enough data to decode http message"); // if io error occured or connection is not keep-alive // then disconnect diff --git a/ntex/src/http/test.rs b/ntex/src/http/test.rs index 6dbd1cdb..e70917af 100644 --- a/ntex/src/http/test.rs +++ b/ntex/src/http/test.rs @@ -245,18 +245,18 @@ pub fn server(factory: F) -> TestServer { .set_alpn_protos(b"\x02h2\x08http/1.1") .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); Connector::default() - .timeout(Millis(5_000)) + .timeout(Millis(30_000)) .openssl(builder.build()) .finish() } #[cfg(not(feature = "openssl"))] { - Connector::default().timeout(Millis(5_000)).finish() + Connector::default().timeout(Millis(30_000)).finish() } }; Client::build() - .timeout(Seconds(5)) + .timeout(Seconds(30)) .connector(connector) .finish() }; diff --git a/ntex/src/server/accept.rs b/ntex/src/server/accept.rs index 72bd05e2..4b3c5c3e 100644 --- a/ntex/src/server/accept.rs +++ b/ntex/src/server/accept.rs @@ -1,9 +1,8 @@ use std::{ - io, sync::mpsc as sync_mpsc, sync::Arc, thread, time::Duration, time::Instant, + cell::Cell, io, sync::mpsc, sync::Arc, thread, time::Duration, time::Instant, }; -use log::{error, info}; -use slab::Slab; +use polling::{Event, Poller}; use crate::rt::System; use crate::time::{sleep, Millis}; @@ -12,16 +11,14 @@ use super::socket::{Listener, SocketAddr}; use super::worker::{Connection, WorkerClient}; use super::{Server, ServerStatus, Token}; -const DELTA: usize = 100; -const NOTIFY: mio::Token = mio::Token(0); const ERR_TIMEOUT: Duration = Duration::from_millis(500); const ERR_SLEEP_TIMEOUT: Millis = Millis(525); #[derive(Debug)] pub(super) enum Command { + Stop, Pause, Resume, - Stop, Worker(WorkerClient), Timer, WorkerAvailable, @@ -31,43 +28,41 @@ struct ServerSocketInfo { addr: SocketAddr, token: Token, sock: Listener, - timeout: Option, + registered: Cell, + timeout: Cell>, } #[derive(Debug, Clone)] -pub(super) struct AcceptNotify(Arc, sync_mpsc::Sender); +pub(super) struct AcceptNotify(Arc, mpsc::Sender); impl AcceptNotify { - pub(super) fn new(waker: Arc, tx: sync_mpsc::Sender) -> Self { + pub(super) fn new(waker: Arc, tx: mpsc::Sender) -> Self { AcceptNotify(waker, tx) } pub(super) fn send(&self, cmd: Command) { let _ = self.1.send(cmd); - let _ = self.0.wake(); + let _ = self.0.notify(); } } pub(super) struct AcceptLoop { notify: AcceptNotify, - inner: Option<(sync_mpsc::Receiver, mio::Poll, Server)>, + inner: Option<(mpsc::Receiver, Arc, Server)>, status_handler: Option>, } impl AcceptLoop { pub(super) fn new(srv: Server) -> AcceptLoop { - // Create a poll instance - let poll = mio::Poll::new() - .map_err(|e| panic!("Cannot create mio::Poll {}", e)) - .unwrap(); - - let (tx, rx) = sync_mpsc::channel(); - let waker = Arc::new( - mio::Waker::new(poll.registry(), NOTIFY) - .map_err(|e| panic!("Cannot create mio::Waker {}", e)) + // Create a poller instance + let poll = Arc::new( + Poller::new() + .map_err(|e| panic!("Cannot create Polller {}", e)) .unwrap(), ); - let notify = AcceptNotify::new(waker, tx); + + let (tx, rx) = mpsc::channel(); + let notify = AcceptNotify::new(poll.clone(), tx); AcceptLoop { notify, @@ -115,9 +110,9 @@ impl AcceptLoop { } struct Accept { - poll: mio::Poll, - rx: sync_mpsc::Receiver, - sockets: Slab, + poller: Arc, + rx: mpsc::Receiver, + sockets: Vec, workers: Vec, srv: Server, notify: AcceptNotify, @@ -126,23 +121,10 @@ struct Accept { status_handler: Option>, } -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused - || e.kind() == io::ErrorKind::ConnectionAborted - || e.kind() == io::ErrorKind::ConnectionReset -} - impl Accept { fn start( - rx: sync_mpsc::Receiver, - poll: mio::Poll, + rx: mpsc::Receiver, + poller: Arc, socks: Vec<(Token, Listener)>, srv: Server, workers: Vec, @@ -156,45 +138,33 @@ impl Accept { .name("ntex-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - Accept::new(rx, poll, socks, workers, srv, notify, status_handler).poll() + Accept::new(rx, poller, socks, workers, srv, notify, status_handler) + .poll() }); } fn new( - rx: sync_mpsc::Receiver, - poll: mio::Poll, + rx: mpsc::Receiver, + poller: Arc, socks: Vec<(Token, Listener)>, workers: Vec, srv: Server, notify: AcceptNotify, status_handler: Option>, ) -> Accept { - // Start accept - let mut sockets = Slab::new(); - for (hnd_token, mut lst) in socks.into_iter() { - let addr = lst.local_addr(); - let entry = sockets.vacant_entry(); - let token = entry.key(); - - // Start listening for incoming connections - if let Err(err) = poll.registry().register( - &mut lst, - mio::Token(token + DELTA), - mio::Interest::READABLE, - ) { - panic!("Cannot register io: {}", err); - } - - entry.insert(ServerSocketInfo { - addr, + let mut sockets = Vec::new(); + for (hnd_token, lst) in socks.into_iter() { + sockets.push(ServerSocketInfo { + addr: lst.local_addr(), sock: lst, token: hnd_token, - timeout: None, + registered: Cell::new(false), + timeout: Cell::new(None), }); } Accept { - poll, + poller, rx, sockets, workers, @@ -213,62 +183,105 @@ impl Accept { } fn poll(&mut self) { - trace!("Starting server accept loop"); + log::trace!("Starting server accept loop"); + + // Add all sources + for (idx, info) in self.sockets.iter().enumerate() { + log::info!("Starting socket listener on {}", info.addr); + self.add_source(idx); + } // Create storage for events - let mut events = mio::Events::with_capacity(128); + let mut events = Vec::with_capacity(128); loop { - if let Err(e) = self.poll.poll(&mut events, None) { - match e.kind() { - std::io::ErrorKind::Interrupted => { - continue; - } - _ => { - panic!("Poll error: {}", e); - } + if let Err(e) = self.poller.wait(&mut events, None) { + if e.kind() == io::ErrorKind::Interrupted { + continue; + } else { + panic!("Cannot wait for events in poller: {}", e) } } for event in events.iter() { - let token = event.token(); - match token { - NOTIFY => { - if !self.process_cmd() { - return; - } - } - _ => { - let token = usize::from(token); - if token < DELTA { - continue; - } - self.accept(token - DELTA); - } + let readd = self.accept(event.key); + if readd { + self.add_source(event.key); } } + + if !self.process_cmd() { + break; + } + + events.clear(); + } + + // cleanup + for info in &self.sockets { + info.sock.remove_source() + } + } + + fn add_source(&self, idx: usize) { + let info = &self.sockets[idx]; + + loop { + // try to register poller source + let result = if info.registered.get() { + self.poller.modify(&info.sock, Event::readable(idx)) + } else { + self.poller.add(&info.sock, Event::readable(idx)) + }; + if let Err(err) = result { + if err.kind() == io::ErrorKind::WouldBlock { + continue; + } + log::error!("Cannot register socket listener: {}", err); + + // sleep after error + info.timeout.set(Some(Instant::now() + ERR_TIMEOUT)); + + let notify = self.notify.clone(); + System::current().arbiter().spawn(Box::pin(async move { + sleep(ERR_SLEEP_TIMEOUT).await; + notify.send(Command::Timer); + })); + } else { + info.registered.set(true); + } + + break; + } + } + + fn remove_source(&self, key: usize) { + let info = &self.sockets[key]; + + let result = if info.registered.get() { + self.poller.modify(&info.sock, Event::none(key)) + } else { + return; + }; + + // stop listening for incoming connections + if let Err(err) = result { + log::error!("Cannot stop socket listener for {} err: {}", info.addr, err); } } fn process_timer(&mut self) { let now = Instant::now(); - for (token, info) in self.sockets.iter_mut() { - if let Some(inst) = info.timeout.take() { - if now > inst { - if !self.backpressure { - if let Err(err) = self.poll.registry().register( - &mut info.sock, - mio::Token(token + DELTA), - mio::Interest::READABLE, - ) { - error!("Cannot register server socket {}", err); - } else { - info!("Resume accepting connections on {}", info.addr); - } - } - } else { - info.timeout = Some(inst); - break; + for key in 0..self.sockets.len() { + let info = &mut self.sockets[key]; + if let Some(inst) = info.timeout.get() { + if now > inst && !self.backpressure { + log::info!( + "Resuming socket listener on {} after timeout", + info.addr + ); + info.timeout.take(); + self.add_source(key); } } } @@ -278,44 +291,33 @@ impl Accept { loop { match self.rx.try_recv() { Ok(cmd) => match cmd { - Command::Pause => { - for (_, info) in self.sockets.iter_mut() { - if let Err(err) = - self.poll.registry().deregister(&mut info.sock) - { - error!("Cannot deregister server socket {}", err); - } else { - info!("Paused accepting connections on {}", info.addr); - } - } - self.update_status(ServerStatus::NotReady); - } - Command::Resume => { - for (token, info) in self.sockets.iter_mut() { - if let Err(err) = self.poll.registry().register( - &mut info.sock, - mio::Token(token + DELTA), - mio::Interest::READABLE, - ) { - error!("Cannot resume socket accept process: {}", err); - } else { - info!( - "Accepting connections on {} has been resumed", - info.addr - ); - } - } - self.update_status(ServerStatus::Ready); - } Command::Stop => { - for (_, info) in self.sockets.iter_mut() { - trace!("Stopping socket listener: {}", info.addr); - let _ = self.poll.registry().deregister(&mut info.sock); + log::trace!("Stopping accept loop"); + for (key, info) in self.sockets.iter().enumerate() { + log::info!("Stopping socket listener on {}", info.addr); + self.remove_source(key); } self.update_status(ServerStatus::NotReady); return false; } + Command::Pause => { + log::trace!("Pausing accept loop"); + for (key, info) in self.sockets.iter().enumerate() { + log::info!("Stopping socket listener on {}", info.addr); + self.remove_source(key); + } + self.update_status(ServerStatus::NotReady); + } + Command::Resume => { + log::trace!("Resuming accept loop"); + for (key, info) in self.sockets.iter().enumerate() { + log::info!("Resuming socket listener on {}", info.addr); + self.add_source(key); + } + self.update_status(ServerStatus::Ready); + } Command::Worker(worker) => { + log::trace!("Adding new worker to accept loop"); self.backpressure(false); self.workers.push(worker); } @@ -323,14 +325,16 @@ impl Accept { self.process_timer(); } Command::WorkerAvailable => { + log::trace!("Worker is available"); self.backpressure(false); } }, Err(err) => match err { - sync_mpsc::TryRecvError::Empty => break, - sync_mpsc::TryRecvError::Disconnected => { - for (_, info) in self.sockets.iter_mut() { - let _ = self.poll.registry().deregister(&mut info.sock); + mpsc::TryRecvError::Empty => break, + mpsc::TryRecvError::Disconnected => { + for (key, info) in self.sockets.iter().enumerate() { + log::info!("Stopping socket listener on {}", info.addr); + self.remove_source(key); } return false; } @@ -350,36 +354,32 @@ impl Accept { if self.backpressure { if !on { self.backpressure = false; - for (token, info) in self.sockets.iter_mut() { - if info.timeout.is_some() { - // socket will re-register itself after timeout - continue; - } - if let Err(err) = self.poll.registry().register( - &mut info.sock, - mio::Token(token + DELTA), - mio::Interest::READABLE, - ) { - error!("Cannot resume socket accept process: {}", err); - } else { - info!("Accepting connections on {} has been resumed", info.addr); + for (key, info) in self.sockets.iter().enumerate() { + if info.timeout.get().is_none() { + // socket with timeout will re-register itself after timeout + log::info!( + "Resuming socket listener on {} after back-pressure", + info.addr + ); + self.add_source(key); } } } } else if on { self.backpressure = true; - for (_, info) in self.sockets.iter_mut() { + for key in 0..self.sockets.len() { // disable err timeout + let info = &mut self.sockets[key]; if info.timeout.take().is_none() { - trace!("Enabling backpressure for {}", info.addr); - let _ = self.poll.registry().deregister(&mut info.sock); + log::trace!("Enabling back-pressure for {}", info.addr); + self.remove_source(key); } } } } fn accept_one(&mut self, mut msg: Connection) { - trace!("Accepting connection: {:?}", msg.io); + log::trace!("Accepting connection: {:?}", msg.io); if self.backpressure { while !self.workers.is_empty() { @@ -391,7 +391,7 @@ impl Accept { msg = tmp; self.workers.swap_remove(self.next); if self.workers.is_empty() { - error!("No workers"); + log::error!("No workers"); return; } else if self.workers.len() <= self.next { self.next = 0; @@ -413,13 +413,13 @@ impl Accept { return; } Err(tmp) => { - trace!("Worker failed while processing connection"); + log::trace!("Worker failed while processing connection"); self.update_status(ServerStatus::WorkerFailed); self.srv.worker_faulted(self.workers[self.next].idx); msg = tmp; self.workers.swap_remove(self.next); if self.workers.is_empty() { - error!("No workers"); + log::error!("No workers"); self.backpressure(true); return; } else if self.workers.len() <= self.next { @@ -432,13 +432,13 @@ impl Accept { self.next = (self.next + 1) % self.workers.len(); } // enable backpressure - trace!("No available workers, enable back-pressure"); + log::trace!("No available workers, enable back-pressure"); self.backpressure(true); self.accept_one(msg); } } - fn accept(&mut self, token: usize) { + fn accept(&mut self, token: usize) -> bool { loop { let msg = if let Some(info) = self.sockets.get_mut(token) { match info.sock.accept() { @@ -446,32 +446,41 @@ impl Accept { io, token: info.token, }, - Ok(None) => return, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, + Ok(None) => return true, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true, Err(ref e) if connection_error(e) => continue, Err(e) => { - error!("Error accepting connection: {}", e); - if let Err(err) = self.poll.registry().deregister(&mut info.sock) - { - error!("Cannot deregister server socket {}", err); - } + log::error!("Error accepting socket: {}", e); // sleep after error - info.timeout = Some(Instant::now() + ERR_TIMEOUT); + info.timeout.set(Some(Instant::now() + ERR_TIMEOUT)); let notify = self.notify.clone(); System::current().arbiter().spawn(Box::pin(async move { sleep(ERR_SLEEP_TIMEOUT).await; notify.send(Command::Timer); })); - return; + return false; } } } else { - return; + return false; }; self.accept_one(msg); } } } + +/// This function defines errors that are per-connection. Which basically +/// means that if we get this error from `accept()` system call it means +/// next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` is performed. +/// The timeout is useful to handle resource exhaustion errors like ENFILE +/// and EMFILE. Otherwise, could enter into tight loop. +fn connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused + || e.kind() == io::ErrorKind::ConnectionAborted + || e.kind() == io::ErrorKind::ConnectionReset +} diff --git a/ntex/src/server/socket.rs b/ntex/src/server/socket.rs index 32d68f89..e34250f1 100644 --- a/ntex/src/server/socket.rs +++ b/ntex/src/server/socket.rs @@ -1,38 +1,11 @@ use std::{convert::TryFrom, fmt, io, net}; -use crate::io::Io; -use crate::rt::net::TcpStream; +use crate::{io::Io, rt::net::TcpStream}; pub(crate) enum Listener { - Tcp(mio::net::TcpListener), + Tcp(net::TcpListener), #[cfg(unix)] - Uds(mio::net::UnixListener), -} - -pub(crate) enum SocketAddr { - Tcp(net::SocketAddr), - #[cfg(unix)] - Uds(mio::net::SocketAddr), -} - -impl fmt::Display for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), - #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), - } - } -} - -impl fmt::Debug for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), - #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), - } - } + Uds(std::os::unix::net::UnixListener), } impl fmt::Debug for Listener { @@ -57,16 +30,42 @@ impl fmt::Display for Listener { } } +pub(crate) enum SocketAddr { + Tcp(net::SocketAddr), + #[cfg(unix)] + Uds(std::os::unix::net::SocketAddr), +} + +impl fmt::Display for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), + #[cfg(unix)] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), + #[cfg(unix)] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + impl Listener { pub(super) fn from_tcp(lst: net::TcpListener) -> Self { let _ = lst.set_nonblocking(true); - Listener::Tcp(mio::net::TcpListener::from_std(lst)) + Listener::Tcp(lst) } #[cfg(unix)] pub(super) fn from_uds(lst: std::os::unix::net::UnixListener) -> Self { let _ = lst.set_nonblocking(true); - Listener::Uds(mio::net::UnixListener::from_std(lst)) + Listener::Uds(lst) } pub(crate) fn local_addr(&self) -> SocketAddr { @@ -88,52 +87,47 @@ impl Listener { } } } -} -impl mio::event::Source for Listener { - #[inline] - fn register( - &mut self, - poll: &mio::Registry, - token: mio::Token, - interest: mio::Interest, - ) -> io::Result<()> { + pub(crate) fn remove_source(&self) { match *self { - Listener::Tcp(ref mut lst) => lst.register(poll, token, interest), + Listener::Tcp(_) => (), #[cfg(unix)] - Listener::Uds(ref mut lst) => lst.register(poll, token, interest), - } - } - - #[inline] - fn reregister( - &mut self, - poll: &mio::Registry, - token: mio::Token, - interest: mio::Interest, - ) -> io::Result<()> { - match *self { - Listener::Tcp(ref mut lst) => lst.reregister(poll, token, interest), - #[cfg(unix)] - Listener::Uds(ref mut lst) => lst.reregister(poll, token, interest), - } - } - - #[inline] - fn deregister(&mut self, poll: &mio::Registry) -> io::Result<()> { - match *self { - Listener::Tcp(ref mut lst) => lst.deregister(poll), - #[cfg(unix)] - Listener::Uds(ref mut lst) => { - let res = lst.deregister(poll); - + Listener::Uds(ref lst) => { // cleanup file path if let Ok(addr) = lst.local_addr() { if let Some(path) = addr.as_pathname() { let _ = std::fs::remove_file(path); } } - res + } + } + } +} + +#[cfg(unix)] +mod listener_impl { + use super::*; + use std::os::unix::io::{AsRawFd, RawFd}; + + impl AsRawFd for Listener { + fn as_raw_fd(&self) -> RawFd { + match *self { + Listener::Tcp(ref lst) => lst.as_raw_fd(), + Listener::Uds(ref lst) => lst.as_raw_fd(), + } + } + } +} + +#[cfg(windows)] +mod listener_impl { + use super::*; + use std::os::windows::io::{AsRawSocket, RawSocket}; + + impl AsRawSocket for Listener { + fn as_raw_socket(&self) -> RawSocket { + match *self { + Listener::Tcp(ref lst) => lst.as_raw_socket(), } } } @@ -141,42 +135,26 @@ impl mio::event::Source for Listener { #[derive(Debug)] pub enum Stream { - Tcp(mio::net::TcpStream), + Tcp(net::TcpStream), #[cfg(unix)] - Uds(mio::net::UnixStream), + Uds(std::os::unix::net::UnixStream), } impl TryFrom for Io { type Error = io::Error; fn try_from(sock: Stream) -> Result { - #[cfg(unix)] match sock { Stream::Tcp(stream) => { - use std::os::unix::io::{FromRawFd, IntoRawFd}; - let fd = IntoRawFd::into_raw_fd(stream); - let io = TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(fd) })?; - io.set_nodelay(true)?; - Ok(Io::new(io)) + stream.set_nonblocking(true)?; + stream.set_nodelay(true)?; + Ok(Io::new(TcpStream::from_std(stream)?)) } + #[cfg(unix)] Stream::Uds(stream) => { use crate::rt::net::UnixStream; - use std::os::unix::io::{FromRawFd, IntoRawFd}; - let fd = IntoRawFd::into_raw_fd(stream); - let io = UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(fd) })?; - Ok(Io::new(io)) - } - } - - #[cfg(windows)] - match sock { - Stream::Tcp(stream) => { - use std::os::windows::io::{FromRawSocket, IntoRawSocket}; - let fd = IntoRawSocket::into_raw_socket(stream); - let io = - TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(fd) })?; - io.set_nodelay(true)?; - Ok(Io::new(io)) + stream.set_nonblocking(true)?; + Ok(Io::new(UnixStream::from_std(stream)?)) } } } @@ -198,8 +176,7 @@ mod tests { let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap(); socket.set_reuse_address(true).unwrap(); socket.bind(&SockAddr::from(addr)).unwrap(); - let tcp = net::TcpListener::from(socket); - let lst = Listener::Tcp(mio::net::TcpListener::from_std(tcp)); + let lst = Listener::Tcp(net::TcpListener::from(socket)); assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{}", lst).contains("127.0.0.1")); } @@ -211,7 +188,6 @@ mod tests { let _ = std::fs::remove_file("/tmp/sock.xxxxx"); if let Ok(lst) = UnixListener::bind("/tmp/sock.xxxxx") { - let lst = mio::net::UnixListener::from_std(lst); let addr = lst.local_addr().expect("Couldn't get local address"); let a = SocketAddr::Uds(addr); assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx")); diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index ded8293e..f5132e30 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -573,8 +573,8 @@ mod tests { let (_tx1, rx1) = unbounded(); let (tx2, rx2) = unbounded(); let (sync_tx, _sync_rx) = std::sync::mpsc::channel(); - let poll = mio::Poll::new().unwrap(); - let waker = Arc::new(mio::Waker::new(poll.registry(), mio::Token(1)).unwrap()); + let poll = Arc::new(polling::Poller::new().unwrap()); + let waker = poll.clone(); let avail = WorkerAvailability::new(AcceptNotify::new(waker.clone(), sync_tx.clone())); diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 0689bcea..7e4b9850 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -724,9 +724,9 @@ where .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); Connector::default() .lifetime(Seconds::ZERO) - .keep_alive(Seconds(10)) - .timeout(Millis(10_000)) - .disconnect_timeout(Millis(3_000)) + .keep_alive(Seconds(30)) + .timeout(Millis(30_000)) + .disconnect_timeout(Millis(5_000)) .openssl(builder.build()) .finish() } @@ -734,14 +734,14 @@ where { Connector::default() .lifetime(Seconds::ZERO) - .timeout(Millis(10_000)) + .timeout(Millis(30_000)) .finish() } }; Client::build() .connector(connector) - .timeout(Seconds(10)) + .timeout(Seconds(30)) .finish() }; diff --git a/ntex/tests/http_awc_openssl_client.rs b/ntex/tests/http_awc_openssl_client.rs index c277226d..3ebddb9c 100644 --- a/ntex/tests/http_awc_openssl_client.rs +++ b/ntex/tests/http_awc_openssl_client.rs @@ -68,11 +68,16 @@ async fn test_connection_reuse_h2() { .map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e)); let client = Client::build() - .connector(Connector::default().openssl(builder.build()).finish()) + .connector( + Connector::default() + .timeout(Seconds(30)) + .openssl(builder.build()) + .finish(), + ) .finish(); // req 1 - let request = client.get(srv.surl("/")).timeout(Seconds(10)).send(); + let request = client.get(srv.surl("/")).timeout(Seconds(30)).send(); let response = request.await.unwrap(); assert!(response.status().is_success()); diff --git a/ntex/tests/http_server.rs b/ntex/tests/http_server.rs index 4f79c1ac..a51a7e40 100644 --- a/ntex/tests/http_server.rs +++ b/ntex/tests/http_server.rs @@ -4,6 +4,7 @@ use futures::future::{self, ready, FutureExt}; use futures::stream::{once, StreamExt}; use regex::Regex; +use ntex::http::header::{HeaderName, HeaderValue}; use ntex::http::test::server as test_server; use ntex::http::{ body, header, HttpService, KeepAlive, Method, Request, Response, StatusCode, @@ -115,7 +116,7 @@ async fn test_chunked_payload() { let returned_size = { let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let _ = stream - .write_all(b"POST /test HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n"); + .write_all(b"POST /test HTTP/1.0\r\nTransfer-Encoding: chunked\r\n\r\n"); for chunk_size in chunk_sizes.iter() { let mut bytes = Vec::new(); @@ -289,11 +290,6 @@ async fn test_http1_keepalive_disabled() { #[ntex::test] async fn test_content_length() { - use ntex::http::{ - header::{HeaderName, HeaderValue}, - StatusCode, - }; - let srv = test_server(|| { HttpService::build().h1(|req: Request| { let indx: usize = req.uri().path()[1..].parse().unwrap();