clippy warnings

This commit is contained in:
Nikolay Kim 2020-04-02 12:47:29 +06:00
parent 4ba4fb6168
commit 022845af53
14 changed files with 139 additions and 163 deletions

View file

@ -165,7 +165,7 @@ impl SystemRunner {
let SystemRunner { mut rt, stop, .. } = self;
// run loop
let result = match rt.block_on(stop) {
match rt.block_on(stop) {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
@ -177,8 +177,7 @@ impl SystemRunner {
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
};
result
}
}
/// Execute a future and wait for result.

View file

@ -90,7 +90,6 @@ impl Runtime {
where
F: Future,
{
let res = self.local.block_on(&mut self.rt, f);
res
self.local.block_on(&mut self.rt, f)
}
}

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.1.1"
version = "0.1.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Framework for composable network services"
readme = "README.md"

View file

@ -37,9 +37,9 @@ impl Address for &'static str {
/// Connect request
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect<T> {
pub(crate) req: T,
pub(crate) port: u16,
pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
pub(super) req: T,
pub(super) port: u16,
pub(super) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
}
impl<T: Address> Connect<T> {

View file

@ -5,8 +5,8 @@
//! * `openssl` - enables ssl support via `openssl` crate
//! * `rustls` - enables ssl support via `rustls` crate
mod connect;
mod error;
mod message;
mod resolve;
mod service;
mod uri;
@ -23,8 +23,8 @@ use trust_dns_resolver::system_conf::read_system_conf;
use crate::rt::Arbiter;
pub use self::connect::{Address, Connect};
pub use self::error::ConnectError;
pub use self::message::{Address, Connect};
pub use self::resolve::{AsyncResolver, Resolver};
pub use self::service::Connector;

View file

@ -23,9 +23,7 @@ use crate::channel::condition::{Condition, Waiter};
use crate::rt::net::{self, TcpStream};
use crate::service::{Service, ServiceFactory};
use super::connect::{Address, Connect};
use super::default_resolver;
use super::error::ConnectError;
use super::{default_resolver, Address, Connect, ConnectError};
/// DNS Resolver Service
pub struct Resolver<T> {
@ -184,7 +182,9 @@ impl AsyncResolver {
/// * `options` - basic lookup options for the resolver
pub fn new(config: ResolverConfig, options: ResolverOpts) -> Self {
AsyncResolver {
state: Rc::new(RefCell::new(AsyncResolverState::New(config, options))),
state: Rc::new(RefCell::new(AsyncResolverState::New(Some(
TAsyncResolver::new(config, options, Handle).boxed_local(),
)))),
}
}
@ -193,7 +193,9 @@ impl AsyncResolver {
/// This will use `/etc/resolv.conf` on Unix OSes and the registry on Windows.
pub fn from_system_conf() -> Self {
AsyncResolver {
state: Rc::new(RefCell::new(AsyncResolverState::NewFromSystem)),
state: Rc::new(RefCell::new(AsyncResolverState::New(Some(
TokioAsyncResolver::from_system_conf(Handle).boxed_local(),
)))),
}
}
@ -210,10 +212,9 @@ type TokioAsyncResolver =
TAsyncResolver<GenericConnection, GenericConnectionProvider<TokioRuntime>>;
enum AsyncResolverState {
New(ResolverConfig, ResolverOpts),
NewFromSystem,
New(Option<LocalBoxFuture<'static, Result<TokioAsyncResolver, ResolveError>>>),
Creating(Condition),
Resolver(TokioAsyncResolver),
Resolver(Box<TokioAsyncResolver>),
}
pub struct LookupIpFuture {
@ -241,7 +242,8 @@ impl Future for LookupIpFuture {
LookupIpState::Create(ref mut fut) => {
let resolver = ready!(Pin::new(fut).poll(cx))?;
this.fut = LookupIpState::Init;
*this.state.borrow_mut() = AsyncResolverState::Resolver(resolver);
*this.state.borrow_mut() =
AsyncResolverState::Resolver(Box::new(resolver));
}
LookupIpState::Wait(ref mut waiter) => {
ready!(waiter.poll_waiter(cx));
@ -250,22 +252,8 @@ impl Future for LookupIpFuture {
LookupIpState::Init => {
let mut state = this.state.borrow_mut();
match &mut *state {
AsyncResolverState::New(config, options) => {
this.fut = LookupIpState::Create(
TAsyncResolver::new(
config.clone(),
options.clone(),
Handle,
)
.boxed_local(),
);
*state = AsyncResolverState::Creating(Condition::default());
}
AsyncResolverState::NewFromSystem => {
this.fut = LookupIpState::Create(
TokioAsyncResolver::from_system_conf(Handle)
.boxed_local(),
);
AsyncResolverState::New(ref mut fut) => {
this.fut = LookupIpState::Create(fut.take().unwrap());
*state = AsyncResolverState::Creating(Condition::default());
}
AsyncResolverState::Creating(ref cond) => {
@ -361,7 +349,7 @@ impl trust_dns_proto::tcp::Connect for AsyncIo02As03<TcpStream> {
#[async_trait::async_trait]
impl trust_dns_proto::udp::UdpSocket for UdpSocket {
async fn bind(addr: &SocketAddr) -> io::Result<Self> {
net::UdpSocket::bind(addr).await.map(|sock| UdpSocket(sock))
net::UdpSocket::bind(addr).await.map(UdpSocket)
}
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {

View file

@ -11,9 +11,7 @@ use futures::future::{self, err, ok, FutureExt, LocalBoxFuture, Ready};
use crate::rt::net::TcpStream;
use crate::service::{Service, ServiceFactory};
use super::connect::{Address, Connect};
use super::error::ConnectError;
use super::resolve::{AsyncResolver, Resolver};
use super::{Address, AsyncResolver, Connect, ConnectError, Resolver};
pub struct Connector<T> {
resolver: Resolver<T>,

View file

@ -8,10 +8,9 @@ use slab::Slab;
use crate::rt::time::{delay_until, Instant};
use crate::rt::System;
use super::server::Server;
use super::socket::{SocketAddr, SocketListener, StdListener};
use super::worker::{Conn, WorkerClient};
use super::Token;
use super::{Server, Token};
pub(super) enum Command {
Pause,

View file

@ -18,12 +18,11 @@ use crate::rt::{spawn, System};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::config::{ConfiguredService, ServiceConfig};
use super::server::{Server, ServerCommand};
use super::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use super::signals::{Signal, Signals};
use super::socket::StdListener;
use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::Token;
use super::{Server, ServerCommand, Token};
/// Server builder
pub struct ServerBuilder {
@ -226,7 +225,7 @@ impl ServerBuilder {
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
factory,
addr,
));
self.sockets

View file

@ -96,7 +96,7 @@ impl ConfiguredService {
pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) {
self.names.insert(token, (name.clone(), addr));
self.topics.insert(name.clone(), token);
self.topics.insert(name, token);
self.services.push(token);
}
}

View file

@ -1,13 +1,20 @@
//! General purpose tcp server
#![allow(clippy::type_complexity)]
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot;
use futures::FutureExt;
use crate::util::counter::Counter;
mod accept;
mod builder;
mod config;
mod server;
mod service;
mod signals;
mod socket;
@ -22,7 +29,6 @@ pub mod rustls;
pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server;
pub use self::service::ServiceFactory;
pub use self::test::{build_test_server, test_server, TestServer};
@ -68,3 +74,101 @@ pub enum SslError<E1, E2> {
Ssl(E1),
Service(E2),
}
#[derive(Debug)]
enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(signals::Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
/// Notify of server stop
Notify(oneshot::Sender<()>),
}
/// Server controller
#[derive(Debug)]
pub struct Server(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
impl Server {
fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
fn signal(&self, sig: signals::Signal) {
let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
}
fn worker_faulted(&self, idx: usize) {
let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
rx.map(|_| ())
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
rx.map(|_| ())
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
rx.map(|_| ())
}
}
impl Clone for Server {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for Server {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);
}
match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
}
}
}

View file

@ -1,108 +0,0 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot;
use futures::FutureExt;
use super::builder::ServerBuilder;
use super::signals::Signal;
#[derive(Debug)]
pub(crate) enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
/// Notify of server stop
Notify(oneshot::Sender<()>),
}
#[derive(Debug)]
pub struct Server(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
impl Server {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
pub(crate) fn signal(&self, sig: Signal) {
let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
rx.map(|_| ())
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
rx.map(|_| ())
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
rx.map(|_| ())
}
}
impl Clone for Server {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for Server {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);
}
match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
}
}
}

View file

@ -371,11 +371,8 @@ impl Future for Worker {
Ok(false) => {
// push connection back to queue
if let Some(conn) = conn {
match self.state {
WorkerState::Unavailable(ref mut conns) => {
conns.push(conn);
}
_ => (),
if let WorkerState::Unavailable(ref mut conns) = self.state {
conns.push(conn);
}
}
Poll::Pending
@ -398,7 +395,8 @@ impl Future for Worker {
WorkerState::Restarting(idx, token, ref mut fut) => {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(item)) => {
for (token, service) in item {
// TODO: deal with multiple services
if let Some((token, service)) = item.into_iter().next() {
trace!(
"Service {:?} has been restarted",
self.factories[idx].name(token)

View file

@ -149,7 +149,7 @@ impl<T: Serialize, Err: ErrorRenderer> Responder<Err> for Form<T> {
fn respond_to(self, _: &HttpRequest) -> Self::Future {
let body = match serde_urlencoded::to_string(&self.0) {
Ok(body) => body,
Err(e) => return err(e.into()),
Err(e) => return err(e),
};
ok(Response::build(StatusCode::OK)