Rename RecvError::StopDispatcher to RecvError::Stop

This commit is contained in:
Nikolay Kim 2021-12-26 23:23:57 +06:00
parent 19c6a2b731
commit a598b741c7
11 changed files with 70 additions and 17 deletions

View file

@ -2,6 +2,8 @@
## [0.1.0-b.6] - 2021-12-26
* Rename `RecvError::StopDispatcher` to `RecvError::Stop`
* Better error information for .poll_recv() method.
* Remove redundant Io::poll_write_backpressure() method.
@ -10,6 +12,8 @@
* Fix read filters ordering
* Fix read filter root buffer
## [0.1.0-b.5] - 2021-12-24
* Use new ntex-service traits

View file

@ -226,7 +226,7 @@ where
slf.st.set(DispatcherState::Stop);
DispatchItem::KeepAliveTimeout
}
Err(RecvError::StopDispatcher) => {
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
slf.st.set(DispatcherState::Stop);
continue;

View file

@ -98,7 +98,7 @@ impl Filter for Base {
#[inline]
fn get_read_buf(&self) -> Option<BytesMut> {
None
self.0 .0.read_buf.take()
}
#[inline]

View file

@ -426,7 +426,7 @@ impl<F> Io<F> {
io::ErrorKind::Other,
"Keep-alive",
))),
Err(RecvError::StopDispatcher) => Err(Either::Right(io::Error::new(
Err(RecvError::Stop) => Err(Either::Right(io::Error::new(
io::ErrorKind::Other,
"Dispatcher stopped",
))),
@ -558,7 +558,7 @@ impl<F> Io<F> {
Ok(None) => {
let flags = self.flags();
if flags.contains(Flags::DSP_STOP) {
Poll::Ready(Err(RecvError::StopDispatcher))
Poll::Ready(Err(RecvError::Stop))
} else if flags.contains(Flags::DSP_KEEPALIVE) {
Poll::Ready(Err(RecvError::KeepAlive))
} else if flags.contains(Flags::WR_BACKPRESSURE) {

View file

@ -324,6 +324,27 @@ mod tests {
assert!(state.flags().contains(Flags::IO_SHUTDOWN));
}
#[ntex::test]
async fn read_readiness() {
let (client, server) = IoTest::create();
client.remote_buffer_cap(1024);
let io = Io::new(server);
assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
client.write(TEXT);
assert_eq!(io.read_ready().await.unwrap(), Some(()));
assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
let item = io.with_read_buf(|buffer| buffer.clone());
assert_eq!(item, Bytes::from_static(BIN));
client.write(TEXT);
sleep(Millis(50)).await;
assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_ready());
assert!(lazy(|cx| io.poll_read_ready(cx)).await.is_pending());
}
#[ntex::test]
async fn on_disconnect() {
let (client, server) = IoTest::create();

View file

@ -103,8 +103,8 @@ pub enum RecvError<U: Decoder> {
KeepAlive,
/// Write backpressure is enabled
WriteBackpressure,
/// Dispatcher marked stopped
StopDispatcher,
/// Stop io stream handling
Stop,
/// Unrecoverable frame decoding errors
Decoder(U::Error),
/// Peer is disconnected

View file

@ -176,7 +176,7 @@ impl Stream for PlStream {
Err(RecvError::KeepAlive) => {
Err(io::Error::new(io::ErrorKind::Other, "Keep-alive").into())
}
Err(RecvError::StopDispatcher) => {
Err(RecvError::Stop) => {
Err(io::Error::new(io::ErrorKind::Other, "Dispatcher stopped")
.into())
}

View file

@ -328,7 +328,7 @@ where
*this.st = State::Stop;
this.inner.error = Some(DispatchError::Disconnect(err));
}
Err(RecvError::StopDispatcher) => {
Err(RecvError::Stop) => {
log::trace!("dispatcher is instructed to stop");
*this.st = State::Stop;
}
@ -607,7 +607,7 @@ where
io::Error::new(io::ErrorKind::Other, "Keep-alive")
.into()
}
RecvError::StopDispatcher => {
RecvError::Stop => {
payload
.1
.set_error(PayloadError::EncodingCorrupted);

View file

@ -7,22 +7,50 @@ use ntex::server::test_server;
use ntex::service::{fn_service, Service, ServiceFactory};
use ntex::util::Bytes;
#[cfg(feature = "openssl")]
fn ssl_acceptor() -> tls_openssl::ssl::SslAcceptor {
use tls_openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
// load ssl keys
let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder
.set_private_key_file("./tests/key.pem", SslFiletype::PEM)
.unwrap();
builder
.set_certificate_chain_file("./tests/cert.pem")
.unwrap();
builder.build()
}
#[cfg(feature = "openssl")]
#[ntex::test]
async fn test_string() {
async fn test_openssl_string() {
use ntex::server::openssl;
use tls_openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let srv = test_server(|| {
fn_service(|io: Io| async move {
ntex::pipeline_factory(fn_service(|io: Io<_>| async move {
let res = io.read_ready().await;
assert!(res.is_ok());
Ok(io)
}))
.and_then(openssl::Acceptor::new(ssl_acceptor()))
.and_then(fn_service(|io: Io<_>| async move {
io.send(Bytes::from_static(b"test"), &BytesCodec)
.await
.unwrap();
Ok::<_, io::Error>(())
})
Ok::<_, Box<dyn std::error::Error>>(())
}))
});
let conn = ntex::connect::Connector::default();
let addr = format!("localhost:{}", srv.addr().port());
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let conn = ntex::connect::openssl::Connector::new(builder.build());
let addr = format!("127.0.0.1:{}", srv.addr().port());
let con = conn.call(addr.into()).await.unwrap();
assert_eq!(con.query::<PeerAddr>().get().unwrap(), srv.addr().into());
let item = con.recv(&BytesCodec).await.unwrap().unwrap();
assert_eq!(item, Bytes::from_static(b"test"));
}
#[cfg(feature = "rustls")]

View file

@ -481,6 +481,7 @@ async fn test_ws_transport() {
io.send(ws::Message::Binary(Bytes::from_static(b"text")), &codec)
.await
.unwrap();
let item = io.recv(&codec).await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text")));
}

View file

@ -83,7 +83,6 @@ async fn test_simple() {
#[ntex::test]
async fn test_transport() {
env_logger::init();
let mut srv = test_server(|| {
HttpService::build()
.upgrade(|(req, io, codec): (Request, Io, h1::Codec)| {