1
0
Fork 0
mirror of https://github.com/ntex-rs/ntex.git synced 2025-04-06 06:17:40 +03:00

fix WBackPressureEnabled/Disabled handling

This commit is contained in:
Nikolay Kim 2021-02-22 17:08:23 +06:00
parent 92dacafe06
commit 10ddcb0d59

View file

@ -215,7 +215,7 @@ where
Poll::Ready(item) => {
this.fut.set(None);
slf.shared.inflight.set(slf.shared.inflight.get() - 1);
let _ = slf.handle_result(item);
slf.handle_result(item);
}
}
}
@ -265,7 +265,7 @@ where
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
ready!(slf.handle_result(res));
slf.handle_result(res);
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
@ -297,7 +297,7 @@ where
match this.fut.as_mut().as_pin_mut().unwrap().poll(cx) {
Poll::Ready(res) => {
this.fut.set(None);
ready!(slf.handle_result(res));
slf.handle_result(res);
}
Poll::Pending => {
slf.shared.inflight.set(slf.shared.inflight.get() + 1)
@ -356,17 +356,10 @@ where
crate::rt::spawn(fut.map(move |item| shared.handle_result(item, &st)));
}
fn handle_result(
&self,
item: Result<Option<<U as Encoder>::Item>, S::Error>,
) -> Poll<()> {
fn handle_result(&self, item: Result<Option<<U as Encoder>::Item>, S::Error>) {
match self.state.write_result(item, &self.shared.codec) {
Ok(true) => (),
Ok(false) => {
// instruct write task to notify dispatcher when data is flushed
self.state.enable_write_backpressure();
return Poll::Pending;
}
Ok(false) => self.state.enable_write_backpressure(),
Err(Either::Left(err)) => {
self.error.set(Some(err));
}
@ -374,7 +367,6 @@ where
self.shared.error.set(Some(DispatcherError::Encoder(err)))
}
}
Poll::Ready(())
}
fn poll_service(&self, srv: &S, cx: &mut Context<'_>) -> Poll<PollService<U>> {