Add ServerBuilder::configure_async() helper, async version of configure method

This commit is contained in:
Nikolay Kim 2022-09-20 07:54:51 +02:00
parent bd5d40e439
commit 8b7c3111c0
4 changed files with 93 additions and 2 deletions

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.5.26] - 2022-09-20
* server: Add ServerBuilder::configure_async() helper, async version of configure method
## [0.5.25] - 2022-08-22 ## [0.5.25] - 2022-08-22
* http: Fix http2 content-length handling * http: Fix http2 content-length handling

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex" name = "ntex"
version = "0.5.25" version = "0.5.26"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services" description = "Framework for composable network services"
readme = "README.md" readme = "README.md"
@ -57,7 +57,7 @@ ntex-macros = "0.1.3"
ntex-util = "0.1.17" ntex-util = "0.1.17"
ntex-bytes = "0.1.16" ntex-bytes = "0.1.16"
ntex-h2 = "0.1.4" ntex-h2 = "0.1.4"
ntex-rt = "0.4.4" ntex-rt = "0.4.6"
ntex-io = "0.1.8" ntex-io = "0.1.8"
ntex-tls = "0.1.5" ntex-tls = "0.1.5"
ntex-tokio = { version = "0.1.3", optional = true } ntex-tokio = { version = "0.1.3", optional = true }

View file

@ -168,6 +168,33 @@ impl ServerBuilder {
Ok(self) Ok(self)
} }
/// Execute external async configuration as part of the server building
/// process.
///
/// This function is useful for moving parts of configuration to a
/// different module or even library.
pub async fn configure_async<F, R>(mut self, f: F) -> io::Result<ServerBuilder>
where
F: Fn(&mut ServiceConfig) -> R,
R: Future<Output = io::Result<()>>,
{
let mut cfg = ServiceConfig::new(self.threads, self.backlog);
f(&mut cfg).await?;
let apply = cfg.apply;
let mut srv = ConfiguredService::new(apply);
for (name, lst) in cfg.services {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?);
self.sockets.push((token, name, Listener::from_tcp(lst)));
}
self.services.push(Box::new(srv));
self.threads = cfg.threads;
Ok(self)
}
/// Register async service configuration function. /// Register async service configuration function.
/// ///
/// This function get called during worker runtime configuration stage. /// This function get called during worker runtime configuration stage.

View file

@ -185,6 +185,66 @@ fn test_on_worker_start() {
let _ = h.join(); let _ = h.join();
} }
#[test]
#[cfg(feature = "tokio")]
fn test_configure_async() {
let addr1 = TestServer::unused_addr();
let addr2 = TestServer::unused_addr();
let addr3 = TestServer::unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let h = thread::spawn(move || {
let num = num2.clone();
let num2 = num2.clone();
let sys = ntex::rt::System::new("test");
sys.block_on(async move {
let srv = Server::build()
.disable_signals()
.configure_async(move |cfg| {
let num = num.clone();
let lst = net::TcpListener::bind(addr3).unwrap();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.listen("addr3", lst)
.on_worker_start(move |rt| {
let num = num.clone();
async move {
rt.service("addr1", fn_service(|_| Ready::Ok::<_, ()>(())));
rt.service("addr3", fn_service(|_| Ready::Ok::<_, ()>(())));
let _ = num.fetch_add(1, Relaxed);
Ok::<_, io::Error>(())
}
})
.unwrap();
Ready::Ok::<_, io::Error>(())
})
.await
.unwrap()
.on_worker_start(move |_| {
let _ = num2.fetch_add(1, Relaxed);
Ready::Ok::<_, io::Error>(())
})
.workers(1)
.run();
let _ = tx.send((srv, ntex::rt::System::current()));
Ok::<_, io::Error>(())
})
});
let (_, sys) = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500));
assert!(net::TcpStream::connect(addr1).is_ok());
assert!(net::TcpStream::connect(addr2).is_ok());
assert!(net::TcpStream::connect(addr3).is_ok());
assert_eq!(num.load(Relaxed), 2);
sys.stop();
let _ = h.join();
}
#[test] #[test]
#[cfg(feature = "tokio")] #[cfg(feature = "tokio")]
#[cfg(not(target_os = "macos"))] #[cfg(not(target_os = "macos"))]