diff --git a/Cargo.toml b/Cargo.toml index 6aa86081..333c2295 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,4 @@ ntex-router = { path = "ntex-router" } ntex-rt = { path = "ntex-rt" } ntex-service = { path = "ntex-service" } ntex-macros = { path = "ntex-macros" } -ntex-util = { path = "ntex-util" } \ No newline at end of file +ntex-util = { path = "ntex-util" } diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 4a7e96ae..9572a1c4 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.19] - 2021-07-xx + +* drop direct tokio dependency + ## [0.3.18] - 2021-06-03 * server: expose server status change notifications diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 2f379e5c..61c0bae4 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.3.18" +version = "0.3.19" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -68,7 +68,9 @@ sha-1 = "0.9" slab = "0.4" serde = { version = "1.0", features=["derive"] } socket2 = "0.4" -tokio = { version = "1", default-features = false, features = ["sync"] } + +async-oneshot = "0.5.0" +async-channel = "1.6.1" # http/web framework h2 = { version = "0.3", optional = true } diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index ef6e97db..acbaacc0 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -1,10 +1,11 @@ use std::task::{Context, Poll}; use std::{future::Future, io, mem, net, pin::Pin, time::Duration}; +use async_channel::{unbounded, Receiver}; +use async_oneshot as oneshot; +use futures_core::Stream; use log::{error, info}; use socket2::{Domain, SockAddr, Socket, Type}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::sync::oneshot; use crate::rt::{net::TcpStream, spawn, time::sleep, System}; use crate::util::join_all; @@ -31,7 +32,7 @@ pub struct ServerBuilder { exit: bool, shutdown_timeout: Duration, no_signals: bool, - cmd: UnboundedReceiver, + cmd: Receiver, server: Server, notify: Vec>, } @@ -45,7 +46,7 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded_channel(); + let (tx, rx) = unbounded(); let server = Server::new(tx); ServerBuilder { @@ -323,11 +324,11 @@ impl ServerBuilder { fn handle_cmd(&mut self, item: ServerCommand) { match item { - ServerCommand::Pause(tx) => { + ServerCommand::Pause(mut tx) => { self.accept.send(Command::Pause); let _ = tx.send(()); } - ServerCommand::Resume(tx) => { + ServerCommand::Resume(mut tx) => { self.accept.send(Command::Resume); let _ = tx.send(()); } @@ -386,10 +387,10 @@ impl ServerBuilder { spawn(async move { let _ = join_all(futs).await; - if let Some(tx) = completion { + if let Some(mut tx) = completion { let _ = tx.send(()); } - for tx in notify { + for mut tx in notify { let _ = tx.send(()); } if exit { @@ -405,10 +406,10 @@ impl ServerBuilder { System::current().stop(); }); } - if let Some(tx) = completion { + if let Some(mut tx) = completion { let _ = tx.send(()); } - for tx in notify { + for mut tx in notify { let _ = tx.send(()); } } @@ -451,7 +452,7 @@ impl Future for ServerBuilder { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.cmd).poll_recv(cx) { + match Pin::new(&mut self.cmd).poll_next(cx) { Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), Poll::Ready(None) => return Poll::Pending, Poll::Pending => return Poll::Pending, diff --git a/ntex/src/server/mod.rs b/ntex/src/server/mod.rs index a605bb4b..70bb1db2 100644 --- a/ntex/src/server/mod.rs +++ b/ntex/src/server/mod.rs @@ -4,8 +4,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::{future::Future, io, pin::Pin}; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::oneshot; +use async_channel::Sender; +use async_oneshot as oneshot; use crate::util::counter::Counter; @@ -98,13 +98,10 @@ enum ServerCommand { /// Server controller #[derive(Debug)] -pub struct Server( - UnboundedSender, - Option>, -); +pub struct Server(Sender, Option>); impl Server { - fn new(tx: UnboundedSender) -> Self { + fn new(tx: Sender) -> Self { Server(tx, None) } @@ -114,11 +111,11 @@ impl Server { } fn signal(&self, sig: signals::Signal) { - let _ = self.0.send(ServerCommand::Signal(sig)); + let _ = self.0.try_send(ServerCommand::Signal(sig)); } fn worker_faulted(&self, idx: usize) { - let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); + let _ = self.0.try_send(ServerCommand::WorkerFaulted(idx)); } /// Pause accepting incoming connections @@ -126,8 +123,8 @@ impl Server { /// If socket contains some pending connection, they might be dropped. /// All opened connection remains active. pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Pause(tx)); + let (tx, rx) = oneshot::oneshot(); + let _ = self.0.try_send(ServerCommand::Pause(tx)); async move { let _ = rx.await; } @@ -135,8 +132,8 @@ impl Server { /// Resume accepting incoming connections pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Resume(tx)); + let (tx, rx) = oneshot::oneshot(); + let _ = self.0.try_send(ServerCommand::Resume(tx)); async move { let _ = rx.await; } @@ -146,8 +143,8 @@ impl Server { /// /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Stop { + let (tx, rx) = oneshot::oneshot(); + let _ = self.0.try_send(ServerCommand::Stop { graceful, completion: Some(tx), }); @@ -170,8 +167,8 @@ impl Future for Server { let this = self.get_mut(); if this.1.is_none() { - let (tx, rx) = oneshot::channel(); - if this.0.send(ServerCommand::Notify(tx)).is_err() { + let (tx, rx) = oneshot::oneshot(); + if this.0.try_send(ServerCommand::Notify(tx)).is_err() { return Poll::Ready(Ok(())); } this.1 = Some(rx); diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index 0826d348..899c9b8f 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -2,8 +2,9 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin, sync::Arc, time}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use async_channel::{unbounded, Receiver, Sender}; +use async_oneshot as oneshot; +use futures_core::Stream as FutStream; use crate::rt::time::{sleep_until, Instant, Sleep}; use crate::rt::{spawn, Arbiter}; @@ -55,16 +56,16 @@ thread_local! { #[derive(Clone, Debug)] pub(super) struct WorkerClient { pub(super) idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + tx1: Sender, + tx2: Sender, avail: WorkerAvailability, } impl WorkerClient { pub(super) fn new( idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + tx1: Sender, + tx2: Sender, avail: WorkerAvailability, ) -> Self { WorkerClient { @@ -76,7 +77,9 @@ impl WorkerClient { } pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> { - self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0) + self.tx1 + .try_send(WorkerCommand(msg)) + .map_err(|msg| msg.into_inner().0) } pub(super) fn available(&self) -> bool { @@ -84,8 +87,8 @@ impl WorkerClient { } pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (result, rx) = oneshot::channel(); - let _ = self.tx2.send(StopCommand { graceful, result }); + let (result, rx) = oneshot::oneshot(); + let _ = self.tx2.try_send(StopCommand { graceful, result }); rx } } @@ -121,8 +124,8 @@ impl WorkerAvailability { /// Worker accepts Socket objects via unbounded channel and starts stream /// processing. pub(super) struct Worker { - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + rx: Receiver, + rx2: Receiver, services: Vec, availability: WorkerAvailability, conns: Counter, @@ -161,8 +164,8 @@ impl Worker { availability: WorkerAvailability, shutdown_timeout: time::Duration, ) -> WorkerClient { - let (tx1, rx1) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); + let (tx1, rx1) = unbounded(); + let (tx2, rx2) = unbounded(); let avail = availability.clone(); Arbiter::default().exec_fn(move || { @@ -185,8 +188,8 @@ impl Worker { } async fn create( - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + rx: Receiver, + rx2: Receiver, factories: Vec>, availability: WorkerAvailability, shutdown_timeout: time::Duration, @@ -329,8 +332,10 @@ impl Future for Worker { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // `StopWorker` message handler - if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_recv(cx) + if let Poll::Ready(Some(StopCommand { + graceful, + mut result, + })) = Pin::new(&mut self.rx2).poll_next(cx) { self.availability.set(false); let num = num_connections(); @@ -470,7 +475,7 @@ impl Future for Worker { } } - match Pin::new(&mut self.rx).poll_recv(cx) { + match Pin::new(&mut self.rx).poll_next(cx) { // handle incoming io stream Poll::Ready(Some(WorkerCommand(msg))) => { let guard = self.conns.get(); @@ -573,8 +578,8 @@ mod tests { #[crate::rt_test] #[allow(clippy::mutex_atomic)] async fn basics() { - let (_tx1, rx1) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); + let (_tx1, rx1) = unbounded(); + let (tx2, rx2) = unbounded(); let (sync_tx, _sync_rx) = std::sync::mpsc::channel(); let poll = mio::Poll::new().unwrap(); let waker = Arc::new(mio::Waker::new(poll.registry(), mio::Token(1)).unwrap()); @@ -639,8 +644,8 @@ mod tests { // shutdown let g = MAX_CONNS_COUNTER.with(|conns| conns.get()); - let (tx, rx) = oneshot::channel(); - tx2.send(StopCommand { + let (tx, rx) = oneshot::oneshot(); + tx2.try_send(StopCommand { graceful: true, result: tx, }) @@ -653,8 +658,8 @@ mod tests { let _ = rx.await; // force shutdown - let (_tx1, rx1) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); + let (_tx1, rx1) = unbounded(); + let (tx2, rx2) = unbounded(); let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone())); let f = SrvFactory { st: st.clone(), @@ -683,8 +688,8 @@ mod tests { let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await; assert!(avail.available()); - let (tx, rx) = oneshot::channel(); - tx2.send(StopCommand { + let (tx, rx) = oneshot::oneshot(); + tx2.try_send(StopCommand { graceful: false, result: tx, })