mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 21:07:39 +03:00
move server to ntex
This commit is contained in:
parent
84d55aa200
commit
4fb031c454
30 changed files with 169 additions and 746 deletions
|
@ -5,7 +5,6 @@ members = [
|
|||
"ntex-service",
|
||||
"ntex-web-macros",
|
||||
|
||||
"actix-net/actix-server",
|
||||
"actix-net/actix-service",
|
||||
"actix-net/router",
|
||||
]
|
||||
|
@ -15,7 +14,6 @@ ntex = { path = "ntex" }
|
|||
ntex-router = { path = "ntex-router" }
|
||||
ntex-service = { path = "ntex-service" }
|
||||
|
||||
actix-server = { path = "actix-net/actix-server" }
|
||||
actix-service = { path = "actix-net/actix-service" }
|
||||
actix-router = { path = "actix-net/router" }
|
||||
|
||||
|
@ -23,7 +21,5 @@ actix-codec = { path = "actix-net/actix-codec" }
|
|||
actix-connect = { path = "actix-net/actix-connect" }
|
||||
actix-rt = { path = "actix-net/actix-rt" }
|
||||
actix-macros = { path = "actix-net/actix-macros" }
|
||||
actix-testing = { path = "actix-net/actix-testing" }
|
||||
actix-threadpool = { path = "actix-net/actix-threadpool" }
|
||||
actix-tls = { path = "actix-net/actix-tls" }
|
||||
bytestring = { path = "actix-net/string" }
|
||||
|
|
|
@ -1,194 +0,0 @@
|
|||
# Changes
|
||||
|
||||
## [1.0.1] - 2019-12-29
|
||||
|
||||
### Changed
|
||||
|
||||
* Rename `.start()` method to `.run()`
|
||||
|
||||
## [1.0.0] - 2019-12-11
|
||||
|
||||
### Changed
|
||||
|
||||
* Use actix-net releases
|
||||
|
||||
|
||||
## [1.0.0-alpha.4] - 2019-12-08
|
||||
|
||||
### Changed
|
||||
|
||||
* Use actix-service 1.0.0-alpha.4
|
||||
|
||||
## [1.0.0-alpha.3] - 2019-12-07
|
||||
|
||||
### Changed
|
||||
|
||||
* Migrate to tokio 0.2
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix compilation on non-unix platforms
|
||||
|
||||
* Better handling server configuration
|
||||
|
||||
|
||||
## [1.0.0-alpha.2] - 2019-12-02
|
||||
|
||||
### Changed
|
||||
|
||||
* Simplify server service (remove actix-server-config)
|
||||
|
||||
* Allow to wait on `Server` until server stops
|
||||
|
||||
|
||||
## [0.8.0-alpha.1] - 2019-11-22
|
||||
|
||||
### Changed
|
||||
|
||||
* Migrate to `std::future`
|
||||
|
||||
|
||||
## [0.7.0] - 2019-10-04
|
||||
|
||||
### Changed
|
||||
|
||||
* Update `rustls` to 0.16
|
||||
* Minimum required Rust version upped to 1.37.0
|
||||
|
||||
|
||||
## [0.6.1] - 2019-09-25
|
||||
|
||||
### Added
|
||||
|
||||
* Add UDS listening support to `ServerBuilder`
|
||||
|
||||
|
||||
## [0.6.0] - 2019-07-18
|
||||
|
||||
### Added
|
||||
|
||||
* Support Unix domain sockets #3
|
||||
|
||||
|
||||
## [0.5.1] - 2019-05-18
|
||||
|
||||
### Changed
|
||||
|
||||
* ServerBuilder::shutdown_timeout() accepts u64
|
||||
|
||||
|
||||
## [0.5.0] - 2019-05-12
|
||||
|
||||
### Added
|
||||
|
||||
* Add `Debug` impl for `SslError`
|
||||
|
||||
* Derive debug for `Server` and `ServerCommand`
|
||||
|
||||
### Changed
|
||||
|
||||
* Upgrade to actix-service 0.4
|
||||
|
||||
|
||||
## [0.4.3] - 2019-04-16
|
||||
|
||||
### Added
|
||||
|
||||
* Re-export `IoStream` trait
|
||||
|
||||
### Changed
|
||||
|
||||
* Deppend on `ssl` and `rust-tls` features from actix-server-config
|
||||
|
||||
|
||||
## [0.4.2] - 2019-03-30
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix SIGINT force shutdown
|
||||
|
||||
|
||||
## [0.4.1] - 2019-03-14
|
||||
|
||||
### Added
|
||||
|
||||
* `SystemRuntime::on_start()` - allow to run future before server service initialization
|
||||
|
||||
|
||||
## [0.4.0] - 2019-03-12
|
||||
|
||||
### Changed
|
||||
|
||||
* Use `ServerConfig` for service factory
|
||||
|
||||
* Wrap tcp socket to `Io` type
|
||||
|
||||
* Upgrade actix-service
|
||||
|
||||
|
||||
## [0.3.1] - 2019-03-04
|
||||
|
||||
### Added
|
||||
|
||||
* Add `ServerBuilder::maxconnrate` sets the maximum per-worker number of concurrent connections
|
||||
|
||||
* Add helper ssl error `SslError`
|
||||
|
||||
|
||||
### Changed
|
||||
|
||||
* Rename `StreamServiceFactory` to `ServiceFactory`
|
||||
|
||||
* Deprecate `StreamServiceFactory`
|
||||
|
||||
|
||||
## [0.3.0] - 2019-03-02
|
||||
|
||||
### Changed
|
||||
|
||||
* Use new `NewService` trait
|
||||
|
||||
|
||||
## [0.2.1] - 2019-02-09
|
||||
|
||||
### Changed
|
||||
|
||||
* Drop service response
|
||||
|
||||
|
||||
## [0.2.0] - 2019-02-01
|
||||
|
||||
### Changed
|
||||
|
||||
* Migrate to actix-service 0.2
|
||||
|
||||
* Updated rustls dependency
|
||||
|
||||
|
||||
## [0.1.3] - 2018-12-21
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix max concurrent connections handling
|
||||
|
||||
|
||||
## [0.1.2] - 2018-12-12
|
||||
|
||||
### Changed
|
||||
|
||||
* rename ServiceConfig::rt() to ServiceConfig::apply()
|
||||
|
||||
|
||||
### Fixed
|
||||
|
||||
* Fix back-pressure for concurrent ssl handshakes
|
||||
|
||||
|
||||
## [0.1.1] - 2018-12-11
|
||||
|
||||
* Fix signal handling on windows
|
||||
|
||||
|
||||
## [0.1.0] - 2018-12-09
|
||||
|
||||
* Move server to separate crate
|
|
@ -1,41 +0,0 @@
|
|||
[package]
|
||||
name = "actix-server"
|
||||
version = "1.0.1"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix server - General purpose tcp server"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-server/"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
edition = "2018"
|
||||
|
||||
[lib]
|
||||
name = "actix_server"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
||||
[dependencies]
|
||||
actix-service = "1.0.1"
|
||||
actix-rt = "1.0.0"
|
||||
actix-codec = "0.2.0"
|
||||
actix-utils = "1.0.4"
|
||||
|
||||
log = "0.4"
|
||||
num_cpus = "1.11"
|
||||
mio = "0.6.19"
|
||||
net2 = "0.2"
|
||||
futures = "0.3.1"
|
||||
slab = "0.4"
|
||||
|
||||
# unix domain sockets
|
||||
mio-uds = { version = "0.6.7" }
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "0.5"
|
||||
env_logger = "0.7"
|
||||
actix-testing = "1.0.0"
|
|
@ -1 +0,0 @@
|
|||
../LICENSE-APACHE
|
|
@ -1 +0,0 @@
|
|||
../LICENSE-MIT
|
|
@ -1,37 +0,0 @@
|
|||
//! General purpose tcp server
|
||||
#![deny(rust_2018_idioms, warnings)]
|
||||
#![allow(clippy::type_complexity)]
|
||||
|
||||
mod accept;
|
||||
mod builder;
|
||||
mod config;
|
||||
mod server;
|
||||
mod service;
|
||||
mod signals;
|
||||
mod socket;
|
||||
mod worker;
|
||||
|
||||
pub use self::builder::ServerBuilder;
|
||||
pub use self::config::{ServiceConfig, ServiceRuntime};
|
||||
pub use self::server::Server;
|
||||
pub use self::service::ServiceFactory;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use self::socket::FromStream;
|
||||
|
||||
/// Socket id token
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||
pub(crate) struct Token(usize);
|
||||
|
||||
impl Token {
|
||||
pub(crate) fn next(&mut self) -> Token {
|
||||
let token = Token(self.0);
|
||||
self.0 += 1;
|
||||
token
|
||||
}
|
||||
}
|
||||
|
||||
/// Start server building process
|
||||
pub fn new() -> ServerBuilder {
|
||||
ServerBuilder::default()
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
# Changes
|
||||
|
||||
## [1.0.0] - 2019-12-11
|
||||
|
||||
* Update actix-server to 1.0.0
|
||||
|
||||
## [1.0.0-alpha.3] - 2019-12-07
|
||||
|
||||
* Migrate to tokio 0.2
|
||||
|
||||
## [1.0.0-alpha.2] - 2019-12-02
|
||||
|
||||
* Re-export `test` attribute macros
|
||||
|
||||
|
||||
## [0.3.0-alpha.1] - 2019-11-22
|
||||
|
||||
* Migrate to std::future
|
||||
|
||||
## [0.2.0] - 2019-10-14
|
||||
|
||||
* Upgrade actix-server and actix-server-config deps
|
||||
|
||||
|
||||
## [0.1.0] - 2019-09-25
|
||||
|
||||
* Initial impl
|
|
@ -1,26 +0,0 @@
|
|||
[package]
|
||||
name = "actix-testing"
|
||||
version = "1.0.0"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix testing utils"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-testing/"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
edition = "2018"
|
||||
|
||||
[lib]
|
||||
name = "actix_testing"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
actix-rt = "1.0.0"
|
||||
actix-macros = "0.1.0"
|
||||
actix-server = "1.0.0"
|
||||
actix-service = "1.0.0"
|
||||
|
||||
log = "0.4"
|
||||
net2 = "0.2"
|
||||
futures = "0.3.1"
|
|
@ -1 +0,0 @@
|
|||
../LICENSE-APACHE
|
|
@ -1 +0,0 @@
|
|||
../LICENSE-MIT
|
|
@ -1,9 +0,0 @@
|
|||
# Actix test utilities [](https://crates.io/crates/actix-testint) [](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
|
||||
|
||||
## Documentation & community resources
|
||||
|
||||
* [User Guide](https://actix.rs/docs/)
|
||||
* [API Documentation](https://docs.rs/actix-testing/)
|
||||
* [Chat on gitter](https://gitter.im/actix/actix)
|
||||
* Cargo package: [actix-http-test](https://crates.io/crates/actix-testing)
|
||||
* Minimum supported Rust version: 1.37 or later
|
|
@ -1,152 +0,0 @@
|
|||
//! Various helpers for Actix applications to use during testing.
|
||||
#![deny(rust_2018_idioms, warnings)]
|
||||
#![allow(clippy::type_complexity)]
|
||||
|
||||
use std::sync::mpsc;
|
||||
use std::{net, thread};
|
||||
|
||||
use actix_rt::{net::TcpStream, System};
|
||||
use actix_server::{Server, ServerBuilder, ServiceFactory};
|
||||
use net2::TcpBuilder;
|
||||
|
||||
#[cfg(not(test))] // Work around for rust-lang/rust#62127
|
||||
pub use actix_macros::test;
|
||||
|
||||
/// The `TestServer` type.
|
||||
///
|
||||
/// `TestServer` is very simple test server that simplify process of writing
|
||||
/// integration tests for actix-net applications.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use actix_service::fn_service;
|
||||
/// use actix_testing::TestServer;
|
||||
///
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() {
|
||||
/// let srv = TestServer::with(|| fn_service(
|
||||
/// |sock| async move {
|
||||
/// println!("New connection: {:?}", sock);
|
||||
/// Ok::<_, ()>(())
|
||||
/// }
|
||||
/// ));
|
||||
///
|
||||
/// println!("SOCKET: {:?}", srv.connect());
|
||||
/// }
|
||||
/// ```
|
||||
pub struct TestServer;
|
||||
|
||||
/// Test server runstime
|
||||
pub struct TestServerRuntime {
|
||||
addr: net::SocketAddr,
|
||||
host: String,
|
||||
port: u16,
|
||||
system: System,
|
||||
}
|
||||
|
||||
impl TestServer {
|
||||
/// Start new server with server builder
|
||||
pub fn start<F>(mut factory: F) -> TestServerRuntime
|
||||
where
|
||||
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
// run server in separate thread
|
||||
thread::spawn(move || {
|
||||
let sys = System::new("actix-test-server");
|
||||
factory(Server::build())
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
.start();
|
||||
|
||||
tx.send(System::current()).unwrap();
|
||||
sys.run()
|
||||
});
|
||||
let system = rx.recv().unwrap();
|
||||
|
||||
TestServerRuntime {
|
||||
system,
|
||||
addr: "127.0.0.1:0".parse().unwrap(),
|
||||
host: "127.0.0.1".to_string(),
|
||||
port: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Start new test server with application factory
|
||||
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
// run server in separate thread
|
||||
thread::spawn(move || {
|
||||
let sys = System::new("actix-test-server");
|
||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||
let local_addr = tcp.local_addr().unwrap();
|
||||
|
||||
Server::build()
|
||||
.listen("test", tcp, factory)?
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
.start();
|
||||
|
||||
tx.send((System::current(), local_addr)).unwrap();
|
||||
sys.run()
|
||||
});
|
||||
|
||||
let (system, addr) = rx.recv().unwrap();
|
||||
|
||||
let host = format!("{}", addr.ip());
|
||||
let port = addr.port();
|
||||
|
||||
TestServerRuntime {
|
||||
system,
|
||||
addr,
|
||||
host,
|
||||
port,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get firat available unused local address
|
||||
pub fn unused_addr() -> net::SocketAddr {
|
||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let socket = TcpBuilder::new_v4().unwrap();
|
||||
socket.bind(&addr).unwrap();
|
||||
socket.reuse_address(true).unwrap();
|
||||
let tcp = socket.to_tcp_listener().unwrap();
|
||||
tcp.local_addr().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl TestServerRuntime {
|
||||
/// Test server host
|
||||
pub fn host(&self) -> &str {
|
||||
&self.host
|
||||
}
|
||||
|
||||
/// Test server port
|
||||
pub fn port(&self) -> u16 {
|
||||
self.port
|
||||
}
|
||||
|
||||
/// Get test server address
|
||||
pub fn addr(&self) -> net::SocketAddr {
|
||||
self.addr
|
||||
}
|
||||
|
||||
/// Stop http server
|
||||
fn stop(&mut self) {
|
||||
self.system.stop();
|
||||
}
|
||||
|
||||
/// Connect to server, return tokio TcpStream
|
||||
pub fn connect(&self) -> std::io::Result<TcpStream> {
|
||||
TcpStream::from_std(net::TcpStream::connect(self.addr)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TestServerRuntime {
|
||||
fn drop(&mut self) {
|
||||
self.stop()
|
||||
}
|
||||
}
|
|
@ -1,35 +0,0 @@
|
|||
# Changes
|
||||
|
||||
## [0.3.1] - 2019-12-12
|
||||
|
||||
### Changed
|
||||
|
||||
* Use parking_lot 0.10
|
||||
|
||||
## [0.3.0] - 2019-12-02
|
||||
|
||||
### Changed
|
||||
|
||||
* Expect `Result` type as a function return type
|
||||
|
||||
## [0.2.0] - 2019-11-21
|
||||
|
||||
### Changed
|
||||
|
||||
* Migrate to `std::future`
|
||||
|
||||
## [0.1.2] - 2019-08-05
|
||||
|
||||
### Changed
|
||||
|
||||
* Update `derive_more` to 0.15
|
||||
|
||||
* Update `parking_lot` to 0.9
|
||||
|
||||
## [0.1.1] - 2019-06-05
|
||||
|
||||
* Update parking_lot
|
||||
|
||||
## [0.1.0] - 2019-03-28
|
||||
|
||||
* Move threadpool to separate crate
|
|
@ -1,26 +0,0 @@
|
|||
[package]
|
||||
name = "actix-threadpool"
|
||||
version = "0.3.1"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "Actix thread pool for sync code"
|
||||
keywords = ["actix", "network", "framework", "async", "futures"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-threadpool/"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT/Apache-2.0"
|
||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
||||
edition = "2018"
|
||||
|
||||
[lib]
|
||||
name = "actix_threadpool"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
derive_more = "0.99.2"
|
||||
futures-channel = "0.3.1"
|
||||
parking_lot = "0.10"
|
||||
lazy_static = "1.3"
|
||||
log = "0.4"
|
||||
num_cpus = "1.10"
|
||||
threadpool = "1.7"
|
|
@ -1 +0,0 @@
|
|||
../LICENSE-APACHE
|
|
@ -1 +0,0 @@
|
|||
../LICENSE-MIT
|
|
@ -1,90 +0,0 @@
|
|||
//! Thread pool for blocking operations
|
||||
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use derive_more::Display;
|
||||
use futures_channel::oneshot;
|
||||
use parking_lot::Mutex;
|
||||
use threadpool::ThreadPool;
|
||||
|
||||
/// Env variable for default cpu pool size.
|
||||
const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL";
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub(crate) static ref DEFAULT_POOL: Mutex<ThreadPool> = {
|
||||
let num = std::env::var(ENV_CPU_POOL_VAR)
|
||||
.map_err(|_| ())
|
||||
.and_then(|val| {
|
||||
val.parse().map_err(|_| log::warn!(
|
||||
"Can not parse {} value, using default",
|
||||
ENV_CPU_POOL_VAR,
|
||||
))
|
||||
})
|
||||
.unwrap_or_else(|_| num_cpus::get() * 5);
|
||||
Mutex::new(
|
||||
threadpool::Builder::new()
|
||||
.thread_name("actix-web".to_owned())
|
||||
.num_threads(num)
|
||||
.build(),
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static POOL: ThreadPool = {
|
||||
DEFAULT_POOL.lock().clone()
|
||||
};
|
||||
}
|
||||
|
||||
/// Blocking operation execution error
|
||||
#[derive(Debug, Display)]
|
||||
pub enum BlockingError<E: fmt::Debug> {
|
||||
#[display(fmt = "{:?}", _0)]
|
||||
Error(E),
|
||||
#[display(fmt = "Thread pool is gone")]
|
||||
Canceled,
|
||||
}
|
||||
|
||||
/// Execute blocking function on a thread pool, returns future that resolves
|
||||
/// to result of the function execution.
|
||||
pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
|
||||
where
|
||||
F: FnOnce() -> Result<I, E> + Send + 'static,
|
||||
I: Send + 'static,
|
||||
E: Send + fmt::Debug + 'static,
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
POOL.with(|pool| {
|
||||
pool.execute(move || {
|
||||
if !tx.is_canceled() {
|
||||
let _ = tx.send(f());
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
CpuFuture { rx }
|
||||
}
|
||||
|
||||
/// Blocking operation completion future. It resolves with results
|
||||
/// of blocking function execution.
|
||||
pub struct CpuFuture<I, E> {
|
||||
rx: oneshot::Receiver<Result<I, E>>,
|
||||
}
|
||||
|
||||
impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
|
||||
type Output = Result<I, BlockingError<E>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let rx = Pin::new(&mut self.rx);
|
||||
let res = match rx.poll(cx) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(res) => res
|
||||
.map_err(|_| BlockingError::Canceled)
|
||||
.and_then(|res| res.map_err(BlockingError::Error)),
|
||||
};
|
||||
Poll::Ready(res)
|
||||
}
|
||||
}
|
|
@ -44,7 +44,6 @@ actix-codec = "0.2.0"
|
|||
actix-connect = "1.0.1"
|
||||
actix-macros = "0.1.0"
|
||||
actix-rt = "1.0.0"
|
||||
actix-server = "1.0.0"
|
||||
actix-threadpool = "0.3.1"
|
||||
actix-tls = { version = "1.0.0" }
|
||||
|
||||
|
@ -64,7 +63,9 @@ indexmap = "1.3"
|
|||
lazy_static = "1.4"
|
||||
log = "0.4"
|
||||
mime = "0.3"
|
||||
mio = "0.6.19"
|
||||
net2 = "0.2.33"
|
||||
num_cpus = "1.11"
|
||||
percent-encoding = "2.1"
|
||||
pin-project = "0.4.6"
|
||||
rand = "0.7"
|
||||
|
@ -80,6 +81,9 @@ coo-kie = { version = "0.13.3", package = "cookie", optional = true }
|
|||
open-ssl = { version="0.10", package = "openssl", optional = true }
|
||||
rust-tls = { version = "0.16.0", package = "rustls", optional = true }
|
||||
|
||||
# FIXME: Remove it and use mio own uds feature once mio 0.7 is released
|
||||
mio-uds = { version = "0.6.7" }
|
||||
|
||||
# compression
|
||||
brotli2 = { version="0.3.2", optional = true }
|
||||
flate2 = { version = "1.0.13", optional = true }
|
||||
|
@ -87,7 +91,6 @@ flate2 = { version = "1.0.13", optional = true }
|
|||
tokio = "0.2.4"
|
||||
|
||||
[dev-dependencies]
|
||||
actix-server = "1.0.0"
|
||||
actix-connect = { version = "1.0.0", features=["openssl"] }
|
||||
actix-tls = { version = "1.0.0", features=["openssl"] }
|
||||
futures = "0.3.1"
|
||||
|
|
|
@ -7,12 +7,12 @@ use actix_rt::System;
|
|||
use log::{error, info};
|
||||
use slab::Slab;
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::socket::{SocketAddr, SocketListener, StdListener};
|
||||
use crate::worker::{Conn, WorkerClient};
|
||||
use crate::Token;
|
||||
use super::server::Server;
|
||||
use super::socket::{SocketAddr, SocketListener, StdListener};
|
||||
use super::worker::{Conn, WorkerClient};
|
||||
use super::Token;
|
||||
|
||||
pub(crate) enum Command {
|
||||
pub(super) enum Command {
|
||||
Pause,
|
||||
Resume,
|
||||
Stop,
|
||||
|
@ -27,14 +27,14 @@ struct ServerSocketInfo {
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AcceptNotify(mio::SetReadiness);
|
||||
pub(super) struct AcceptNotify(mio::SetReadiness);
|
||||
|
||||
impl AcceptNotify {
|
||||
pub(crate) fn new(ready: mio::SetReadiness) -> Self {
|
||||
pub(super) fn new(ready: mio::SetReadiness) -> Self {
|
||||
AcceptNotify(ready)
|
||||
}
|
||||
|
||||
pub(crate) fn notify(&self) {
|
||||
pub(super) fn notify(&self) {
|
||||
let _ = self.0.set_readiness(mio::Ready::readable());
|
||||
}
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ impl Default for AcceptNotify {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AcceptLoop {
|
||||
pub(super) struct AcceptLoop {
|
||||
cmd_reg: Option<mio::Registration>,
|
||||
cmd_ready: mio::SetReadiness,
|
||||
notify_reg: Option<mio::Registration>,
|
||||
|
@ -56,7 +56,7 @@ pub(crate) struct AcceptLoop {
|
|||
}
|
||||
|
||||
impl AcceptLoop {
|
||||
pub fn new(srv: Server) -> AcceptLoop {
|
||||
pub(super) fn new(srv: Server) -> AcceptLoop {
|
||||
let (tx, rx) = sync_mpsc::channel();
|
||||
let (cmd_reg, cmd_ready) = mio::Registration::new2();
|
||||
let (notify_reg, notify_ready) = mio::Registration::new2();
|
||||
|
@ -72,16 +72,16 @@ impl AcceptLoop {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: Command) {
|
||||
pub(super) fn send(&self, msg: Command) {
|
||||
let _ = self.tx.send(msg);
|
||||
let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
|
||||
}
|
||||
|
||||
pub fn get_notify(&self) -> AcceptNotify {
|
||||
pub(super) fn get_notify(&self) -> AcceptNotify {
|
||||
AcceptNotify::new(self.notify_ready.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn start(
|
||||
pub(super) fn start(
|
||||
&mut self,
|
||||
socks: Vec<(Token, StdListener)>,
|
||||
workers: Vec<WorkerClient>,
|
||||
|
@ -130,7 +130,7 @@ fn connection_error(e: &io::Error) -> bool {
|
|||
|
||||
impl Accept {
|
||||
#![allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn start(
|
||||
fn start(
|
||||
rx: sync_mpsc::Receiver<Command>,
|
||||
cmd_reg: mio::Registration,
|
||||
notify_reg: mio::Registration,
|
||||
|
@ -440,7 +440,8 @@ impl Accept {
|
|||
|
||||
let r = self.timer.1.clone();
|
||||
System::current().arbiter().send(Box::pin(async move {
|
||||
delay_until(Instant::now() + Duration::from_millis(510)).await;
|
||||
delay_until(Instant::now() + Duration::from_millis(510))
|
||||
.await;
|
||||
let _ = r.set_readiness(mio::Ready::readable());
|
||||
}));
|
||||
return;
|
|
@ -15,14 +15,14 @@ use log::{error, info};
|
|||
use net2::TcpBuilder;
|
||||
use num_cpus;
|
||||
|
||||
use crate::accept::{AcceptLoop, AcceptNotify, Command};
|
||||
use crate::config::{ConfiguredService, ServiceConfig};
|
||||
use crate::server::{Server, ServerCommand};
|
||||
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||
use crate::signals::{Signal, Signals};
|
||||
use crate::socket::StdListener;
|
||||
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
|
||||
use crate::Token;
|
||||
use super::accept::{AcceptLoop, AcceptNotify, Command};
|
||||
use super::config::{ConfiguredService, ServiceConfig};
|
||||
use super::server::{Server, ServerCommand};
|
||||
use super::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||
use super::signals::{Signal, Signals};
|
||||
use super::socket::StdListener;
|
||||
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
|
||||
use super::Token;
|
||||
|
||||
/// Server builder
|
||||
pub struct ServerBuilder {
|
||||
|
@ -157,7 +157,12 @@ impl ServerBuilder {
|
|||
}
|
||||
|
||||
/// Add new service to the server.
|
||||
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||
pub fn bind<F, U, N: AsRef<str>>(
|
||||
mut self,
|
||||
name: N,
|
||||
addr: U,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<TcpStream>,
|
||||
U: net::ToSocketAddrs,
|
||||
|
@ -375,7 +380,8 @@ impl ServerBuilder {
|
|||
spawn(
|
||||
async {
|
||||
delay_until(
|
||||
Instant::now() + Duration::from_millis(300),
|
||||
Instant::now()
|
||||
+ Duration::from_millis(300),
|
||||
)
|
||||
.await;
|
||||
System::current().stop();
|
||||
|
@ -390,12 +396,11 @@ impl ServerBuilder {
|
|||
// we need to stop system if server was spawned
|
||||
if self.exit {
|
||||
spawn(
|
||||
delay_until(Instant::now() + Duration::from_millis(300)).then(
|
||||
|_| {
|
||||
delay_until(Instant::now() + Duration::from_millis(300))
|
||||
.then(|_| {
|
||||
System::current().stop();
|
||||
ready(())
|
||||
},
|
||||
),
|
||||
}),
|
||||
);
|
||||
}
|
||||
if let Some(tx) = completion {
|
||||
|
@ -485,7 +490,10 @@ pub(super) fn bind_addr<S: net::ToSocketAddrs>(
|
|||
}
|
||||
}
|
||||
|
||||
fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result<net::TcpListener> {
|
||||
fn create_tcp_listener(
|
||||
addr: net::SocketAddr,
|
||||
backlog: i32,
|
||||
) -> io::Result<net::TcpListener> {
|
||||
let builder = match addr {
|
||||
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
|
||||
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
|
|
@ -2,11 +2,12 @@ use std::collections::HashMap;
|
|||
use std::{fmt, io, net};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service as actix;
|
||||
use actix_utils::counter::CounterGuard;
|
||||
use futures::future::{ok, Future, FutureExt, LocalBoxFuture};
|
||||
use log::error;
|
||||
|
||||
use crate::service;
|
||||
use crate::util::counter::CounterGuard;
|
||||
|
||||
use super::builder::bind_addr;
|
||||
use super::service::{
|
||||
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
|
||||
|
@ -14,10 +15,10 @@ use super::service::{
|
|||
use super::Token;
|
||||
|
||||
pub struct ServiceConfig {
|
||||
pub(crate) services: Vec<(String, net::TcpListener)>,
|
||||
pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
|
||||
pub(crate) threads: usize,
|
||||
pub(crate) backlog: i32,
|
||||
pub(super) services: Vec<(String, net::TcpListener)>,
|
||||
pub(super) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
|
||||
pub(super) threads: usize,
|
||||
pub(super) backlog: i32,
|
||||
}
|
||||
|
||||
impl ServiceConfig {
|
||||
|
@ -53,7 +54,11 @@ impl ServiceConfig {
|
|||
}
|
||||
|
||||
/// Add new service to server
|
||||
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
|
||||
pub fn listen<N: AsRef<str>>(
|
||||
&mut self,
|
||||
name: N,
|
||||
lst: net::TcpListener,
|
||||
) -> &mut Self {
|
||||
if self.apply.is_none() {
|
||||
self.apply = Some(Box::new(not_configured));
|
||||
}
|
||||
|
@ -110,7 +115,9 @@ impl InternalServiceFactory for ConfiguredService {
|
|||
})
|
||||
}
|
||||
|
||||
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
||||
fn create(
|
||||
&self,
|
||||
) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
||||
// configure services
|
||||
let mut rt = ServiceRuntime::new(self.topics.clone());
|
||||
self.rt.configure(&mut rt);
|
||||
|
@ -142,7 +149,7 @@ impl InternalServiceFactory for ConfiguredService {
|
|||
let name = names.remove(&token).unwrap().0;
|
||||
res.push((
|
||||
token,
|
||||
Box::new(StreamService::new(actix::fn_service(
|
||||
Box::new(StreamService::new(service::fn_service(
|
||||
move |_: TcpStream| {
|
||||
error!("Service {:?} is not configured", name);
|
||||
ok::<_, ()>(())
|
||||
|
@ -209,8 +216,8 @@ impl ServiceRuntime {
|
|||
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
|
||||
pub fn service<T, F>(&mut self, name: &str, service: F)
|
||||
where
|
||||
F: actix::IntoServiceFactory<T>,
|
||||
T: actix::ServiceFactory<Config = (), Request = TcpStream> + 'static,
|
||||
F: service::IntoServiceFactory<T>,
|
||||
T: service::ServiceFactory<Config = (), Request = TcpStream> + 'static,
|
||||
T::Future: 'static,
|
||||
T::Service: 'static,
|
||||
T::InitError: fmt::Debug,
|
||||
|
@ -238,7 +245,7 @@ impl ServiceRuntime {
|
|||
}
|
||||
|
||||
type BoxedNewService = Box<
|
||||
dyn actix::ServiceFactory<
|
||||
dyn service::ServiceFactory<
|
||||
Request = (Option<CounterGuard>, ServerMessage),
|
||||
Response = (),
|
||||
Error = (),
|
||||
|
@ -253,9 +260,9 @@ struct ServiceFactory<T> {
|
|||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> actix::ServiceFactory for ServiceFactory<T>
|
||||
impl<T> service::ServiceFactory for ServiceFactory<T>
|
||||
where
|
||||
T: actix::ServiceFactory<Config = (), Request = TcpStream>,
|
||||
T: service::ServiceFactory<Config = (), Request = TcpStream>,
|
||||
T::Future: 'static,
|
||||
T::Service: 'static,
|
||||
T::Error: 'static,
|
|
@ -1,6 +1,38 @@
|
|||
//! General purpose tcp server
|
||||
#![allow(clippy::type_complexity)]
|
||||
|
||||
mod accept;
|
||||
mod builder;
|
||||
mod config;
|
||||
mod server;
|
||||
mod service;
|
||||
mod signals;
|
||||
mod socket;
|
||||
mod test;
|
||||
mod worker;
|
||||
|
||||
pub use self::builder::ServerBuilder;
|
||||
pub use self::config::{ServiceConfig, ServiceRuntime};
|
||||
pub use self::server::Server;
|
||||
pub use self::service::ServiceFactory;
|
||||
pub use self::test::{test_server, TestServer};
|
||||
pub use actix_server::*;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use self::socket::FromStream;
|
||||
|
||||
/// Socket id token
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||
pub(crate) struct Token(usize);
|
||||
|
||||
impl Token {
|
||||
pub(crate) fn next(&mut self) -> Token {
|
||||
let token = Token(self.0);
|
||||
self.0 += 1;
|
||||
token
|
||||
}
|
||||
}
|
||||
|
||||
/// Start server building process
|
||||
pub fn new() -> ServerBuilder {
|
||||
ServerBuilder::default()
|
||||
}
|
||||
|
|
|
@ -7,8 +7,8 @@ use futures::channel::mpsc::UnboundedSender;
|
|||
use futures::channel::oneshot;
|
||||
use futures::FutureExt;
|
||||
|
||||
use crate::builder::ServerBuilder;
|
||||
use crate::signals::Signal;
|
||||
use super::builder::ServerBuilder;
|
||||
use super::signals::Signal;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ServerCommand {
|
|
@ -4,14 +4,15 @@ use std::task::{Context, Poll};
|
|||
use std::time::Duration;
|
||||
|
||||
use actix_rt::spawn;
|
||||
use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
|
||||
use actix_utils::counter::CounterGuard;
|
||||
use futures::future::{err, ok, LocalBoxFuture, Ready};
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use log::error;
|
||||
|
||||
use crate::service::{Service, ServiceFactory as ActixServiceFactory};
|
||||
use crate::util::counter::CounterGuard;
|
||||
|
||||
use super::socket::{FromStream, StdStream};
|
||||
use super::Token;
|
||||
use crate::socket::{FromStream, StdStream};
|
||||
|
||||
/// Server message
|
||||
pub(crate) enum ServerMessage {
|
||||
|
@ -24,7 +25,7 @@ pub(crate) enum ServerMessage {
|
|||
}
|
||||
|
||||
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
|
||||
type Factory: actix::ServiceFactory<Config = (), Request = Stream>;
|
||||
type Factory: ActixServiceFactory<Config = (), Request = Stream>;
|
||||
|
||||
fn create(&self) -> Self::Factory;
|
||||
}
|
||||
|
@ -34,7 +35,9 @@ pub(crate) trait InternalServiceFactory: Send {
|
|||
|
||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
|
||||
|
||||
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
|
||||
fn create(
|
||||
&self,
|
||||
) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
|
||||
}
|
||||
|
||||
pub(crate) type BoxedServerService = Box<
|
||||
|
@ -72,7 +75,10 @@ where
|
|||
self.service.poll_ready(ctx).map_err(|_| ())
|
||||
}
|
||||
|
||||
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
|
||||
fn call(
|
||||
&mut self,
|
||||
(guard, req): (Option<CounterGuard>, ServerMessage),
|
||||
) -> Self::Future {
|
||||
match req {
|
||||
ServerMessage::Connect(stream) => {
|
||||
let stream = FromStream::from_stdstream(stream).map_err(|e| {
|
||||
|
@ -143,7 +149,9 @@ where
|
|||
})
|
||||
}
|
||||
|
||||
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
||||
fn create(
|
||||
&self,
|
||||
) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
||||
let token = self.token;
|
||||
self.inner
|
||||
.create()
|
||||
|
@ -166,7 +174,9 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
|
|||
self.as_ref().clone_factory()
|
||||
}
|
||||
|
||||
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
||||
fn create(
|
||||
&self,
|
||||
) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
|
||||
self.as_ref().create()
|
||||
}
|
||||
}
|
||||
|
@ -174,7 +184,7 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
|
|||
impl<F, T, I> ServiceFactory<I> for F
|
||||
where
|
||||
F: Fn() -> T + Send + Clone + 'static,
|
||||
T: actix::ServiceFactory<Config = (), Request = I>,
|
||||
T: ActixServiceFactory<Config = (), Request = I>,
|
||||
I: FromStream,
|
||||
{
|
||||
type Factory = T;
|
|
@ -40,7 +40,9 @@ impl fmt::Display for StdListener {
|
|||
match *self {
|
||||
StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
|
||||
#[cfg(all(unix))]
|
||||
StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
|
||||
StdListener::Uds(ref lst) => {
|
||||
write!(f, "{:?}", lst.local_addr().ok().unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -85,9 +87,9 @@ pub(crate) enum SocketListener {
|
|||
impl SocketListener {
|
||||
pub(crate) fn accept(&self) -> io::Result<Option<(StdStream, SocketAddr)>> {
|
||||
match *self {
|
||||
SocketListener::Tcp(ref lst) => lst
|
||||
.accept_std()
|
||||
.map(|(stream, addr)| Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))),
|
||||
SocketListener::Tcp(ref lst) => lst.accept_std().map(|(stream, addr)| {
|
||||
Some((StdStream::Tcp(stream), SocketAddr::Tcp(addr)))
|
||||
}),
|
||||
#[cfg(all(unix))]
|
||||
SocketListener::Uds(ref lst) => lst.accept_std().map(|res| {
|
||||
res.map(|(stream, addr)| (StdStream::Uds(stream), SocketAddr::Uds(addr)))
|
|
@ -6,32 +6,33 @@ use std::time;
|
|||
|
||||
use actix_rt::time::{delay_until, Delay, Instant};
|
||||
use actix_rt::{spawn, Arbiter};
|
||||
use actix_utils::counter::Counter;
|
||||
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
|
||||
use futures::channel::oneshot;
|
||||
use futures::future::{join_all, LocalBoxFuture, MapOk};
|
||||
use futures::{Future, FutureExt, Stream, TryFutureExt};
|
||||
use log::{error, info, trace};
|
||||
|
||||
use crate::accept::AcceptNotify;
|
||||
use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
|
||||
use crate::socket::{SocketAddr, StdStream};
|
||||
use crate::Token;
|
||||
use crate::util::counter::Counter;
|
||||
|
||||
pub(crate) struct WorkerCommand(Conn);
|
||||
use super::accept::AcceptNotify;
|
||||
use super::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
|
||||
use super::socket::{SocketAddr, StdStream};
|
||||
use super::Token;
|
||||
|
||||
pub(super) struct WorkerCommand(Conn);
|
||||
|
||||
/// Stop worker message. Returns `true` on successful shutdown
|
||||
/// and `false` if some connections still alive.
|
||||
pub(crate) struct StopCommand {
|
||||
pub(super) struct StopCommand {
|
||||
graceful: bool,
|
||||
result: oneshot::Sender<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Conn {
|
||||
pub io: StdStream,
|
||||
pub token: Token,
|
||||
pub peer: Option<SocketAddr>,
|
||||
pub(super) struct Conn {
|
||||
pub(super) io: StdStream,
|
||||
pub(super) token: Token,
|
||||
pub(super) peer: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
|
||||
|
@ -42,11 +43,11 @@ static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
|
|||
/// reached for each worker.
|
||||
///
|
||||
/// By default max connections is set to a 25k per worker.
|
||||
pub fn max_concurrent_connections(num: usize) {
|
||||
pub(super) fn max_concurrent_connections(num: usize) {
|
||||
MAX_CONNS.store(num, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn num_connections() -> usize {
|
||||
pub(super) fn num_connections() -> usize {
|
||||
MAX_CONNS_COUNTER.with(|conns| conns.total())
|
||||
}
|
||||
|
||||
|
@ -56,15 +57,15 @@ thread_local! {
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct WorkerClient {
|
||||
pub idx: usize,
|
||||
pub(super) struct WorkerClient {
|
||||
pub(super) idx: usize,
|
||||
tx1: UnboundedSender<WorkerCommand>,
|
||||
tx2: UnboundedSender<StopCommand>,
|
||||
avail: WorkerAvailability,
|
||||
}
|
||||
|
||||
impl WorkerClient {
|
||||
pub fn new(
|
||||
pub(super) fn new(
|
||||
idx: usize,
|
||||
tx1: UnboundedSender<WorkerCommand>,
|
||||
tx2: UnboundedSender<StopCommand>,
|
||||
|
@ -78,17 +79,17 @@ impl WorkerClient {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: Conn) -> Result<(), Conn> {
|
||||
pub(super) fn send(&self, msg: Conn) -> Result<(), Conn> {
|
||||
self.tx1
|
||||
.unbounded_send(WorkerCommand(msg))
|
||||
.map_err(|msg| msg.into_inner().0)
|
||||
}
|
||||
|
||||
pub fn available(&self) -> bool {
|
||||
pub(super) fn available(&self) -> bool {
|
||||
self.avail.available()
|
||||
}
|
||||
|
||||
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
|
||||
pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
|
||||
let (result, rx) = oneshot::channel();
|
||||
let _ = self.tx2.unbounded_send(StopCommand { graceful, result });
|
||||
rx
|
||||
|
@ -96,24 +97,24 @@ impl WorkerClient {
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct WorkerAvailability {
|
||||
pub(super) struct WorkerAvailability {
|
||||
notify: AcceptNotify,
|
||||
available: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl WorkerAvailability {
|
||||
pub fn new(notify: AcceptNotify) -> Self {
|
||||
pub(super) fn new(notify: AcceptNotify) -> Self {
|
||||
WorkerAvailability {
|
||||
notify,
|
||||
available: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn available(&self) -> bool {
|
||||
pub(super) fn available(&self) -> bool {
|
||||
self.available.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn set(&self, val: bool) {
|
||||
pub(super) fn set(&self, val: bool) {
|
||||
let old = self.available.swap(val, Ordering::Release);
|
||||
if !old && val {
|
||||
self.notify.notify()
|
||||
|
@ -125,7 +126,7 @@ impl WorkerAvailability {
|
|||
///
|
||||
/// Worker accepts Socket objects via unbounded channel and starts stream
|
||||
/// processing.
|
||||
pub(crate) struct Worker {
|
||||
pub(super) struct Worker {
|
||||
rx: UnboundedReceiver<WorkerCommand>,
|
||||
rx2: UnboundedReceiver<StopCommand>,
|
||||
services: Vec<WorkerService>,
|
||||
|
@ -160,7 +161,7 @@ enum WorkerServiceStatus {
|
|||
}
|
||||
|
||||
impl Worker {
|
||||
pub(crate) fn start(
|
||||
pub(super) fn start(
|
||||
idx: usize,
|
||||
factories: Vec<Box<dyn InternalServiceFactory>>,
|
||||
availability: WorkerAvailability,
|
||||
|
@ -332,7 +333,9 @@ impl Future for Worker {
|
|||
if num != 0 {
|
||||
info!("Graceful worker shutdown, {} connections", num);
|
||||
self.state = WorkerState::Shutdown(
|
||||
Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))),
|
||||
Box::pin(delay_until(
|
||||
Instant::now() + time::Duration::from_secs(1),
|
||||
)),
|
||||
Box::pin(delay_until(Instant::now() + self.shutdown_timeout)),
|
||||
Some(result),
|
||||
);
|
||||
|
@ -383,8 +386,11 @@ impl Future for Worker {
|
|||
self.factories[idx].name(token)
|
||||
);
|
||||
self.services[token.0].status = WorkerServiceStatus::Restarting;
|
||||
self.state =
|
||||
WorkerState::Restarting(idx, token, self.factories[idx].create());
|
||||
self.state = WorkerState::Restarting(
|
||||
idx,
|
||||
token,
|
||||
self.factories[idx].create(),
|
||||
);
|
||||
self.poll(cx)
|
||||
}
|
||||
}
|
||||
|
@ -453,9 +459,10 @@ impl Future for Worker {
|
|||
match self.check_readiness(cx) {
|
||||
Ok(true) => {
|
||||
let guard = self.conns.get();
|
||||
let _ = self.services[msg.token.0]
|
||||
.service
|
||||
.call((Some(guard), ServerMessage::Connect(msg.io)));
|
||||
let _ = self.services[msg.token.0].service.call((
|
||||
Some(guard),
|
||||
ServerMessage::Connect(msg.io),
|
||||
));
|
||||
continue;
|
||||
}
|
||||
Ok(false) => {
|
|
@ -2,8 +2,6 @@ use std::marker::PhantomData;
|
|||
use std::sync::{Arc, Mutex};
|
||||
use std::{fmt, io, net};
|
||||
|
||||
use actix_server::{Server, ServerBuilder};
|
||||
|
||||
#[cfg(feature = "openssl")]
|
||||
use actix_tls::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
|
||||
#[cfg(feature = "rustls")]
|
||||
|
@ -20,9 +18,10 @@ use crate::http::{
|
|||
};
|
||||
#[cfg(unix)]
|
||||
use crate::pipeline_factory;
|
||||
use crate::server::{Server, ServerBuilder};
|
||||
use crate::{map_config, IntoServiceFactory, Service, ServiceFactory};
|
||||
|
||||
use crate::web::config::AppConfig;
|
||||
use super::config::AppConfig;
|
||||
|
||||
struct Socket {
|
||||
scheme: &'static str,
|
||||
|
|
|
@ -8,7 +8,6 @@ use std::{fmt, net, thread, time};
|
|||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||
use actix_rt::{time::delay_for, System};
|
||||
use actix_server::Server;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::future::ok;
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
|
@ -30,6 +29,7 @@ use crate::http::{
|
|||
Extensions, HttpService, Method, Payload, Request, StatusCode, Uri, Version,
|
||||
};
|
||||
use crate::router::{Path, ResourceDef};
|
||||
use crate::server::Server;
|
||||
use crate::{map_config, IntoService, IntoServiceFactory, Service, ServiceFactory};
|
||||
|
||||
use crate::web::config::AppConfig;
|
||||
|
|
|
@ -5,13 +5,14 @@ use std::{net, thread, time};
|
|||
|
||||
use actix_codec::{BytesCodec, Framed};
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_server::Server;
|
||||
use actix_service::fn_service;
|
||||
use bytes::Bytes;
|
||||
use futures::future::{lazy, ok};
|
||||
use futures::SinkExt;
|
||||
use net2::TcpBuilder;
|
||||
|
||||
use ntex::server::Server;
|
||||
use ntex::service::fn_service;
|
||||
|
||||
fn unused_addr() -> net::SocketAddr {
|
||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let socket = TcpBuilder::new_v4().unwrap();
|
Loading…
Add table
Add a link
Reference in a new issue