Replace mio with polling for accept loop (#71)

* replace mio with poller for accept loop
This commit is contained in:
Nikolay Kim 2021-12-18 00:49:27 +06:00 committed by GitHub
parent 7825e0a15f
commit aa5f6e4b55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 291 additions and 298 deletions

View file

@ -15,9 +15,9 @@ edition = "2018"
bitflags = "1.3"
bytes = "1.0.0"
serde = "1.0.0"
futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
[dev-dependencies]
serde_test = "1.0"
serde_json = "1.0"
ntex = "0.4.10"
ntex = "0.5.0-b.0"

View file

@ -894,6 +894,7 @@ impl<'a> ReadRef<'a> {
self.0.flags.set(flags);
self.0.read_task.wake();
} else if flags.contains(Flags::RD_READY) {
log::trace!("waking up io read task");
flags.remove(Flags::RD_READY);
self.0.flags.set(flags);
self.0.read_task.wake();

View file

@ -17,4 +17,4 @@ proc-macro2 = "^1"
[dev-dependencies]
ntex = "0.5.0-b.0"
futures = "0.3.13"
futures = "0.3"

View file

@ -1,3 +1,4 @@
#![allow(dead_code)]
use std::{cell::Cell, rc::Rc, task};
use ntex_util::task::LocalWaker;

View file

@ -1,7 +1,6 @@
//! An implementations of SSL streams for ntex ecosystem
use std::sync::atomic::{AtomicUsize, Ordering};
mod counter;
pub mod types;
#[cfg(feature = "openssl")]
@ -10,6 +9,8 @@ pub mod openssl;
#[cfg(feature = "rustls")]
pub mod rustls;
mod counter;
/// Sets the maximum per-worker concurrent ssl connection establish process.
///
/// All listeners will stop accepting connections when this limit is

View file

@ -20,12 +20,12 @@ bitflags = "1.2"
log = "0.4"
slab = "0.4"
futures-timer = "3.0.2"
futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3", default-features = false, features = ["alloc"] }
pin-project-lite = "0.2.6"
[dev-dependencies]
ntex = "0.4.10"
ntex = "0.5.0-b.0"
ntex-rt = "0.3.2"
ntex-macros = "0.1.3"
futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }

View file

@ -6,6 +6,8 @@
* Move ntex::time to ntex-util crate
* Replace mio with poller for accept loop
## [0.4.13] - 2021-12-07
* server: Rename .apply/.apply_async to .on_worker_start()

View file

@ -57,12 +57,12 @@ base64 = "0.13"
bitflags = "1.3"
derive_more = "0.99.14"
fxhash = "0.2.1"
futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
futures-sink = { version = "0.3", default-features = false, features = ["alloc"] }
log = "0.4"
mio = "0.7.11"
num_cpus = "1.13"
nanorand = { version = "0.6.1", default-features = false, features = ["std", "wyrand"] }
polling = "2.2.0"
pin-project-lite = "0.2"
regex = { version = "1.5.4", default-features = false, features = ["std"] }
sha-1 = "0.9"

View file

@ -7,7 +7,7 @@ use ntex::{server::Server, time::Seconds, util::Ready};
#[ntex::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "hello_world=info");
env::set_var("RUST_LOG", "ntex=trace,hello_world=info");
env_logger::init();
Server::build()

View file

@ -259,6 +259,8 @@ where
}
}
State::ReadRequest => {
log::trace!("trying to read http message");
// stop dispatcher
if this.inner.state.is_dispatcher_stopped() {
log::trace!("dispatcher is instructed to stop");
@ -358,7 +360,7 @@ where
}
}
Ok(None) => {
log::trace!("not enough data to decode next frame, register dispatch task");
log::trace!("not enough data to decode http message");
// if io error occured or connection is not keep-alive
// then disconnect

View file

@ -245,18 +245,18 @@ pub fn server<F: StreamServiceFactory>(factory: F) -> TestServer {
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.timeout(Millis(5_000))
.timeout(Millis(30_000))
.openssl(builder.build())
.finish()
}
#[cfg(not(feature = "openssl"))]
{
Connector::default().timeout(Millis(5_000)).finish()
Connector::default().timeout(Millis(30_000)).finish()
}
};
Client::build()
.timeout(Seconds(5))
.timeout(Seconds(30))
.connector(connector)
.finish()
};

View file

@ -1,9 +1,8 @@
use std::{
io, sync::mpsc as sync_mpsc, sync::Arc, thread, time::Duration, time::Instant,
cell::Cell, io, sync::mpsc, sync::Arc, thread, time::Duration, time::Instant,
};
use log::{error, info};
use slab::Slab;
use polling::{Event, Poller};
use crate::rt::System;
use crate::time::{sleep, Millis};
@ -12,16 +11,14 @@ use super::socket::{Listener, SocketAddr};
use super::worker::{Connection, WorkerClient};
use super::{Server, ServerStatus, Token};
const DELTA: usize = 100;
const NOTIFY: mio::Token = mio::Token(0);
const ERR_TIMEOUT: Duration = Duration::from_millis(500);
const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
#[derive(Debug)]
pub(super) enum Command {
Stop,
Pause,
Resume,
Stop,
Worker(WorkerClient),
Timer,
WorkerAvailable,
@ -31,43 +28,41 @@ struct ServerSocketInfo {
addr: SocketAddr,
token: Token,
sock: Listener,
timeout: Option<Instant>,
registered: Cell<bool>,
timeout: Cell<Option<Instant>>,
}
#[derive(Debug, Clone)]
pub(super) struct AcceptNotify(Arc<mio::Waker>, sync_mpsc::Sender<Command>);
pub(super) struct AcceptNotify(Arc<Poller>, mpsc::Sender<Command>);
impl AcceptNotify {
pub(super) fn new(waker: Arc<mio::Waker>, tx: sync_mpsc::Sender<Command>) -> Self {
pub(super) fn new(waker: Arc<Poller>, tx: mpsc::Sender<Command>) -> Self {
AcceptNotify(waker, tx)
}
pub(super) fn send(&self, cmd: Command) {
let _ = self.1.send(cmd);
let _ = self.0.wake();
let _ = self.0.notify();
}
}
pub(super) struct AcceptLoop {
notify: AcceptNotify,
inner: Option<(sync_mpsc::Receiver<Command>, mio::Poll, Server)>,
inner: Option<(mpsc::Receiver<Command>, Arc<Poller>, Server)>,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
impl AcceptLoop {
pub(super) fn new(srv: Server) -> AcceptLoop {
// Create a poll instance
let poll = mio::Poll::new()
.map_err(|e| panic!("Cannot create mio::Poll {}", e))
.unwrap();
let (tx, rx) = sync_mpsc::channel();
let waker = Arc::new(
mio::Waker::new(poll.registry(), NOTIFY)
.map_err(|e| panic!("Cannot create mio::Waker {}", e))
// Create a poller instance
let poll = Arc::new(
Poller::new()
.map_err(|e| panic!("Cannot create Polller {}", e))
.unwrap(),
);
let notify = AcceptNotify::new(waker, tx);
let (tx, rx) = mpsc::channel();
let notify = AcceptNotify::new(poll.clone(), tx);
AcceptLoop {
notify,
@ -115,9 +110,9 @@ impl AcceptLoop {
}
struct Accept {
poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>,
poller: Arc<Poller>,
rx: mpsc::Receiver<Command>,
sockets: Vec<ServerSocketInfo>,
workers: Vec<WorkerClient>,
srv: Server,
notify: AcceptNotify,
@ -126,23 +121,10 @@ struct Accept {
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
}
impl Accept {
fn start(
rx: sync_mpsc::Receiver<Command>,
poll: mio::Poll,
rx: mpsc::Receiver<Command>,
poller: Arc<Poller>,
socks: Vec<(Token, Listener)>,
srv: Server,
workers: Vec<WorkerClient>,
@ -156,45 +138,33 @@ impl Accept {
.name("ntex-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
Accept::new(rx, poll, socks, workers, srv, notify, status_handler).poll()
Accept::new(rx, poller, socks, workers, srv, notify, status_handler)
.poll()
});
}
fn new(
rx: sync_mpsc::Receiver<Command>,
poll: mio::Poll,
rx: mpsc::Receiver<Command>,
poller: Arc<Poller>,
socks: Vec<(Token, Listener)>,
workers: Vec<WorkerClient>,
srv: Server,
notify: AcceptNotify,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
) -> Accept {
// Start accept
let mut sockets = Slab::new();
for (hnd_token, mut lst) in socks.into_iter() {
let addr = lst.local_addr();
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.registry().register(
&mut lst,
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
panic!("Cannot register io: {}", err);
}
entry.insert(ServerSocketInfo {
addr,
let mut sockets = Vec::new();
for (hnd_token, lst) in socks.into_iter() {
sockets.push(ServerSocketInfo {
addr: lst.local_addr(),
sock: lst,
token: hnd_token,
timeout: None,
registered: Cell::new(false),
timeout: Cell::new(None),
});
}
Accept {
poll,
poller,
rx,
sockets,
workers,
@ -213,62 +183,105 @@ impl Accept {
}
fn poll(&mut self) {
trace!("Starting server accept loop");
log::trace!("Starting server accept loop");
// Add all sources
for (idx, info) in self.sockets.iter().enumerate() {
log::info!("Starting socket listener on {}", info.addr);
self.add_source(idx);
}
// Create storage for events
let mut events = mio::Events::with_capacity(128);
let mut events = Vec::with_capacity(128);
loop {
if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() {
std::io::ErrorKind::Interrupted => {
continue;
}
_ => {
panic!("Poll error: {}", e);
}
if let Err(e) = self.poller.wait(&mut events, None) {
if e.kind() == io::ErrorKind::Interrupted {
continue;
} else {
panic!("Cannot wait for events in poller: {}", e)
}
}
for event in events.iter() {
let token = event.token();
match token {
NOTIFY => {
if !self.process_cmd() {
return;
}
}
_ => {
let token = usize::from(token);
if token < DELTA {
continue;
}
self.accept(token - DELTA);
}
let readd = self.accept(event.key);
if readd {
self.add_source(event.key);
}
}
if !self.process_cmd() {
break;
}
events.clear();
}
// cleanup
for info in &self.sockets {
info.sock.remove_source()
}
}
fn add_source(&self, idx: usize) {
let info = &self.sockets[idx];
loop {
// try to register poller source
let result = if info.registered.get() {
self.poller.modify(&info.sock, Event::readable(idx))
} else {
self.poller.add(&info.sock, Event::readable(idx))
};
if let Err(err) = result {
if err.kind() == io::ErrorKind::WouldBlock {
continue;
}
log::error!("Cannot register socket listener: {}", err);
// sleep after error
info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move {
sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(Command::Timer);
}));
} else {
info.registered.set(true);
}
break;
}
}
fn remove_source(&self, key: usize) {
let info = &self.sockets[key];
let result = if info.registered.get() {
self.poller.modify(&info.sock, Event::none(key))
} else {
return;
};
// stop listening for incoming connections
if let Err(err) = result {
log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
}
}
fn process_timer(&mut self) {
let now = Instant::now();
for (token, info) in self.sockets.iter_mut() {
if let Some(inst) = info.timeout.take() {
if now > inst {
if !self.backpressure {
if let Err(err) = self.poll.registry().register(
&mut info.sock,
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
error!("Cannot register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
}
} else {
info.timeout = Some(inst);
break;
for key in 0..self.sockets.len() {
let info = &mut self.sockets[key];
if let Some(inst) = info.timeout.get() {
if now > inst && !self.backpressure {
log::info!(
"Resuming socket listener on {} after timeout",
info.addr
);
info.timeout.take();
self.add_source(key);
}
}
}
@ -278,44 +291,33 @@ impl Accept {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => {
for (_, info) in self.sockets.iter_mut() {
if let Err(err) =
self.poll.registry().deregister(&mut info.sock)
{
error!("Cannot deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
self.update_status(ServerStatus::NotReady);
}
Command::Resume => {
for (token, info) in self.sockets.iter_mut() {
if let Err(err) = self.poll.registry().register(
&mut info.sock,
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
error!("Cannot resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
self.update_status(ServerStatus::Ready);
}
Command::Stop => {
for (_, info) in self.sockets.iter_mut() {
trace!("Stopping socket listener: {}", info.addr);
let _ = self.poll.registry().deregister(&mut info.sock);
log::trace!("Stopping accept loop");
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
}
self.update_status(ServerStatus::NotReady);
return false;
}
Command::Pause => {
log::trace!("Pausing accept loop");
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
}
self.update_status(ServerStatus::NotReady);
}
Command::Resume => {
log::trace!("Resuming accept loop");
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Resuming socket listener on {}", info.addr);
self.add_source(key);
}
self.update_status(ServerStatus::Ready);
}
Command::Worker(worker) => {
log::trace!("Adding new worker to accept loop");
self.backpressure(false);
self.workers.push(worker);
}
@ -323,14 +325,16 @@ impl Accept {
self.process_timer();
}
Command::WorkerAvailable => {
log::trace!("Worker is available");
self.backpressure(false);
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => break,
sync_mpsc::TryRecvError::Disconnected => {
for (_, info) in self.sockets.iter_mut() {
let _ = self.poll.registry().deregister(&mut info.sock);
mpsc::TryRecvError::Empty => break,
mpsc::TryRecvError::Disconnected => {
for (key, info) in self.sockets.iter().enumerate() {
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
}
return false;
}
@ -350,36 +354,32 @@ impl Accept {
if self.backpressure {
if !on {
self.backpressure = false;
for (token, info) in self.sockets.iter_mut() {
if info.timeout.is_some() {
// socket will re-register itself after timeout
continue;
}
if let Err(err) = self.poll.registry().register(
&mut info.sock,
mio::Token(token + DELTA),
mio::Interest::READABLE,
) {
error!("Cannot resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed", info.addr);
for (key, info) in self.sockets.iter().enumerate() {
if info.timeout.get().is_none() {
// socket with timeout will re-register itself after timeout
log::info!(
"Resuming socket listener on {} after back-pressure",
info.addr
);
self.add_source(key);
}
}
}
} else if on {
self.backpressure = true;
for (_, info) in self.sockets.iter_mut() {
for key in 0..self.sockets.len() {
// disable err timeout
let info = &mut self.sockets[key];
if info.timeout.take().is_none() {
trace!("Enabling backpressure for {}", info.addr);
let _ = self.poll.registry().deregister(&mut info.sock);
log::trace!("Enabling back-pressure for {}", info.addr);
self.remove_source(key);
}
}
}
}
fn accept_one(&mut self, mut msg: Connection) {
trace!("Accepting connection: {:?}", msg.io);
log::trace!("Accepting connection: {:?}", msg.io);
if self.backpressure {
while !self.workers.is_empty() {
@ -391,7 +391,7 @@ impl Accept {
msg = tmp;
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
log::error!("No workers");
return;
} else if self.workers.len() <= self.next {
self.next = 0;
@ -413,13 +413,13 @@ impl Accept {
return;
}
Err(tmp) => {
trace!("Worker failed while processing connection");
log::trace!("Worker failed while processing connection");
self.update_status(ServerStatus::WorkerFailed);
self.srv.worker_faulted(self.workers[self.next].idx);
msg = tmp;
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
log::error!("No workers");
self.backpressure(true);
return;
} else if self.workers.len() <= self.next {
@ -432,13 +432,13 @@ impl Accept {
self.next = (self.next + 1) % self.workers.len();
}
// enable backpressure
trace!("No available workers, enable back-pressure");
log::trace!("No available workers, enable back-pressure");
self.backpressure(true);
self.accept_one(msg);
}
}
fn accept(&mut self, token: usize) {
fn accept(&mut self, token: usize) -> bool {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept() {
@ -446,32 +446,41 @@ impl Accept {
io,
token: info.token,
},
Ok(None) => return,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Ok(None) => return true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.registry().deregister(&mut info.sock)
{
error!("Cannot deregister server socket {}", err);
}
log::error!("Error accepting socket: {}", e);
// sleep after error
info.timeout = Some(Instant::now() + ERR_TIMEOUT);
info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
let notify = self.notify.clone();
System::current().arbiter().spawn(Box::pin(async move {
sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(Command::Timer);
}));
return;
return false;
}
}
} else {
return;
return false;
};
self.accept_one(msg);
}
}
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
}

View file

@ -1,38 +1,11 @@
use std::{convert::TryFrom, fmt, io, net};
use crate::io::Io;
use crate::rt::net::TcpStream;
use crate::{io::Io, rt::net::TcpStream};
pub(crate) enum Listener {
Tcp(mio::net::TcpListener),
Tcp(net::TcpListener),
#[cfg(unix)]
Uds(mio::net::UnixListener),
}
pub(crate) enum SocketAddr {
Tcp(net::SocketAddr),
#[cfg(unix)]
Uds(mio::net::SocketAddr),
}
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
Uds(std::os::unix::net::UnixListener),
}
impl fmt::Debug for Listener {
@ -57,16 +30,42 @@ impl fmt::Display for Listener {
}
}
pub(crate) enum SocketAddr {
Tcp(net::SocketAddr),
#[cfg(unix)]
Uds(std::os::unix::net::SocketAddr),
}
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl Listener {
pub(super) fn from_tcp(lst: net::TcpListener) -> Self {
let _ = lst.set_nonblocking(true);
Listener::Tcp(mio::net::TcpListener::from_std(lst))
Listener::Tcp(lst)
}
#[cfg(unix)]
pub(super) fn from_uds(lst: std::os::unix::net::UnixListener) -> Self {
let _ = lst.set_nonblocking(true);
Listener::Uds(mio::net::UnixListener::from_std(lst))
Listener::Uds(lst)
}
pub(crate) fn local_addr(&self) -> SocketAddr {
@ -88,52 +87,47 @@ impl Listener {
}
}
}
}
impl mio::event::Source for Listener {
#[inline]
fn register(
&mut self,
poll: &mio::Registry,
token: mio::Token,
interest: mio::Interest,
) -> io::Result<()> {
pub(crate) fn remove_source(&self) {
match *self {
Listener::Tcp(ref mut lst) => lst.register(poll, token, interest),
Listener::Tcp(_) => (),
#[cfg(unix)]
Listener::Uds(ref mut lst) => lst.register(poll, token, interest),
}
}
#[inline]
fn reregister(
&mut self,
poll: &mio::Registry,
token: mio::Token,
interest: mio::Interest,
) -> io::Result<()> {
match *self {
Listener::Tcp(ref mut lst) => lst.reregister(poll, token, interest),
#[cfg(unix)]
Listener::Uds(ref mut lst) => lst.reregister(poll, token, interest),
}
}
#[inline]
fn deregister(&mut self, poll: &mio::Registry) -> io::Result<()> {
match *self {
Listener::Tcp(ref mut lst) => lst.deregister(poll),
#[cfg(unix)]
Listener::Uds(ref mut lst) => {
let res = lst.deregister(poll);
Listener::Uds(ref lst) => {
// cleanup file path
if let Ok(addr) = lst.local_addr() {
if let Some(path) = addr.as_pathname() {
let _ = std::fs::remove_file(path);
}
}
res
}
}
}
}
#[cfg(unix)]
mod listener_impl {
use super::*;
use std::os::unix::io::{AsRawFd, RawFd};
impl AsRawFd for Listener {
fn as_raw_fd(&self) -> RawFd {
match *self {
Listener::Tcp(ref lst) => lst.as_raw_fd(),
Listener::Uds(ref lst) => lst.as_raw_fd(),
}
}
}
}
#[cfg(windows)]
mod listener_impl {
use super::*;
use std::os::windows::io::{AsRawSocket, RawSocket};
impl AsRawSocket for Listener {
fn as_raw_socket(&self) -> RawSocket {
match *self {
Listener::Tcp(ref lst) => lst.as_raw_socket(),
}
}
}
@ -141,42 +135,26 @@ impl mio::event::Source for Listener {
#[derive(Debug)]
pub enum Stream {
Tcp(mio::net::TcpStream),
Tcp(net::TcpStream),
#[cfg(unix)]
Uds(mio::net::UnixStream),
Uds(std::os::unix::net::UnixStream),
}
impl TryFrom<Stream> for Io {
type Error = io::Error;
fn try_from(sock: Stream) -> Result<Self, Self::Error> {
#[cfg(unix)]
match sock {
Stream::Tcp(stream) => {
use std::os::unix::io::{FromRawFd, IntoRawFd};
let fd = IntoRawFd::into_raw_fd(stream);
let io = TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(fd) })?;
io.set_nodelay(true)?;
Ok(Io::new(io))
stream.set_nonblocking(true)?;
stream.set_nodelay(true)?;
Ok(Io::new(TcpStream::from_std(stream)?))
}
#[cfg(unix)]
Stream::Uds(stream) => {
use crate::rt::net::UnixStream;
use std::os::unix::io::{FromRawFd, IntoRawFd};
let fd = IntoRawFd::into_raw_fd(stream);
let io = UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(fd) })?;
Ok(Io::new(io))
}
}
#[cfg(windows)]
match sock {
Stream::Tcp(stream) => {
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
let fd = IntoRawSocket::into_raw_socket(stream);
let io =
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(fd) })?;
io.set_nodelay(true)?;
Ok(Io::new(io))
stream.set_nonblocking(true)?;
Ok(Io::new(UnixStream::from_std(stream)?))
}
}
}
@ -198,8 +176,7 @@ mod tests {
let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
socket.set_reuse_address(true).unwrap();
socket.bind(&SockAddr::from(addr)).unwrap();
let tcp = net::TcpListener::from(socket);
let lst = Listener::Tcp(mio::net::TcpListener::from_std(tcp));
let lst = Listener::Tcp(net::TcpListener::from(socket));
assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1"));
}
@ -211,7 +188,6 @@ mod tests {
let _ = std::fs::remove_file("/tmp/sock.xxxxx");
if let Ok(lst) = UnixListener::bind("/tmp/sock.xxxxx") {
let lst = mio::net::UnixListener::from_std(lst);
let addr = lst.local_addr().expect("Couldn't get local address");
let a = SocketAddr::Uds(addr);
assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx"));

View file

@ -573,8 +573,8 @@ mod tests {
let (_tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let (sync_tx, _sync_rx) = std::sync::mpsc::channel();
let poll = mio::Poll::new().unwrap();
let waker = Arc::new(mio::Waker::new(poll.registry(), mio::Token(1)).unwrap());
let poll = Arc::new(polling::Poller::new().unwrap());
let waker = poll.clone();
let avail =
WorkerAvailability::new(AcceptNotify::new(waker.clone(), sync_tx.clone()));

View file

@ -724,9 +724,9 @@ where
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
Connector::default()
.lifetime(Seconds::ZERO)
.keep_alive(Seconds(10))
.timeout(Millis(10_000))
.disconnect_timeout(Millis(3_000))
.keep_alive(Seconds(30))
.timeout(Millis(30_000))
.disconnect_timeout(Millis(5_000))
.openssl(builder.build())
.finish()
}
@ -734,14 +734,14 @@ where
{
Connector::default()
.lifetime(Seconds::ZERO)
.timeout(Millis(10_000))
.timeout(Millis(30_000))
.finish()
}
};
Client::build()
.connector(connector)
.timeout(Seconds(10))
.timeout(Seconds(30))
.finish()
};

View file

@ -68,11 +68,16 @@ async fn test_connection_reuse_h2() {
.map_err(|e| log::error!("Cannot set alpn protocol: {:?}", e));
let client = Client::build()
.connector(Connector::default().openssl(builder.build()).finish())
.connector(
Connector::default()
.timeout(Seconds(30))
.openssl(builder.build())
.finish(),
)
.finish();
// req 1
let request = client.get(srv.surl("/")).timeout(Seconds(10)).send();
let request = client.get(srv.surl("/")).timeout(Seconds(30)).send();
let response = request.await.unwrap();
assert!(response.status().is_success());

View file

@ -4,6 +4,7 @@ use futures::future::{self, ready, FutureExt};
use futures::stream::{once, StreamExt};
use regex::Regex;
use ntex::http::header::{HeaderName, HeaderValue};
use ntex::http::test::server as test_server;
use ntex::http::{
body, header, HttpService, KeepAlive, Method, Request, Response, StatusCode,
@ -115,7 +116,7 @@ async fn test_chunked_payload() {
let returned_size = {
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream
.write_all(b"POST /test HTTP/1.1\r\nTransfer-Encoding: chunked\r\n\r\n");
.write_all(b"POST /test HTTP/1.0\r\nTransfer-Encoding: chunked\r\n\r\n");
for chunk_size in chunk_sizes.iter() {
let mut bytes = Vec::new();
@ -289,11 +290,6 @@ async fn test_http1_keepalive_disabled() {
#[ntex::test]
async fn test_content_length() {
use ntex::http::{
header::{HeaderName, HeaderValue},
StatusCode,
};
let srv = test_server(|| {
HttpService::build().h1(|req: Request| {
let indx: usize = req.uri().path()[1..].parse().unwrap();