Merge pull request #75 from ntex-rs/refactor-server-factory

Refactor server factory trait
This commit is contained in:
Nikolay Kim 2021-12-20 20:47:03 +06:00 committed by GitHub
commit 23bdc94d8b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 149 additions and 127 deletions

View file

@ -35,7 +35,7 @@ async fn main() -> io::Result<()> {
// start server // start server
server::ServerBuilder::new() server::ServerBuilder::new()
.bind("basic", "127.0.0.1:8443", move || { .bind("basic", "127.0.0.1:8443", move |_| {
pipeline_factory(filter_factory(TlsAcceptor::new(tls_config.clone()))) pipeline_factory(filter_factory(TlsAcceptor::new(tls_config.clone())))
.and_then(fn_service(|io: Io<_>| async move { .and_then(fn_service(|io: Io<_>| async move {
println!("New client is connected"); println!("New client is connected");

View file

@ -24,7 +24,7 @@ async fn main() -> io::Result<()> {
// start server // start server
server::ServerBuilder::new() server::ServerBuilder::new()
.bind("basic", "127.0.0.1:8443", move || { .bind("basic", "127.0.0.1:8443", move |_| {
pipeline_factory(filter_factory(SslAcceptor::new(acceptor.clone()))) pipeline_factory(filter_factory(SslAcceptor::new(acceptor.clone())))
.and_then(fn_service(|io: Io<_>| async move { .and_then(fn_service(|io: Io<_>| async move {
println!("New client is connected"); println!("New client is connected");

View file

@ -37,7 +37,7 @@ async fn main() -> io::Result<()> {
// start server // start server
server::ServerBuilder::new() server::ServerBuilder::new()
.bind("basic", "127.0.0.1:8443", move || { .bind("basic", "127.0.0.1:8443", move |_| {
HttpService::build() HttpService::build()
.client_timeout(Seconds(1)) .client_timeout(Seconds(1))
.disconnect_timeout(Seconds(1)) .disconnect_timeout(Seconds(1))

View file

@ -4,6 +4,8 @@
* Refactor http/1 dispatcher * Refactor http/1 dispatcher
* Refactor Server service configuration
## [0.5.0-b.0] - 2021-12-19 ## [0.5.0-b.0] - 2021-12-19
* Migrate io to ntex-io * Migrate io to ntex-io

View file

@ -13,7 +13,7 @@ async fn main() -> io::Result<()> {
env_logger::init(); env_logger::init();
Server::build() Server::build()
.bind("echo", "127.0.0.1:8080", || { .bind("echo", "127.0.0.1:8080", |_| {
HttpService::build() HttpService::build()
.client_timeout(Seconds(1)) .client_timeout(Seconds(1))
.disconnect_timeout(Seconds(1)) .disconnect_timeout(Seconds(1))

View file

@ -25,7 +25,7 @@ async fn main() -> io::Result<()> {
env_logger::init(); env_logger::init();
Server::build() Server::build()
.bind("echo", "127.0.0.1:8080", || { .bind("echo", "127.0.0.1:8080", |_| {
HttpService::build().finish(handle_request) HttpService::build().finish(handle_request)
})? })?
.run() .run()

View file

@ -11,7 +11,7 @@ async fn main() -> io::Result<()> {
env_logger::init(); env_logger::init();
Server::build() Server::build()
.bind("hello-world", "127.0.0.1:8080", || { .bind("hello-world", "127.0.0.1:8080", |_| {
HttpService::build() HttpService::build()
.client_timeout(Seconds(1)) .client_timeout(Seconds(1))
.disconnect_timeout(Seconds(1)) .disconnect_timeout(Seconds(1))

View file

@ -4,8 +4,7 @@ use std::{convert::TryFrom, io, net, str::FromStr, sync::mpsc, thread};
#[cfg(feature = "cookie")] #[cfg(feature = "cookie")]
use coo_kie::{Cookie, CookieJar}; use coo_kie::{Cookie, CookieJar};
use crate::rt::System; use crate::{io::Io, rt::System, server::Server, service::ServiceFactory};
use crate::server::{Server, StreamServiceFactory};
use crate::{time::Millis, time::Seconds, util::Bytes}; use crate::{time::Millis, time::Seconds, util::Bytes};
use super::client::error::WsClientError; use super::client::error::WsClientError;
@ -209,7 +208,11 @@ fn parts(parts: &mut Option<Inner>) -> &mut Inner {
/// assert!(response.status().is_success()); /// assert!(response.status().is_success());
/// } /// }
/// ``` /// ```
pub fn server<F: StreamServiceFactory>(factory: F) -> TestServer { pub fn server<F, R>(factory: F) -> TestServer
where
F: Fn() -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
{
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
// run server in separate thread // run server in separate thread
@ -220,10 +223,10 @@ pub fn server<F: StreamServiceFactory>(factory: F) -> TestServer {
sys.exec(|| { sys.exec(|| {
Server::build() Server::build()
.listen("test", tcp, factory)? .listen("test", tcp, move |_| factory())?
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.start(); .run();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
})?; })?;

View file

@ -8,12 +8,16 @@ use futures_core::Stream;
use log::{error, info}; use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type}; use socket2::{Domain, SockAddr, Socket, Type};
use crate::io::Io;
use crate::rt::{spawn, Signal, System}; use crate::rt::{spawn, Signal, System};
use crate::{time::sleep, time::Millis, util::join_all, util::PoolId}; use crate::service::ServiceFactory;
use crate::{time::sleep, time::Millis, util::join_all};
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime}; use super::config::{
use super::service::{Factory, InternalServiceFactory, StreamServiceFactory}; ConfigWrapper, Configuration, ConfiguredService, RuntimeConfiguration, ServiceConfig,
};
use super::service::{Factory, InternalServiceFactory};
use super::socket::Listener; use super::socket::Listener;
use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{Server, ServerCommand, ServerStatus, Token}; use super::{Server, ServerCommand, ServerStatus, Token};
@ -147,9 +151,9 @@ impl ServerBuilder {
/// different module or even library. /// different module or even library.
pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder> pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder>
where where
F: Fn(&mut ServiceConfig) -> io::Result<()>, F: Fn(&mut Configuration) -> io::Result<()>,
{ {
let mut cfg = ServiceConfig::new(self.threads, self.backlog); let mut cfg = Configuration::new(self.threads, self.backlog);
f(&mut cfg)?; f(&mut cfg)?;
@ -172,7 +176,7 @@ impl ServerBuilder {
/// It get executed in the worker thread. /// It get executed in the worker thread.
pub fn on_worker_start<F, R, E>(mut self, f: F) -> Self pub fn on_worker_start<F, R, E>(mut self, f: F) -> Self
where where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, F: Fn(RuntimeConfiguration) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static, R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static, E: fmt::Display + 'static,
{ {
@ -184,27 +188,17 @@ impl ServerBuilder {
self self
} }
/// Set memory pool for name dservice.
///
/// Use specified memory pool for memory allocations. By default P0
/// memory pool is used.
pub fn memory_pool<N: AsRef<str>>(mut self, name: N, id: PoolId) -> Self {
for srv in &mut self.services {
srv.set_memory_pool(name.as_ref(), id)
}
self
}
/// Add new service to the server. /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>( pub fn bind<F, U, N: AsRef<str>, R>(
mut self, mut self,
name: N, name: N,
addr: U, addr: U,
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: StreamServiceFactory,
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
F: Fn(ServiceConfig) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
@ -227,11 +221,12 @@ impl ServerBuilder {
#[cfg(all(unix))] #[cfg(all(unix))]
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind_uds<F, U, N, R>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: StreamServiceFactory,
N: AsRef<str>, N: AsRef<str>,
U: AsRef<std::path::Path>, U: AsRef<std::path::Path>,
F: Fn(ServiceConfig) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
{ {
use std::os::unix::net::UnixListener; use std::os::unix::net::UnixListener;
@ -252,14 +247,15 @@ impl ServerBuilder {
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
/// Useful when running as a systemd service and /// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate. /// a socket FD can be acquired using the systemd crate.
pub fn listen_uds<F, N: AsRef<str>>( pub fn listen_uds<F, N: AsRef<str>, R>(
mut self, mut self,
name: N, name: N,
lst: std::os::unix::net::UnixListener, lst: std::os::unix::net::UnixListener,
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: StreamServiceFactory, F: Fn(ServiceConfig) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
{ {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let token = self.token.next(); let token = self.token.next();
@ -276,14 +272,15 @@ impl ServerBuilder {
} }
/// Add new service to the server. /// Add new service to the server.
pub fn listen<F, N: AsRef<str>>( pub fn listen<F, N: AsRef<str>, R>(
mut self, mut self,
name: N, name: N,
lst: net::TcpListener, lst: net::TcpListener,
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: StreamServiceFactory, F: Fn(ServiceConfig) -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
{ {
let token = self.token.next(); let token = self.token.next();
self.services.push(Factory::create( self.services.push(Factory::create(
@ -297,11 +294,6 @@ impl ServerBuilder {
Ok(self) Ok(self)
} }
#[doc(hidden)]
pub fn start(self) -> Server {
self.run()
}
/// Starts processing incoming connections and return server controller. /// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server { pub fn run(mut self) -> Server {
if self.sockets.is_empty() { if self.sockets.is_empty() {

View file

@ -1,6 +1,6 @@
use std::{ use std::{
cell::RefCell, fmt, future::Future, io, marker::PhantomData, mem, net, pin::Pin, cell::Cell, cell::RefCell, fmt, future::Future, io, marker::PhantomData, mem, net,
rc::Rc, pin::Pin, rc::Rc,
}; };
use log::error; use log::error;
@ -14,7 +14,32 @@ use super::service::{
}; };
use super::Token; use super::Token;
pub struct ServiceConfig { #[derive(Clone)]
pub struct ServiceConfig(pub(super) Rc<InnerServiceConfig>);
pub(super) struct InnerServiceConfig {
pub(super) pool: Cell<PoolId>,
}
impl Default for ServiceConfig {
fn default() -> Self {
Self(Rc::new(InnerServiceConfig {
pool: Cell::new(PoolId::DEFAULT),
}))
}
}
impl ServiceConfig {
/// Set memory pool for the service.
///
/// Use specified memory pool for memory allocations.
pub fn memory_pool(&self, id: PoolId) -> &Self {
self.0.pool.set(id);
self
}
}
pub struct Configuration {
pub(super) services: Vec<(String, net::TcpListener)>, pub(super) services: Vec<(String, net::TcpListener)>,
pub(super) apply: Box<dyn ServiceRuntimeConfiguration + Send>, pub(super) apply: Box<dyn ServiceRuntimeConfiguration + Send>,
pub(super) threads: usize, pub(super) threads: usize,
@ -22,9 +47,9 @@ pub struct ServiceConfig {
applied: bool, applied: bool,
} }
impl ServiceConfig { impl Configuration {
pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig { pub(super) fn new(threads: usize, backlog: i32) -> Self {
ServiceConfig { Configuration {
threads, threads,
backlog, backlog,
services: Vec::new(), services: Vec::new(),
@ -78,7 +103,7 @@ impl ServiceConfig {
/// It get executed in the worker thread. /// It get executed in the worker thread.
pub fn on_worker_start<F, R, E>(&mut self, f: F) -> io::Result<()> pub fn on_worker_start<F, R, E>(&mut self, f: F) -> io::Result<()>
where where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, F: Fn(RuntimeConfiguration) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static, R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static, E: fmt::Display + 'static,
{ {
@ -126,15 +151,13 @@ impl InternalServiceFactory for ConfiguredService {
}) })
} }
fn set_memory_pool(&self, _: &str, _: PoolId) {}
fn create( fn create(
&self, &self,
) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>> ) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>
{ {
// configure services // configure services
let rt = ServiceRuntime::new(self.topics.clone()); let rt = RuntimeConfiguration::new(self.topics.clone());
let cfg_fut = self.rt.configure(ServiceRuntime(rt.0.clone())); let cfg_fut = self.rt.configure(RuntimeConfiguration(rt.0.clone()));
let mut names = self.names.clone(); let mut names = self.names.clone();
let tokens = self.services.clone(); let tokens = self.services.clone();
@ -185,7 +208,7 @@ pub(super) trait ServiceRuntimeConfiguration {
fn configure( fn configure(
&self, &self,
rt: ServiceRuntime, rt: RuntimeConfiguration,
) -> Pin<Box<dyn Future<Output = Result<(), ()>>>>; ) -> Pin<Box<dyn Future<Output = Result<(), ()>>>>;
} }
@ -199,7 +222,7 @@ unsafe impl<F: Send, R, E> Send for ConfigWrapper<F, R, E> {}
impl<F, R, E> ServiceRuntimeConfiguration for ConfigWrapper<F, R, E> impl<F, R, E> ServiceRuntimeConfiguration for ConfigWrapper<F, R, E>
where where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static, F: Fn(RuntimeConfiguration) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static, R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static, E: fmt::Display + 'static,
{ {
@ -212,7 +235,7 @@ where
fn configure( fn configure(
&self, &self,
rt: ServiceRuntime, rt: RuntimeConfiguration,
) -> Pin<Box<dyn Future<Output = Result<(), ()>>>> { ) -> Pin<Box<dyn Future<Output = Result<(), ()>>>> {
let f = self.f.clone(); let f = self.f.clone();
Box::pin(async move { Box::pin(async move {
@ -227,7 +250,7 @@ fn not_configured() {
error!("Service is not configured"); error!("Service is not configured");
} }
pub struct ServiceRuntime(Rc<RefCell<ServiceRuntimeInner>>); pub struct RuntimeConfiguration(Rc<RefCell<ServiceRuntimeInner>>);
struct ServiceRuntimeInner { struct ServiceRuntimeInner {
names: HashMap<String, Token>, names: HashMap<String, Token>,
@ -235,9 +258,9 @@ struct ServiceRuntimeInner {
onstart: Vec<Pin<Box<dyn Future<Output = ()>>>>, onstart: Vec<Pin<Box<dyn Future<Output = ()>>>>,
} }
impl ServiceRuntime { impl RuntimeConfiguration {
fn new(names: HashMap<String, Token>) -> Self { fn new(names: HashMap<String, Token>) -> Self {
ServiceRuntime(Rc::new(RefCell::new(ServiceRuntimeInner { RuntimeConfiguration(Rc::new(RefCell::new(ServiceRuntimeInner {
names, names,
services: HashMap::default(), services: HashMap::default(),
onstart: Vec::new(), onstart: Vec::new(),

View file

@ -14,8 +14,7 @@ mod worker;
pub(crate) use self::builder::create_tcp_listener; pub(crate) use self::builder::create_tcp_listener;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::config::{Configuration, RuntimeConfiguration, ServiceConfig};
pub use self::service::StreamServiceFactory;
pub use self::test::{build_test_server, test_server, TestServer}; pub use self::test::{build_test_server, test_server, TestServer};
#[non_exhaustive] #[non_exhaustive]

View file

@ -1,7 +1,5 @@
use std::convert::TryInto; use std::convert::TryInto;
use std::{ use std::{future::Future, net::SocketAddr, pin::Pin, task::Context, task::Poll};
cell::Cell, future::Future, net::SocketAddr, pin::Pin, task::Context, task::Poll,
};
use log::error; use log::error;
@ -10,7 +8,7 @@ use crate::service::{Service, ServiceFactory};
use crate::util::{counter::CounterGuard, Pool, PoolId, Ready}; use crate::util::{counter::CounterGuard, Pool, PoolId, Ready};
use crate::{rt::spawn, time::Millis}; use crate::{rt::spawn, time::Millis};
use super::{socket::Stream, Token}; use super::{socket::Stream, ServiceConfig, Token};
/// Server message /// Server message
pub(super) enum ServerMessage { pub(super) enum ServerMessage {
@ -22,10 +20,10 @@ pub(super) enum ServerMessage {
ForceShutdown, ForceShutdown,
} }
pub trait StreamServiceFactory: Send + Clone + 'static { pub(super) trait StreamServiceFactory: Send + Clone + 'static {
type Factory: ServiceFactory<Config = (), Request = Io>; type Factory: ServiceFactory<Config = (), Request = Io>;
fn create(&self) -> Self::Factory; fn create(&self, _: ServiceConfig) -> Self::Factory;
} }
pub(super) trait InternalServiceFactory: Send { pub(super) trait InternalServiceFactory: Send {
@ -33,8 +31,6 @@ pub(super) trait InternalServiceFactory: Send {
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn set_memory_pool(&self, name: &str, pool: PoolId);
fn create( fn create(
&self, &self,
) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>; ) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>;
@ -120,7 +116,6 @@ pub(super) struct Factory<F: StreamServiceFactory> {
inner: F, inner: F,
token: Token, token: Token,
addr: SocketAddr, addr: SocketAddr,
pool: Cell<PoolId>,
} }
impl<F> Factory<F> impl<F> Factory<F>
@ -138,7 +133,6 @@ where
token, token,
inner, inner,
addr, addr,
pool: Cell::new(PoolId::P0),
}) })
} }
} }
@ -157,29 +151,22 @@ where
inner: self.inner.clone(), inner: self.inner.clone(),
token: self.token, token: self.token,
addr: self.addr, addr: self.addr,
pool: self.pool.clone(),
}) })
} }
fn set_memory_pool(&self, name: &str, pool: PoolId) {
if self.name == name {
self.pool.set(pool)
}
}
fn create( fn create(
&self, &self,
) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>> ) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>
{ {
let token = self.token; let token = self.token;
let pool = self.pool.get(); let cfg = ServiceConfig::default();
let fut = self.inner.create().new_service(()); let fut = self.inner.create(cfg.clone()).new_service(());
Box::pin(async move { Box::pin(async move {
match fut.await { match fut.await {
Ok(inner) => { Ok(inner) => {
let service: BoxedServerService = let service: BoxedServerService =
Box::new(StreamService::new(inner, pool)); Box::new(StreamService::new(inner, cfg.0.pool.get()));
Ok(vec![(token, service)]) Ok(vec![(token, service)])
} }
Err(_) => Err(()), Err(_) => Err(()),
@ -197,10 +184,6 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
fn set_memory_pool(&self, name: &str, pool: PoolId) {
self.as_ref().set_memory_pool(name, pool)
}
fn create( fn create(
&self, &self,
) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>> ) -> Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>
@ -211,13 +194,13 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
impl<F, T> StreamServiceFactory for F impl<F, T> StreamServiceFactory for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn(ServiceConfig) -> T + Send + Clone + 'static,
T: ServiceFactory<Config = (), Request = Io>, T: ServiceFactory<Config = (), Request = Io>,
{ {
type Factory = T; type Factory = T;
#[inline] #[inline]
fn create(&self) -> T { fn create(&self, cfg: ServiceConfig) -> T {
(self)() (self)(cfg)
} }
} }

View file

@ -3,9 +3,9 @@ use std::{io, net, sync::mpsc, thread};
use socket2::{Domain, SockAddr, Socket, Type}; use socket2::{Domain, SockAddr, Socket, Type};
use crate::io::Io;
use crate::rt::{tcp_connect, System}; use crate::rt::{tcp_connect, System};
use crate::server::{Server, ServerBuilder, StreamServiceFactory}; use crate::server::{Server, ServerBuilder};
use crate::{io::Io, service::ServiceFactory};
/// Start test server /// Start test server
/// ///
@ -38,7 +38,11 @@ use crate::server::{Server, ServerBuilder, StreamServiceFactory};
/// assert!(response.status().is_success()); /// assert!(response.status().is_success());
/// } /// }
/// ``` /// ```
pub fn test_server<F: StreamServiceFactory>(factory: F) -> TestServer { pub fn test_server<F, R>(factory: F) -> TestServer
where
F: Fn() -> R + Send + Clone + 'static,
R: ServiceFactory<Config = (), Request = Io>,
{
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
// run server in separate thread // run server in separate thread
@ -49,10 +53,10 @@ pub fn test_server<F: StreamServiceFactory>(factory: F) -> TestServer {
sys.exec(|| { sys.exec(|| {
Server::build() Server::build()
.listen("test", tcp, factory)? .listen("test", tcp, move |_| factory())?
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.start(); .run();
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
})?; })?;
@ -77,10 +81,7 @@ where
let sys = System::new("ntex-test-server"); let sys = System::new("ntex-test-server");
sys.exec(|| { sys.exec(|| {
factory(Server::build()) factory(Server::build()).workers(1).disable_signals().run();
.workers(1)
.disable_signals()
.start();
}); });
tx.send(System::current()).unwrap(); tx.send(System::current()).unwrap();

View file

@ -592,7 +592,7 @@ mod tests {
vec![Factory::create( vec![Factory::create(
"test".to_string(), "test".to_string(),
Token(0), Token(0),
move || f.clone(), move |_| f.clone(),
"127.0.0.1:8080".parse().unwrap(), "127.0.0.1:8080".parse().unwrap(),
)], )],
avail.clone(), avail.clone(),
@ -664,7 +664,7 @@ mod tests {
vec![Factory::create( vec![Factory::create(
"test".to_string(), "test".to_string(),
Token(0), Token(0),
move || f.clone(), move |_| f.clone(),
"127.0.0.1:8080".parse().unwrap(), "127.0.0.1:8080".parse().unwrap(),
)], )],
avail.clone(), avail.clone(),

View file

@ -468,7 +468,7 @@ where
/// ///
/// #[ntex::main] /// #[ntex::main]
/// async fn main() -> std::io::Result<()> { /// async fn main() -> std::io::Result<()> {
/// server::build().bind("http", "127.0.0.1:0", || /// server::build().bind("http", "127.0.0.1:0", |_|
/// http::HttpService::build().finish( /// http::HttpService::build().finish(
/// web::App::new() /// web::App::new()
/// .route("/index.html", web::get().to(|| async { "hello_world" })) /// .route("/index.html", web::get().to(|| async { "hello_world" }))
@ -498,7 +498,7 @@ where
/// ///
/// #[ntex::main] /// #[ntex::main]
/// async fn main() -> std::io::Result<()> { /// async fn main() -> std::io::Result<()> {
/// server::build().bind("http", "127.0.0.1:0", || /// server::build().bind("http", "127.0.0.1:0", |_|
/// http::HttpService::build().finish( /// http::HttpService::build().finish(
/// web::App::new() /// web::App::new()
/// .route("/index.html", web::get().to(|| async { "hello_world" })) /// .route("/index.html", web::get().to(|| async { "hello_world" }))

View file

@ -9,8 +9,8 @@ use crate::http::{
body::MessageBody, HttpService, KeepAlive, Request, Response, ResponseError, body::MessageBody, HttpService, KeepAlive, Request, Response, ResponseError,
}; };
use crate::server::{Server, ServerBuilder}; use crate::server::{Server, ServerBuilder};
use crate::time::Seconds;
use crate::{service::map_config, IntoServiceFactory, Service, ServiceFactory}; use crate::{service::map_config, IntoServiceFactory, Service, ServiceFactory};
use crate::{time::Seconds, util::PoolId};
use super::config::AppConfig; use super::config::AppConfig;
@ -20,6 +20,7 @@ struct Config {
client_timeout: Seconds, client_timeout: Seconds,
client_disconnect: Seconds, client_disconnect: Seconds,
handshake_timeout: Seconds, handshake_timeout: Seconds,
pool: PoolId,
} }
/// An HTTP Server. /// An HTTP Server.
@ -79,6 +80,7 @@ where
client_timeout: Seconds(5), client_timeout: Seconds(5),
client_disconnect: Seconds(5), client_disconnect: Seconds(5),
handshake_timeout: Seconds(5), handshake_timeout: Seconds(5),
pool: PoolId::P0,
})), })),
backlog: 1024, backlog: 1024,
builder: ServerBuilder::default(), builder: ServerBuilder::default(),
@ -219,6 +221,14 @@ where
self self
} }
/// Set memory pool.
///
/// Use specified memory pool for memory allocations.
pub fn memory_pool(&mut self, id: PoolId) -> &mut Self {
self.config.lock().unwrap().pool = id;
self
}
/// Use listener for accepting incoming connection requests /// Use listener for accepting incoming connection requests
/// ///
/// HttpServer does not change any configuration for TcpListener, /// HttpServer does not change any configuration for TcpListener,
@ -231,13 +241,14 @@ where
self.builder = self.builder.listen( self.builder = self.builder.listen(
format!("ntex-web-service-{}", addr), format!("ntex-web-service-{}", addr),
lst, lst,
move || { move |r| {
let c = cfg.lock().unwrap(); let c = cfg.lock().unwrap();
let cfg = AppConfig::new( let cfg = AppConfig::new(
false, false,
addr, addr,
c.host.clone().unwrap_or_else(|| format!("{}", addr)), c.host.clone().unwrap_or_else(|| format!("{}", addr)),
); );
r.memory_pool(c.pool);
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
@ -274,13 +285,15 @@ where
self.builder = self.builder.listen( self.builder = self.builder.listen(
format!("ntex-web-service-{}", addr), format!("ntex-web-service-{}", addr),
lst, lst,
move || { move |r| {
let c = cfg.lock().unwrap(); let c = cfg.lock().unwrap();
let cfg = AppConfig::new( let cfg = AppConfig::new(
true, true,
addr, addr,
c.host.clone().unwrap_or_else(|| format!("{}", addr)), c.host.clone().unwrap_or_else(|| format!("{}", addr)),
); );
r.memory_pool(c.pool);
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
@ -318,13 +331,15 @@ where
self.builder = self.builder.listen( self.builder = self.builder.listen(
format!("ntex-web-rustls-service-{}", addr), format!("ntex-web-rustls-service-{}", addr),
lst, lst,
move || { move |r| {
let c = cfg.lock().unwrap(); let c = cfg.lock().unwrap();
let cfg = AppConfig::new( let cfg = AppConfig::new(
true, true,
addr, addr,
c.host.clone().unwrap_or_else(|| format!("{}", addr)), c.host.clone().unwrap_or_else(|| format!("{}", addr)),
); );
r.memory_pool(c.pool);
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
@ -436,13 +451,15 @@ where
let addr = format!("ntex-web-service-{:?}", lst.local_addr()?); let addr = format!("ntex-web-service-{:?}", lst.local_addr()?);
self.builder = self.builder.listen_uds(addr, lst, move || { self.builder = self.builder.listen_uds(addr, lst, move |r| {
let c = cfg.lock().unwrap(); let c = cfg.lock().unwrap();
let config = AppConfig::new( let config = AppConfig::new(
false, false,
socket_addr, socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
r.memory_pool(c.pool);
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
@ -469,13 +486,15 @@ where
self.builder = self.builder.bind_uds( self.builder = self.builder.bind_uds(
format!("ntex-web-service-{:?}", addr.as_ref()), format!("ntex-web-service-{:?}", addr.as_ref()),
addr, addr,
move || { move |r| {
let c = cfg.lock().unwrap(); let c = cfg.lock().unwrap();
let config = AppConfig::new( let config = AppConfig::new(
false, false,
socket_addr, socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
r.memory_pool(c.pool);
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_timeout(c.client_timeout)
@ -520,7 +539,7 @@ where
/// } /// }
/// ``` /// ```
pub fn run(self) -> Server { pub fn run(self) -> Server {
self.builder.start() self.builder.run()
} }
} }

View file

@ -624,21 +624,21 @@ where
match cfg.stream { match cfg.stream {
StreamType::Tcp => match cfg.tp { StreamType::Tcp => match cfg.tp {
HttpVer::Http1 => builder.listen("test", tcp, move || { HttpVer::Http1 => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(false, local_addr, format!("{}", local_addr)); AppConfig::new(false, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
.client_timeout(ctimeout) .client_timeout(ctimeout)
.h1(map_config(factory(), move |_| cfg.clone())) .h1(map_config(factory(), move |_| cfg.clone()))
}), }),
HttpVer::Http2 => builder.listen("test", tcp, move || { HttpVer::Http2 => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(false, local_addr, format!("{}", local_addr)); AppConfig::new(false, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
.client_timeout(ctimeout) .client_timeout(ctimeout)
.h2(map_config(factory(), move |_| cfg.clone())) .h2(map_config(factory(), move |_| cfg.clone()))
}), }),
HttpVer::Both => builder.listen("test", tcp, move || { HttpVer::Both => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(false, local_addr, format!("{}", local_addr)); AppConfig::new(false, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
@ -648,7 +648,7 @@ where
}, },
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
StreamType::Openssl(acceptor) => match cfg.tp { StreamType::Openssl(acceptor) => match cfg.tp {
HttpVer::Http1 => builder.listen("test", tcp, move || { HttpVer::Http1 => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr)); AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
@ -656,7 +656,7 @@ where
.h1(map_config(factory(), move |_| cfg.clone())) .h1(map_config(factory(), move |_| cfg.clone()))
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}), }),
HttpVer::Http2 => builder.listen("test", tcp, move || { HttpVer::Http2 => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr)); AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
@ -664,7 +664,7 @@ where
.h2(map_config(factory(), move |_| cfg.clone())) .h2(map_config(factory(), move |_| cfg.clone()))
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}), }),
HttpVer::Both => builder.listen("test", tcp, move || { HttpVer::Both => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr)); AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
@ -675,7 +675,7 @@ where
}, },
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
StreamType::Rustls(config) => match cfg.tp { StreamType::Rustls(config) => match cfg.tp {
HttpVer::Http1 => builder.listen("test", tcp, move || { HttpVer::Http1 => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr)); AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
@ -683,7 +683,7 @@ where
.h1(map_config(factory(), move |_| cfg.clone())) .h1(map_config(factory(), move |_| cfg.clone()))
.rustls(config.clone()) .rustls(config.clone())
}), }),
HttpVer::Http2 => builder.listen("test", tcp, move || { HttpVer::Http2 => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr)); AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
@ -691,7 +691,7 @@ where
.h2(map_config(factory(), move |_| cfg.clone())) .h2(map_config(factory(), move |_| cfg.clone()))
.rustls(config.clone()) .rustls(config.clone())
}), }),
HttpVer::Both => builder.listen("test", tcp, move || { HttpVer::Both => builder.listen("test", tcp, move |_| {
let cfg = let cfg =
AppConfig::new(true, local_addr, format!("{}", local_addr)); AppConfig::new(true, local_addr, format!("{}", local_addr));
HttpService::build() HttpService::build()
@ -702,7 +702,7 @@ where
}, },
} }
.unwrap() .unwrap()
.start() .run()
}); });
tx.send((System::current(), srv, local_addr)).unwrap(); tx.send((System::current(), srv, local_addr)).unwrap();

View file

@ -21,9 +21,9 @@ fn test_bind() {
Server::build() Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) .bind("test", addr, move |_| fn_service(|_| ok::<_, ()>(())))
.unwrap() .unwrap()
.start() .run()
}); });
let _ = tx.send((srv, ntex::rt::System::current())); let _ = tx.send((srv, ntex::rt::System::current()));
let _ = sys.run(); let _ = sys.run();
@ -48,9 +48,9 @@ fn test_listen() {
Server::build() Server::build()
.disable_signals() .disable_signals()
.workers(1) .workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) .listen("test", lst, move |_| fn_service(|_| ok::<_, ()>(())))
.unwrap() .unwrap()
.start() .run()
}); });
let _ = tx.send(ntex::rt::System::current()); let _ = tx.send(ntex::rt::System::current());
let _ = sys.run(); let _ = sys.run();
@ -65,7 +65,7 @@ fn test_listen() {
#[test] #[test]
#[cfg(unix)] #[cfg(unix)]
fn test_start() { fn test_run() {
let addr = TestServer::unused_addr(); let addr = TestServer::unused_addr();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
@ -75,7 +75,7 @@ fn test_start() {
Server::build() Server::build()
.backlog(100) .backlog(100)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, move |_| {
fn_service(|io: Io| async move { fn_service(|io: Io| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec) io.send(Bytes::from_static(b"test"), &BytesCodec)
.await .await
@ -84,7 +84,7 @@ fn test_start() {
}) })
}) })
.unwrap() .unwrap()
.start() .run()
}); });
let _ = tx.send((srv, ntex::rt::System::current())); let _ = tx.send((srv, ntex::rt::System::current()));
@ -170,7 +170,7 @@ fn test_on_worker_start() {
Ready::Ok::<_, io::Error>(()) Ready::Ok::<_, io::Error>(())
}) })
.workers(1) .workers(1)
.start() .run()
}); });
let _ = tx.send((srv, ntex::rt::System::current())); let _ = tx.send((srv, ntex::rt::System::current()));
let _ = sys.run(); let _ = sys.run();
@ -203,7 +203,7 @@ fn test_panic_in_worker() {
Server::build() Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, move |_| {
let counter = counter.clone(); let counter = counter.clone();
fn_service(move |_| { fn_service(move |_| {
counter.fetch_add(1, Relaxed); counter.fetch_add(1, Relaxed);
@ -212,7 +212,7 @@ fn test_panic_in_worker() {
}) })
}) })
.unwrap() .unwrap()
.start() .run()
}); });
let _ = tx.send((srv.clone(), ntex::rt::System::current())); let _ = tx.send((srv.clone(), ntex::rt::System::current()));
sys.exec(move || ntex::rt::spawn(srv.map(|_| ()))); sys.exec(move || ntex::rt::spawn(srv.map(|_| ())));