do not box openssl accept future

This commit is contained in:
Nikolay Kim 2021-02-28 19:20:09 +06:00
parent 78dd4e5fbd
commit eb2a956362
3 changed files with 26 additions and 31 deletions

View file

@ -293,10 +293,6 @@ impl State {
self.0.read_task.register(waker);
}
pub(super) fn register_write_task(&self, waker: &Waker) {
self.0.write_task.register(waker);
}
pub(super) fn update_read_task(&self, result: ReadResult, waker: &Waker) {
match result {
ReadResult::Updated => {
@ -313,7 +309,7 @@ impl State {
self.0.read_task.register(waker);
}
pub(super) fn update_write_task(&self, ready: bool) {
pub(super) fn update_write_task(&self, ready: bool, waker: &Waker) {
if ready {
let mut flags = self.0.flags.get();
if flags.contains(Flags::WR_BACKPRESSURE) {
@ -324,6 +320,7 @@ impl State {
} else {
self.insert_flags(Flags::WR_BACKPRESSURE);
}
self.0.write_task.register(waker);
}
#[inline]
@ -600,7 +597,7 @@ impl State {
// encode item
if let Err(err) = codec.encode(item, &mut write_buf) {
log::trace!("Codec encoder error: {:?}", err);
log::trace!("Encoder error: {:?}", err);
self.insert_flags(Flags::DSP_STOP | Flags::ST_DSP_ERR);
self.0.dispatch_task.wake();
return Err(Either::Right(err));

View file

@ -105,16 +105,15 @@ where
match result {
Poll::Ready(Ok(_)) | Poll::Pending => {
this.state.update_write_task(len < HW)
this.state.update_write_task(len < HW, cx.waker());
Poll::Pending
}
Poll::Ready(Err(err)) => {
log::trace!("error during sending data: {:?}", err);
this.state.set_io_error(Some(err));
return Poll::Ready(());
Poll::Ready(())
}
}
this.state.register_write_task(cx.waker());
Poll::Pending
}
IoWriteState::Shutdown(ref mut delay, ref mut st) => {
// close WRITE side and wait for disconnect on read side.

View file

@ -1,15 +1,10 @@
use std::error::Error;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, io};
use std::{error::Error, fmt, io, marker::PhantomData, pin::Pin, time::Duration};
pub use open_ssl::ssl::{AlpnError, Ssl, SslAcceptor, SslAcceptorBuilder};
pub use open_ssl::ssl::{self, AlpnError, Ssl, SslAcceptor, SslAcceptorBuilder};
pub use tokio_openssl::SslStream;
use futures::future::{ok, FutureExt, LocalBoxFuture, Ready};
use futures::future::{ok, Future, Ready};
use crate::codec::{AsyncRead, AsyncWrite};
use crate::rt::time::{sleep, Sleep};
@ -111,20 +106,13 @@ where
.expect("Provided SSL acceptor was invalid.");
AcceptorServiceResponse {
_guard: self.conns.get(),
io: None,
delay: if self.timeout == ZERO {
None
} else {
Some(sleep(self.timeout))
},
fut: async move {
let mut io = SslStream::new(ssl, req)?;
Pin::new(&mut io).accept().await.map_err(|e| {
let e: Box<dyn Error> = Box::new(e);
e
})?;
Ok(io)
}
.boxed_local(),
io_factory: Some(SslStream::new(ssl, req)),
}
}
}
@ -135,9 +123,10 @@ pin_project_lite::pin_project! {
T: AsyncRead,
T: AsyncWrite,
{
fut: LocalBoxFuture<'static, Result<SslStream<T>, Box<dyn Error>>>,
io: Option<SslStream<T>>,
#[pin]
delay: Option<Sleep>,
io_factory: Option<Result<SslStream<T>, open_ssl::error::ErrorStack>>,
_guard: CounterGuard,
}
}
@ -146,7 +135,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
type Output = Result<SslStream<T>, Box<dyn Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let this = self.project();
if let Some(delay) = this.delay.as_pin_mut() {
match delay.poll(cx) {
@ -160,7 +149,17 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
}
}
let io = futures::ready!(Pin::new(&mut this.fut).poll(cx))?;
Poll::Ready(Ok(io))
match this.io_factory.take() {
Some(Ok(io)) => *this.io = Some(io),
Some(Err(err)) => return Poll::Ready(Err(Box::new(err))),
None => (),
}
let io = this.io.as_mut().unwrap();
let res = futures::ready!(Pin::new(io).poll_accept(cx));
match res {
Ok(_) => Poll::Ready(Ok(this.io.take().unwrap())),
Err(e) => Poll::Ready(Err(Box::new(e))),
}
}
}