This commit is contained in:
Nikolay Kim 2025-03-27 20:16:51 +01:00
parent b83180347c
commit 2c226f4cf4
7 changed files with 13 additions and 41 deletions

View file

@ -45,9 +45,6 @@ ntex-util = { path = "ntex-util" }
ntex-compio = { path = "ntex-compio" }
ntex-tokio = { path = "ntex-tokio" }
ntex-neon = { git = "https://github.com/ntex-rs/neon.git", branch = "iour-fix" }
#ntex-neon = { path = "../dev/neon" }
[workspace.dependencies]
async-task = "4.5.0"
bitflags = "2"

View file

@ -62,9 +62,7 @@ impl ConnectOps {
let item = Item { fd, sender };
let id = self.0.connects.borrow_mut().insert(item);
self.0
.api
.attach("-", fd, id as u32, Some(Event::writable(0)));
self.0.api.attach(fd, id as u32, Some(Event::writable(0)));
Ok(id)
}
}
@ -95,7 +93,7 @@ impl Handler for ConnectOpsBatcher {
Err(io::Error::from_raw_os_error(err))
};
self.inner.api.detach("-", item.fd, id as u32);
self.inner.api.detach(item.fd, id as u32);
let _ = item.sender.send(res);
}
}
@ -107,7 +105,7 @@ impl Handler for ConnectOpsBatcher {
if connects.contains(id) {
let item = connects.remove(id);
let _ = item.sender.send(Err(err));
self.inner.api.detach("-", item.fd, id as u32);
self.inner.api.detach(item.fd, id as u32);
}
}
}

View file

@ -86,7 +86,6 @@ impl<T: AsRawFd + 'static> StreamOps<T> {
});
self.0.api.attach(
tag,
fd,
stream.id,
Some(Event::new(0, false, false).with_interrupt()),
@ -163,9 +162,7 @@ impl<T> Handler for StreamOpsHandler<T> {
renew_ev.writable = true;
}
self.inner
.api
.modify(item.tag(), item.fd, id as u32, renew_ev);
self.inner.api.modify(item.fd, id as u32, renew_ev);
// delayed drops
if self.inner.delayd_drop.get() {
@ -223,7 +220,7 @@ fn close<T>(
) -> ntex_rt::JoinHandle<io::Result<i32>> {
let fd = item.fd;
item.flags.insert(Flags::CLOSED);
api.detach(item.tag(), fd, id);
api.detach(fd, id);
ntex_rt::spawn_blocking(move || {
syscall!(libc::shutdown(fd, libc::SHUT_RDWR))?;
syscall!(libc::close(fd))
@ -327,7 +324,7 @@ impl<T> StreamCtl<T> {
}
}
self.inner.api.modify(item.tag(), item.fd, self.id, event);
self.inner.api.modify(item.fd, self.id, event);
})
}
}

View file

@ -214,7 +214,7 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
match upd {
Update::Available(worker) => {
self.workers.push(worker);
if self.workers.len() == 1 {
if !self.workers.is_empty() {
self.mgr.resume();
} else {
self.workers.sort();

View file

@ -202,9 +202,7 @@ impl Accept {
let mut timeout = Some(Duration::ZERO);
loop {
println!("------- ACCEPT LOOP");
if let Err(e) = self.poller.wait(&mut events, timeout) {
println!("------- ACCEPT LOOP ERR: {:?}", e);
if e.kind() == io::ErrorKind::Interrupted {
continue;
} else {
@ -217,14 +215,12 @@ impl Accept {
let _ = self.tx.take().unwrap().send(());
}
println!("------- ACCEPTING: {:?}", events.len());
for idx in 0..self.sockets.len() {
if !self.sockets[idx].registered.get() {
println!("------- ACCEPTING NOT REGISTEREED: {:?}", idx);
}
let readd = self.accept(idx);
if readd {
self.add_source(idx);
if self.sockets[idx].registered.get() {
let readd = self.accept(idx);
if readd {
self.add_source(idx);
}
}
}
@ -398,26 +394,19 @@ impl Accept {
}
fn accept(&mut self, token: usize) -> bool {
println!("------- ACCEPTING 1 {:?}", token);
loop {
if let Some(info) = self.sockets.get_mut(token) {
let item = info.sock.accept();
println!("------- ACCEPTING {:?}", item);
//match info.sock.accept() {
match item {
match info.sock.accept() {
Ok(Some(io)) => {
let msg = Connection {
io,
token: info.token,
};
println!("------- ACCEPTED {:?}", msg);
if let Err(msg) = self.srv.process(msg) {
log::trace!("Server is unavailable");
self.backlog.push_back(msg);
self.backpressure(true);
return false;
} else {
log::debug!("------- SENT ACCEPTED");
}
}
Ok(None) => return true,

View file

@ -40,7 +40,6 @@ impl<T> Server<T> {
/// Send item to worker pool
pub fn process(&self, item: T) -> Result<(), T> {
if self.shared.paused.load(Ordering::Acquire) {
println!("--------- PAUSED");
Err(item)
} else if let Err(e) = self.cmd.try_send(ServerCommand::Item(item)) {
match e.into_inner() {

View file

@ -260,10 +260,7 @@ where
T: Send + 'static,
F: ServiceFactory<T> + 'static,
{
println!("------- start worker {:?}", wrk.id);
loop {
//println!("------- run worker {:?}", wrk.id);
let mut recv = std::pin::pin!(wrk.rx.recv());
let fut = poll_fn(|cx| {
wrk.waker.register(cx.waker());
@ -279,11 +276,8 @@ where
}
}
println!("------- waiting socket {:?}", wrk.id);
match ready!(recv.as_mut().poll(cx)) {
Ok(item) => {
println!("------- got {:?}", wrk.id);
let fut = svc.call(item);
let _ = spawn(async move {
let _ = fut.await;
@ -291,8 +285,6 @@ where
Poll::Ready(Ok::<_, F::Error>(true))
}
Err(_) => {
println!("------- failed {:?}", wrk.id);
log::error!("Server is gone");
Poll::Ready(Ok(false))
}