This commit is contained in:
Nikolay Kim 2024-08-28 00:26:25 +05:00
parent 6c907d1f45
commit ad195f3002
5 changed files with 16 additions and 16 deletions

View file

@ -144,9 +144,9 @@ impl Filter for Base {
self.0 .0.write_task.wake();
}
if len >= self.0.memory_pool().write_params_high()
&& !flags.contains(Flags::WR_BACKPRESSURE)
&& !flags.contains(Flags::BUF_W_BACKPRESSURE)
{
self.0 .0.insert_flags(Flags::WR_BACKPRESSURE);
self.0 .0.insert_flags(Flags::BUF_W_BACKPRESSURE);
self.0 .0.dispatch_task.wake();
}
}

View file

@ -22,9 +22,9 @@ bitflags::bitflags! {
/// wait while write task flushes buf
const BUF_W_MUST_FLUSH = 0b0000_0001_0000_0000;
/// write buffer is full
const WR_BACKPRESSURE = 0b0000_0010_0000_0000;
const BUF_W_BACKPRESSURE = 0b0000_0010_0000_0000;
/// write task paused
const WR_PAUSED = 0b0000_0100_0000_0000;
@ -37,11 +37,11 @@ bitflags::bitflags! {
impl Flags {
pub(crate) fn is_waiting_for_write(&self) -> bool {
self.intersects(Flags::BUF_W_MUST_FLUSH | Flags::WR_BACKPRESSURE)
self.intersects(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE)
}
pub(crate) fn waiting_for_write_is_done(&mut self) {
self.remove(Flags::BUF_W_MUST_FLUSH | Flags::WR_BACKPRESSURE);
self.remove(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE);
}
pub(crate) fn is_read_buf_ready(&self) -> bool {

View file

@ -539,7 +539,7 @@ impl<F> Io<F> {
} else if flags.contains(Flags::DSP_TIMEOUT) {
st.remove_flags(Flags::DSP_TIMEOUT);
Err(RecvError::KeepAlive)
} else if flags.contains(Flags::WR_BACKPRESSURE) {
} else if flags.contains(Flags::BUF_W_BACKPRESSURE) {
Err(RecvError::WriteBackpressure)
} else {
match self.poll_read_ready(cx) {
@ -579,12 +579,12 @@ impl<F> Io<F> {
st.dispatch_task.register(cx.waker());
return Poll::Pending;
} else if len >= st.pool.get().write_params_high() << 1 {
st.insert_flags(Flags::WR_BACKPRESSURE);
st.insert_flags(Flags::BUF_W_BACKPRESSURE);
st.dispatch_task.register(cx.waker());
return Poll::Pending;
}
}
st.remove_flags(Flags::BUF_W_MUST_FLUSH | Flags::WR_BACKPRESSURE);
st.remove_flags(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE);
Poll::Ready(Ok(()))
}
}
@ -639,7 +639,7 @@ impl<F> Io<F> {
} else if flags.contains(Flags::DSP_TIMEOUT) {
st.remove_flags(Flags::DSP_TIMEOUT);
Poll::Ready(IoStatusUpdate::KeepAlive)
} else if flags.contains(Flags::WR_BACKPRESSURE) {
} else if flags.contains(Flags::BUF_W_BACKPRESSURE) {
Poll::Ready(IoStatusUpdate::WriteBackpressure)
} else {
st.dispatch_task.register(cx.waker());
@ -981,7 +981,7 @@ mod tests {
assert!(format!("{:?}", err).contains("Dispatcher stopped"));
client.write(TEXT);
server.st().insert_flags(Flags::WR_BACKPRESSURE);
server.st().insert_flags(Flags::BUF_W_BACKPRESSURE);
let item = server.recv(&BytesCodec).await.ok().unwrap().unwrap();
assert_eq!(item, TEXT);
}

View file

@ -44,7 +44,7 @@ impl IoRef {
#[inline]
/// Check if write back-pressure is enabled
pub fn is_wr_backpressure(&self) -> bool {
self.0.flags.get().contains(Flags::WR_BACKPRESSURE)
self.0.flags.get().contains(Flags::BUF_W_BACKPRESSURE)
}
#[inline]

View file

@ -317,10 +317,10 @@ impl WriteContext {
flags.waiting_for_write_is_done();
inner.dispatch_task.wake();
}
} else if flags.contains(Flags::WR_BACKPRESSURE)
} else if flags.contains(Flags::BUF_W_BACKPRESSURE)
&& len < inner.pool.get().write_params_high() << 1
{
flags.remove(Flags::WR_BACKPRESSURE);
flags.remove(Flags::BUF_W_BACKPRESSURE);
inner.dispatch_task.wake();
}
@ -374,10 +374,10 @@ impl WriteContext {
}
flags.insert(Flags::WR_PAUSED);
inner.flags.set(flags);
} else if flags.contains(Flags::WR_BACKPRESSURE)
} else if flags.contains(Flags::BUF_W_BACKPRESSURE)
&& len < inner.pool.get().write_params_high() << 1
{
flags.remove(Flags::WR_BACKPRESSURE);
flags.remove(Flags::BUF_W_BACKPRESSURE);
inner.flags.set(flags);
inner.dispatch_task.wake();
}