drop direct tokio dependency

This commit is contained in:
Nikolay Kim 2021-06-25 21:54:04 +06:00
parent fa59cd2d1c
commit 5f2f65e403
6 changed files with 66 additions and 57 deletions

View file

@ -16,4 +16,4 @@ ntex-router = { path = "ntex-router" }
ntex-rt = { path = "ntex-rt" }
ntex-service = { path = "ntex-service" }
ntex-macros = { path = "ntex-macros" }
ntex-util = { path = "ntex-util" }
ntex-util = { path = "ntex-util" }

View file

@ -1,5 +1,9 @@
# Changes
## [0.3.19] - 2021-07-xx
* drop direct tokio dependency
## [0.3.18] - 2021-06-03
* server: expose server status change notifications

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.3.18"
version = "0.3.19"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -68,7 +68,9 @@ sha-1 = "0.9"
slab = "0.4"
serde = { version = "1.0", features=["derive"] }
socket2 = "0.4"
tokio = { version = "1", default-features = false, features = ["sync"] }
async-oneshot = "0.5.0"
async-channel = "1.6.1"
# http/web framework
h2 = { version = "0.3", optional = true }

View file

@ -1,10 +1,11 @@
use std::task::{Context, Poll};
use std::{future::Future, io, mem, net, pin::Pin, time::Duration};
use async_channel::{unbounded, Receiver};
use async_oneshot as oneshot;
use futures_core::Stream;
use log::{error, info};
use socket2::{Domain, SockAddr, Socket, Type};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
use crate::rt::{net::TcpStream, spawn, time::sleep, System};
use crate::util::join_all;
@ -31,7 +32,7 @@ pub struct ServerBuilder {
exit: bool,
shutdown_timeout: Duration,
no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>,
cmd: Receiver<ServerCommand>,
server: Server,
notify: Vec<oneshot::Sender<()>>,
}
@ -45,7 +46,7 @@ impl Default for ServerBuilder {
impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded_channel();
let (tx, rx) = unbounded();
let server = Server::new(tx);
ServerBuilder {
@ -323,11 +324,11 @@ impl ServerBuilder {
fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
ServerCommand::Pause(mut tx) => {
self.accept.send(Command::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
ServerCommand::Resume(mut tx) => {
self.accept.send(Command::Resume);
let _ = tx.send(());
}
@ -386,10 +387,10 @@ impl ServerBuilder {
spawn(async move {
let _ = join_all(futs).await;
if let Some(tx) = completion {
if let Some(mut tx) = completion {
let _ = tx.send(());
}
for tx in notify {
for mut tx in notify {
let _ = tx.send(());
}
if exit {
@ -405,10 +406,10 @@ impl ServerBuilder {
System::current().stop();
});
}
if let Some(tx) = completion {
if let Some(mut tx) = completion {
let _ = tx.send(());
}
for tx in notify {
for mut tx in notify {
let _ = tx.send(());
}
}
@ -451,7 +452,7 @@ impl Future for ServerBuilder {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.cmd).poll_recv(cx) {
match Pin::new(&mut self.cmd).poll_next(cx) {
Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
Poll::Ready(None) => return Poll::Pending,
Poll::Pending => return Poll::Pending,

View file

@ -4,8 +4,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{future::Future, io, pin::Pin};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use async_channel::Sender;
use async_oneshot as oneshot;
use crate::util::counter::Counter;
@ -98,13 +98,10 @@ enum ServerCommand {
/// Server controller
#[derive(Debug)]
pub struct Server(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
pub struct Server(Sender<ServerCommand>, Option<oneshot::Receiver<()>>);
impl Server {
fn new(tx: UnboundedSender<ServerCommand>) -> Self {
fn new(tx: Sender<ServerCommand>) -> Self {
Server(tx, None)
}
@ -114,11 +111,11 @@ impl Server {
}
fn signal(&self, sig: signals::Signal) {
let _ = self.0.send(ServerCommand::Signal(sig));
let _ = self.0.try_send(ServerCommand::Signal(sig));
}
fn worker_faulted(&self, idx: usize) {
let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
let _ = self.0.try_send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
@ -126,8 +123,8 @@ impl Server {
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Pause(tx));
let (tx, rx) = oneshot::oneshot();
let _ = self.0.try_send(ServerCommand::Pause(tx));
async move {
let _ = rx.await;
}
@ -135,8 +132,8 @@ impl Server {
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Resume(tx));
let (tx, rx) = oneshot::oneshot();
let _ = self.0.try_send(ServerCommand::Resume(tx));
async move {
let _ = rx.await;
}
@ -146,8 +143,8 @@ impl Server {
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Stop {
let (tx, rx) = oneshot::oneshot();
let _ = self.0.try_send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
@ -170,8 +167,8 @@ impl Future for Server {
let this = self.get_mut();
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.send(ServerCommand::Notify(tx)).is_err() {
let (tx, rx) = oneshot::oneshot();
if this.0.try_send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);

View file

@ -2,8 +2,9 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin, sync::Arc, time};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use async_channel::{unbounded, Receiver, Sender};
use async_oneshot as oneshot;
use futures_core::Stream as FutStream;
use crate::rt::time::{sleep_until, Instant, Sleep};
use crate::rt::{spawn, Arbiter};
@ -55,16 +56,16 @@ thread_local! {
#[derive(Clone, Debug)]
pub(super) struct WorkerClient {
pub(super) idx: usize,
tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
tx1: Sender<WorkerCommand>,
tx2: Sender<StopCommand>,
avail: WorkerAvailability,
}
impl WorkerClient {
pub(super) fn new(
idx: usize,
tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
tx1: Sender<WorkerCommand>,
tx2: Sender<StopCommand>,
avail: WorkerAvailability,
) -> Self {
WorkerClient {
@ -76,7 +77,9 @@ impl WorkerClient {
}
pub(super) fn send(&self, msg: Connection) -> Result<(), Connection> {
self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0)
self.tx1
.try_send(WorkerCommand(msg))
.map_err(|msg| msg.into_inner().0)
}
pub(super) fn available(&self) -> bool {
@ -84,8 +87,8 @@ impl WorkerClient {
}
pub(super) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (result, rx) = oneshot::channel();
let _ = self.tx2.send(StopCommand { graceful, result });
let (result, rx) = oneshot::oneshot();
let _ = self.tx2.try_send(StopCommand { graceful, result });
rx
}
}
@ -121,8 +124,8 @@ impl WorkerAvailability {
/// Worker accepts Socket objects via unbounded channel and starts stream
/// processing.
pub(super) struct Worker {
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
rx: Receiver<WorkerCommand>,
rx2: Receiver<StopCommand>,
services: Vec<WorkerService>,
availability: WorkerAvailability,
conns: Counter,
@ -161,8 +164,8 @@ impl Worker {
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
) -> WorkerClient {
let (tx1, rx1) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let avail = availability.clone();
Arbiter::default().exec_fn(move || {
@ -185,8 +188,8 @@ impl Worker {
}
async fn create(
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
rx: Receiver<WorkerCommand>,
rx2: Receiver<StopCommand>,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: time::Duration,
@ -329,8 +332,10 @@ impl Future for Worker {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) =
Pin::new(&mut self.rx2).poll_recv(cx)
if let Poll::Ready(Some(StopCommand {
graceful,
mut result,
})) = Pin::new(&mut self.rx2).poll_next(cx)
{
self.availability.set(false);
let num = num_connections();
@ -470,7 +475,7 @@ impl Future for Worker {
}
}
match Pin::new(&mut self.rx).poll_recv(cx) {
match Pin::new(&mut self.rx).poll_next(cx) {
// handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => {
let guard = self.conns.get();
@ -573,8 +578,8 @@ mod tests {
#[crate::rt_test]
#[allow(clippy::mutex_atomic)]
async fn basics() {
let (_tx1, rx1) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let (_tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let (sync_tx, _sync_rx) = std::sync::mpsc::channel();
let poll = mio::Poll::new().unwrap();
let waker = Arc::new(mio::Waker::new(poll.registry(), mio::Token(1)).unwrap());
@ -639,8 +644,8 @@ mod tests {
// shutdown
let g = MAX_CONNS_COUNTER.with(|conns| conns.get());
let (tx, rx) = oneshot::channel();
tx2.send(StopCommand {
let (tx, rx) = oneshot::oneshot();
tx2.try_send(StopCommand {
graceful: true,
result: tx,
})
@ -653,8 +658,8 @@ mod tests {
let _ = rx.await;
// force shutdown
let (_tx1, rx1) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let (_tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let avail = WorkerAvailability::new(AcceptNotify::new(waker, sync_tx.clone()));
let f = SrvFactory {
st: st.clone(),
@ -683,8 +688,8 @@ mod tests {
let _ = lazy(|cx| Pin::new(&mut worker).poll(cx)).await;
assert!(avail.available());
let (tx, rx) = oneshot::channel();
tx2.send(StopCommand {
let (tx, rx) = oneshot::oneshot();
tx2.try_send(StopCommand {
graceful: false,
result: tx,
})