mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-04 21:37:58 +03:00
wip
This commit is contained in:
parent
38c5c6920f
commit
5233d615c8
2 changed files with 22 additions and 10 deletions
|
@ -228,7 +228,9 @@ mod tests {
|
||||||
#[ntex::test]
|
#[ntex::test]
|
||||||
async fn test_connect() {
|
async fn test_connect() {
|
||||||
let server = ntex::server::test_server(|| {
|
let server = ntex::server::test_server(|| {
|
||||||
ntex_service::fn_service(|_| async { Ok::<_, ()>(()) })
|
ntex_service::fn_service(|_| async {
|
||||||
|
ntex_util::time::sleep(ntex_util::time::Millis(100)).await;
|
||||||
|
Ok::<_, ()>(()) })
|
||||||
});
|
});
|
||||||
|
|
||||||
let srv = Connector::default().tag("T").memory_pool(PoolId::P5);
|
let srv = Connector::default().tag("T").memory_pool(PoolId::P5);
|
||||||
|
@ -242,6 +244,7 @@ mod tests {
|
||||||
let result = srv.connect(format!("{}", server.addr())).await;
|
let result = srv.connect(format!("{}", server.addr())).await;
|
||||||
assert!(result.is_ok());
|
assert!(result.is_ok());
|
||||||
|
|
||||||
|
println!("1-----------------------------------------");
|
||||||
let msg = Connect::new(format!("{}", server.addr())).set_addrs(vec![
|
let msg = Connect::new(format!("{}", server.addr())).set_addrs(vec![
|
||||||
format!("127.0.0.1:{}", server.addr().port() - 1)
|
format!("127.0.0.1:{}", server.addr().port() - 1)
|
||||||
.parse()
|
.parse()
|
||||||
|
@ -250,9 +253,11 @@ mod tests {
|
||||||
]);
|
]);
|
||||||
let result = crate::connect::connect(msg).await;
|
let result = crate::connect::connect(msg).await;
|
||||||
assert!(result.is_ok());
|
assert!(result.is_ok());
|
||||||
|
println!("2-----------------------------------------");
|
||||||
|
|
||||||
let msg = Connect::new(server.addr());
|
let msg = Connect::new(server.addr());
|
||||||
let result = crate::connect::connect(msg).await;
|
let result = crate::connect::connect(msg).await;
|
||||||
assert!(result.is_ok());
|
assert!(result.is_ok());
|
||||||
|
println!("3-----------------------------------------");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,6 +149,13 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
Change::Writable => {
|
Change::Writable => {
|
||||||
let item = &mut streams[id];
|
let item = &mut streams[id];
|
||||||
let result = item.context.with_write_buf(|buf| {
|
let result = item.context.with_write_buf(|buf| {
|
||||||
|
log::debug!(
|
||||||
|
"{}: writing {:?} SIZE: {:?}, BUF: {:?}",
|
||||||
|
item.context.tag(),
|
||||||
|
item.fd,
|
||||||
|
buf.len(),
|
||||||
|
buf,
|
||||||
|
);
|
||||||
let slice = &buf[..];
|
let slice = &buf[..];
|
||||||
syscall!(
|
syscall!(
|
||||||
break libc::write(item.fd, slice.as_ptr() as _, slice.len())
|
break libc::write(item.fd, slice.as_ptr() as _, slice.len())
|
||||||
|
@ -156,6 +163,11 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
});
|
});
|
||||||
|
|
||||||
if item.io.is_some() && result.is_pending() {
|
if item.io.is_some() && result.is_pending() {
|
||||||
|
log::debug!(
|
||||||
|
"{}: want write {:?}",
|
||||||
|
item.context.tag(),
|
||||||
|
item.fd,
|
||||||
|
);
|
||||||
self.inner.api.register(item.fd, id, Interest::Writable);
|
self.inner.api.register(item.fd, id, Interest::Writable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -198,16 +210,17 @@ impl<T> Handler for StreamOpsHandler<T> {
|
||||||
fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle<io::Result<i32>> {
|
fn close(id: usize, fd: RawFd, api: &DriverApi) -> ntex_rt::JoinHandle<io::Result<i32>> {
|
||||||
api.unregister_all(fd);
|
api.unregister_all(fd);
|
||||||
ntex_rt::spawn_blocking(move || {
|
ntex_rt::spawn_blocking(move || {
|
||||||
syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?;
|
//syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?;
|
||||||
syscall!(libc::close(fd))
|
syscall!(libc::close(fd))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> StreamCtl<T> {
|
impl<T> StreamCtl<T> {
|
||||||
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
|
pub(crate) fn close(self) -> impl Future<Output = io::Result<()>> {
|
||||||
let (io, fd) =
|
let (context, io, fd) =
|
||||||
self.with(|streams| (streams[self.id].io.take(), streams[self.id].fd));
|
self.with(|streams| (streams[self.id].context.clone(), streams[self.id].io.take(), streams[self.id].fd));
|
||||||
let fut = if let Some(io) = io {
|
let fut = if let Some(io) = io {
|
||||||
|
log::debug!("{}: Closing ({}), {:?}", context.tag(), self.id, fd);
|
||||||
std::mem::forget(io);
|
std::mem::forget(io);
|
||||||
Some(close(self.id, fd, &self.inner.api))
|
Some(close(self.id, fd, &self.inner.api))
|
||||||
} else {
|
} else {
|
||||||
|
@ -299,12 +312,6 @@ impl<T> StreamCtl<T> {
|
||||||
self.with(|streams| {
|
self.with(|streams| {
|
||||||
let item = &mut streams[self.id];
|
let item = &mut streams[self.id];
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"{}: Resume io write ({}), {:?}",
|
|
||||||
item.context.tag(),
|
|
||||||
self.id,
|
|
||||||
item.fd
|
|
||||||
);
|
|
||||||
let result = item.context.with_write_buf(|buf| {
|
let result = item.context.with_write_buf(|buf| {
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"{}: Writing io ({}), buf: {:?}",
|
"{}: Writing io ({}), buf: {:?}",
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue