Adjust min io timeout (#275)

This commit is contained in:
Nikolay Kim 2024-01-07 21:15:40 +06:00 committed by GitHub
parent 566339ee70
commit 0ba753c967
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 20 deletions

View file

@ -4,6 +4,8 @@
* Use "async fn" in trait for Service definition
* Min timeout more than 1sec
## [0.3.17] - 2023-12-25
* Fix filter leak during Io drop

View file

@ -1030,7 +1030,7 @@ mod tests {
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
sleep(Millis(1500)).await;
sleep(Millis(2000)).await;
// write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags();
@ -1080,7 +1080,7 @@ mod tests {
client.write("12345678");
let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"12345678"));
sleep(Millis(1250)).await;
sleep(Millis(2000)).await;
// write side must be closed, dispatcher should fail with keep-alive
let flags = state.flags();
@ -1149,7 +1149,6 @@ mod tests {
#[ntex::test]
async fn test_read_timeout() {
let _ = env_logger::init();
let (client, server) = IoTest::create();
client.remote_buffer_cap(1024);
@ -1189,13 +1188,13 @@ mod tests {
assert_eq!(buf, Bytes::from_static(b"12345678"));
client.write("1");
sleep(Millis(500)).await;
sleep(Millis(1000)).await;
assert!(!state.flags().contains(Flags::IO_STOPPING));
client.write("23");
sleep(Millis(500)).await;
sleep(Millis(1000)).await;
assert!(!state.flags().contains(Flags::IO_STOPPING));
client.write("4");
sleep(Millis(1100)).await;
sleep(Millis(2000)).await;
// write side must be closed, dispatcher should fail with keep-alive
assert!(state.flags().contains(Flags::IO_STOPPING));

View file

@ -703,17 +703,17 @@ impl<F> Drop for Io<F> {
fn drop(&mut self) {
self.stop_timer();
if !self.0.flags().contains(Flags::IO_STOPPED) {
log::trace!(
"{}: Io is dropped, force stopping io streams {:?}",
self.tag(),
self.0.flags()
);
}
// filter must be dropped, it is unsafe
// and wont be dropped without special attention
if self.1.is_set() {
if !self.0.flags().contains(Flags::IO_STOPPED) {
log::trace!(
"{}: Io is dropped, force stopping io streams {:?}",
self.tag(),
self.0.flags()
);
}
self.force_close();
self.1.drop_filter();
self.0 .0.filter.set(NullFilter::get());

View file

@ -99,9 +99,13 @@ pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
TIMER.with(|timer| {
// setup current delta
if !timer.running.get() {
timer
.current
.set((now() - timer.base.get()).as_secs() as u32);
let current = (now() - timer.base.get()).as_secs() as u32;
timer.current.set(current);
log::debug!(
"{}: Timer driver does not run, current: {}",
io.tag(),
current
);
}
let hnd = {
@ -128,8 +132,8 @@ pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle {
loop {
sleep(SEC).await;
let stop = TIMER.with(|timer| {
let current = timer.current.get() + 1;
timer.current.set(current);
let current = timer.current.get();
timer.current.set(current + 1);
// notify io dispatcher
let mut inner = timer.storage.borrow_mut();

View file

@ -1497,7 +1497,7 @@ mod tests {
for _ in 1..8 {
let random_bytes: Vec<u8> = (0..256).map(|_| rand::random::<u8>()).collect();
client.write(random_bytes);
sleep(Millis(350)).await;
sleep(Millis(750)).await;
}
assert!(mark.load(Ordering::Relaxed) == 1536);
}