Rename .apply/.apply_async to .on_worker_start()

This commit is contained in:
Nikolay Kim 2021-12-06 23:30:08 +06:00
parent 17631cd8a1
commit 42296e9239
5 changed files with 141 additions and 25 deletions

View file

@ -1,5 +1,9 @@
# Changes
## [0.4.13] - 2021-12-07
* server: Rename .apply/.apply_async to .on_worker_start()
## [0.4.12] - 2021-12-06
* http: Use memory pools

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.4.12"
version = "0.4.13"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -1,4 +1,6 @@
use std::{future::Future, io, mem, net, pin::Pin, task::Context, task::Poll};
use std::{
fmt, future::Future, io, marker, mem, net, pin::Pin, task::Context, task::Poll,
};
use async_channel::{unbounded, Receiver};
use async_oneshot as oneshot;
@ -10,7 +12,7 @@ use crate::rt::{net::TcpStream, spawn, System};
use crate::{time::sleep, time::Millis, util::join_all};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig};
use super::config::{ConfigWrapper, ConfiguredService, ServiceConfig, ServiceRuntime};
use super::service::{Factory, InternalServiceFactory, StreamServiceFactory};
use super::signals::{Signal, Signals};
use super::socket::Listener;
@ -152,20 +154,37 @@ impl ServerBuilder {
f(&mut cfg)?;
if let Some(apply) = cfg.apply {
let mut srv = ConfiguredService::new(apply);
for (name, lst) in cfg.services {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?);
self.sockets.push((token, name, Listener::from_tcp(lst)));
}
self.services.push(Box::new(srv));
let apply = cfg.apply;
let mut srv = ConfiguredService::new(apply);
for (name, lst) in cfg.services {
let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?);
self.sockets.push((token, name, Listener::from_tcp(lst)));
}
self.services.push(Box::new(srv));
self.threads = cfg.threads;
Ok(self)
}
/// Register async service configuration function.
///
/// This function get called during worker runtime configuration stage.
/// It get executed in the worker thread.
pub fn on_worker_start<F, R, E>(mut self, f: F) -> Self
where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
self.services
.push(Box::new(ConfiguredService::new(Box::new(ConfigWrapper {
f,
_t: marker::PhantomData,
}))));
self
}
/// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(
mut self,

View file

@ -17,9 +17,10 @@ use super::Token;
pub struct ServiceConfig {
pub(super) services: Vec<(String, net::TcpListener)>,
pub(super) apply: Option<Box<dyn ServiceRuntimeConfiguration + Send>>,
pub(super) apply: Box<dyn ServiceRuntimeConfiguration + Send>,
pub(super) threads: usize,
pub(super) backlog: i32,
applied: bool,
}
impl ServiceConfig {
@ -28,7 +29,14 @@ impl ServiceConfig {
threads,
backlog,
services: Vec::new(),
apply: None,
applied: false,
apply: Box::new(ConfigWrapper {
f: |_| {
not_configured();
Ready::Ok::<_, &'static str>(())
},
_t: PhantomData,
}),
}
}
@ -52,13 +60,21 @@ impl ServiceConfig {
name: N,
lst: net::TcpListener,
) -> &mut Self {
if self.apply.is_none() {
let _ = self.apply(not_configured);
if !self.applied {
self.apply = Box::new(ConfigWrapper {
f: |_| {
not_configured();
Ready::Ok::<_, &'static str>(())
},
_t: PhantomData,
});
}
self.services.push((name.as_ref().to_string(), lst));
self
}
#[doc(hidden)]
#[deprecated(since = "0.4.13", note = "Use .on_worker_start() instead")]
/// Register service configuration function.
///
/// This function get called during worker runtime configuration.
@ -67,12 +83,16 @@ impl ServiceConfig {
where
F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
{
self.apply_async::<_, Ready<(), &'static str>, &'static str>(move |mut rt| {
f(&mut rt);
Ready::Ok(())
})
self.on_worker_start::<_, Ready<(), &'static str>, &'static str>(
move |mut rt| {
f(&mut rt);
Ready::Ok(())
},
)
}
#[doc(hidden)]
#[deprecated(since = "0.4.13", note = "Use .on_worker_start() instead")]
/// Register async service configuration function.
///
/// This function get called during worker runtime configuration.
@ -83,7 +103,22 @@ impl ServiceConfig {
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
self.apply = Some(Box::new(ConfigWrapper { f, _t: PhantomData }));
self.on_worker_start(f)?;
Ok(())
}
/// Register async service configuration function.
///
/// This function get called during worker runtime configuration stage.
/// It get executed in the worker thread.
pub fn on_worker_start<F, R, E>(&mut self, f: F) -> io::Result<()>
where
F: Fn(ServiceRuntime) -> R + Send + Clone + 'static,
R: Future<Output = Result<(), E>> + 'static,
E: fmt::Display + 'static,
{
self.applied = true;
self.apply = Box::new(ConfigWrapper { f, _t: PhantomData });
Ok(())
}
}
@ -186,9 +221,9 @@ pub(super) trait ServiceRuntimeConfiguration {
) -> Pin<Box<dyn Future<Output = Result<(), ()>>>>;
}
struct ConfigWrapper<F, R, E> {
f: F,
_t: PhantomData<(R, E)>,
pub(super) struct ConfigWrapper<F, R, E> {
pub(super) f: F,
pub(super) _t: PhantomData<(R, E)>,
}
// SAFETY: we dont store R or E in ConfigWrapper
@ -220,7 +255,7 @@ where
}
}
fn not_configured(_: &mut ServiceRuntime) {
fn not_configured() {
error!("Service is not configured");
}

View file

@ -9,7 +9,7 @@ use ntex::codec::{BytesCodec, Framed};
use ntex::rt::net::TcpStream;
use ntex::server::{Server, TestServer};
use ntex::service::fn_service;
use ntex::util::Bytes;
use ntex::util::{Bytes, Ready};
#[test]
fn test_bind() {
@ -231,6 +231,64 @@ fn test_configure_async() {
let _ = h.join();
}
#[test]
fn test_on_worker_start() {
let addr1 = TestServer::unused_addr();
let addr2 = TestServer::unused_addr();
let addr3 = TestServer::unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let h = thread::spawn(move || {
let num = num2.clone();
let num2 = num2.clone();
let mut sys = ntex::rt::System::new("test");
let srv = sys.exec(|| {
Server::build()
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
let lst = net::TcpListener::bind(addr3).unwrap();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.listen("addr3", lst)
.apply_async(move |rt| {
let num = num.clone();
async move {
rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
let _ = num.fetch_add(1, Relaxed);
Ok::<_, io::Error>(())
}
})
.unwrap();
Ok::<_, io::Error>(())
})
.unwrap()
.on_worker_start(move |_| {
let _ = num2.fetch_add(1, Relaxed);
Ready::Ok::<_, io::Error>(())
})
.workers(1)
.start()
});
let _ = tx.send((srv, ntex::rt::System::current()));
let _ = sys.run();
});
let (_, sys) = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500));
assert!(net::TcpStream::connect(addr1).is_ok());
assert!(net::TcpStream::connect(addr2).is_ok());
assert!(net::TcpStream::connect(addr3).is_ok());
assert_eq!(num.load(Relaxed), 2);
sys.stop();
let _ = h.join();
}
#[test]
#[allow(unreachable_code)]
fn test_panic_in_worker() {