Fix borrow mut panic #204 (#205)

* Fix borrow mut panic #204
This commit is contained in:
Nikolay Kim 2023-05-30 19:38:58 +06:00 committed by GitHub
parent efc817ffa2
commit adb74edf01
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 27 deletions

View file

@ -45,7 +45,7 @@ log = "0.4"
thiserror = "1.0" thiserror = "1.0"
ntex-tokio = { version = "0.2.3", optional = true } ntex-tokio = { version = "0.2.3", optional = true }
ntex-glommio = { version = "0.2.3", optional = true } ntex-glommio = { version = "0.2.4", optional = true }
ntex-async-std = { version = "0.2.2", optional = true } ntex-async-std = { version = "0.2.2", optional = true }
# openssl # openssl

View file

@ -1,5 +1,9 @@
# Changes # Changes
## [0.2.4] - 2023-05-30
* Fix borrow mut panic #204
## [0.2.3] - 2023-04-11 ## [0.2.3] - 2023-04-11
* Chore upgrade glommio to 0.8 * Chore upgrade glommio to 0.8

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ntex-glommio" name = "ntex-glommio"
version = "0.2.3" version = "0.2.4"
authors = ["ntex contributors <team@ntex.rs>"] authors = ["ntex contributors <team@ntex.rs>"]
description = "glommio intergration for ntex framework" description = "glommio intergration for ntex framework"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View file

@ -106,7 +106,8 @@ enum IoWriteState {
} }
enum Shutdown { enum Shutdown {
None(Pin<Box<dyn Future<Output = glommio::Result<(), ()>>>>), Flush,
Close(Pin<Box<dyn Future<Output = glommio::Result<(), ()>>>>),
Stopping(u16), Stopping(u16),
} }
@ -177,11 +178,7 @@ impl Future for WriteTask {
sleep(time) sleep(time)
}; };
let io = this.io.clone(); this.st = IoWriteState::Shutdown(timeout, Shutdown::Flush);
let fut = Box::pin(async move {
io.0.borrow().shutdown(std::net::Shutdown::Write).await
});
this.st = IoWriteState::Shutdown(timeout, Shutdown::None(fut));
self.poll(cx) self.poll(cx)
} }
Poll::Ready(WriteStatus::Terminate) => { Poll::Ready(WriteStatus::Terminate) => {
@ -199,16 +196,18 @@ impl Future for WriteTask {
// use disconnect timeout, otherwise it could hang forever. // use disconnect timeout, otherwise it could hang forever.
loop { loop {
match st { match st {
Shutdown::None(ref mut fut) => { Shutdown::Flush => {
// flush write buffer // flush write buffer
let mut io = this.io.0.borrow_mut(); let mut io = this.io.0.borrow_mut();
match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) { match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
if ready!(fut.poll(cx)).is_err() { let io = this.io.clone();
this.state.close(None); let fut = Box::pin(async move {
return Poll::Ready(()); io.0.borrow()
} .shutdown(std::net::Shutdown::Write)
*st = Shutdown::Stopping(0); .await
});
*st = Shutdown::Close(fut);
continue; continue;
} }
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
@ -222,6 +221,14 @@ impl Future for WriteTask {
Poll::Pending => (), Poll::Pending => (),
} }
} }
Shutdown::Close(ref mut fut) => {
if ready!(fut.poll(cx)).is_err() {
this.state.close(None);
return Poll::Ready(());
}
*st = Shutdown::Stopping(0);
continue;
}
Shutdown::Stopping(ref mut count) => { Shutdown::Stopping(ref mut count) => {
// read until 0 or err // read until 0 or err
let mut buf = [0u8; 512]; let mut buf = [0u8; 512];
@ -479,11 +486,7 @@ impl Future for UnixWriteTask {
sleep(time) sleep(time)
}; };
let io = this.io.clone(); this.st = IoWriteState::Shutdown(timeout, Shutdown::Flush);
let fut = Box::pin(async move {
io.0.borrow().shutdown(std::net::Shutdown::Write).await
});
this.st = IoWriteState::Shutdown(timeout, Shutdown::None(fut));
self.poll(cx) self.poll(cx)
} }
Poll::Ready(WriteStatus::Terminate) => { Poll::Ready(WriteStatus::Terminate) => {
@ -501,16 +504,18 @@ impl Future for UnixWriteTask {
// use disconnect timeout, otherwise it could hang forever. // use disconnect timeout, otherwise it could hang forever.
loop { loop {
match st { match st {
Shutdown::None(ref mut fut) => { Shutdown::Flush => {
// flush write buffer // flush write buffer
let mut io = this.io.0.borrow_mut(); let mut io = this.io.0.borrow_mut();
match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) { match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) {
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
if ready!(fut.poll(cx)).is_err() { let io = this.io.clone();
this.state.close(None); let fut = Box::pin(async move {
return Poll::Ready(()); io.0.borrow()
} .shutdown(std::net::Shutdown::Write)
*st = Shutdown::Stopping(0); .await
});
*st = Shutdown::Close(fut);
continue; continue;
} }
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
@ -524,6 +529,14 @@ impl Future for UnixWriteTask {
Poll::Pending => (), Poll::Pending => (),
} }
} }
Shutdown::Close(ref mut fut) => {
if ready!(fut.poll(cx)).is_err() {
this.state.close(None);
return Poll::Ready(());
}
*st = Shutdown::Stopping(0);
continue;
}
Shutdown::Stopping(ref mut count) => { Shutdown::Stopping(ref mut count) => {
// read until 0 or err // read until 0 or err
let mut buf = [0u8; 512]; let mut buf = [0u8; 512];

View file

@ -1,7 +1,7 @@
//! Utilities for futures //! Utilities for futures
use std::{future::Future, mem, pin::Pin, task::Context, task::Poll}; use std::{future::Future, mem, pin::Pin, task::Context, task::Poll};
pub use futures_core::Stream; pub use futures_core::{Stream, TryFuture};
pub use futures_sink::Sink; pub use futures_sink::Sink;
mod either; mod either;

View file

@ -61,7 +61,7 @@ ntex-rt = "0.4.9"
ntex-io = "0.2.10" ntex-io = "0.2.10"
ntex-tls = "0.2.4" ntex-tls = "0.2.4"
ntex-tokio = { version = "0.2.3", optional = true } ntex-tokio = { version = "0.2.3", optional = true }
ntex-glommio = { version = "0.2.3", optional = true } ntex-glommio = { version = "0.2.4", optional = true }
ntex-async-std = { version = "0.2.1", optional = true } ntex-async-std = { version = "0.2.1", optional = true }
async-oneshot = "0.5.0" async-oneshot = "0.5.0"