diff --git a/ntex-net/src/connect/service.rs b/ntex-net/src/connect/service.rs index 9e6a0549..d8f07693 100644 --- a/ntex-net/src/connect/service.rs +++ b/ntex-net/src/connect/service.rs @@ -228,7 +228,9 @@ mod tests { #[ntex::test] async fn test_connect() { let server = ntex::server::test_server(|| { - ntex_service::fn_service(|_| async { Ok::<_, ()>(()) }) + ntex_service::fn_service(|_| async { + ntex_util::time::sleep(ntex_util::time::Millis(100)).await; + Ok::<_, ()>(()) }) }); let srv = Connector::default().tag("T").memory_pool(PoolId::P5); @@ -242,6 +244,7 @@ mod tests { let result = srv.connect(format!("{}", server.addr())).await; assert!(result.is_ok()); + println!("1-----------------------------------------"); let msg = Connect::new(format!("{}", server.addr())).set_addrs(vec![ format!("127.0.0.1:{}", server.addr().port() - 1) .parse() @@ -250,9 +253,11 @@ mod tests { ]); let result = crate::connect::connect(msg).await; assert!(result.is_ok()); + println!("2-----------------------------------------"); let msg = Connect::new(server.addr()); let result = crate::connect::connect(msg).await; assert!(result.is_ok()); + println!("3-----------------------------------------"); } } diff --git a/ntex-net/src/rt_polling/driver.rs b/ntex-net/src/rt_polling/driver.rs index 58ae377f..59875abb 100644 --- a/ntex-net/src/rt_polling/driver.rs +++ b/ntex-net/src/rt_polling/driver.rs @@ -149,6 +149,13 @@ impl Handler for StreamOpsHandler { Change::Writable => { let item = &mut streams[id]; let result = item.context.with_write_buf(|buf| { + log::debug!( + "{}: writing {:?} SIZE: {:?}, BUF: {:?}", + item.context.tag(), + item.fd, + buf.len(), + buf, + ); let slice = &buf[..]; syscall!( break libc::write(item.fd, slice.as_ptr() as _, slice.len()) @@ -156,6 +163,11 @@ impl Handler for StreamOpsHandler { }); if item.io.is_some() && result.is_pending() { + log::debug!( + "{}: want write {:?}", + item.context.tag(), + item.fd, + ); self.inner.api.register(item.fd, id, Interest::Writable); } } @@ -198,16 +210,17 @@ impl Handler for StreamOpsHandler { fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle> { api.unregister_all(fd); ntex_rt::spawn_blocking(move || { - syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; + //syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?; syscall!(libc::close(fd)) }) } impl StreamCtl { pub(crate) fn close(self) -> impl Future> { - let (io, fd) = - self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd)); + let (context, io, fd) = + self.with(|streams| (streams[self.id].context.clone(), streams[self.id].io.take(), streams[self.id].fd)); let fut = if let Some(io) = io { + log::debug!("{}: Closing ({}), {:?}", context.tag(), self.id, fd); std::mem::forget(io); Some(close(self.id, fd, &self.inner.api)) } else { @@ -299,12 +312,6 @@ impl StreamCtl { self.with(|streams| { let item = &mut streams[self.id]; - log::debug!( - "{}: Resume io write ({}), {:?}", - item.context.tag(), - self.id, - item.fd - ); let result = item.context.with_write_buf(|buf| { log::debug!( "{}: Writing io ({}), buf: {:?}",