This commit is contained in:
Nikolay Kim 2025-03-27 20:39:52 +01:00
parent 8f81f129ac
commit 57c7d5ef1c
3 changed files with 18 additions and 18 deletions

View file

@ -190,7 +190,13 @@ impl<T> Handler for StreamOpsHandler<T> {
fn error(&mut self, id: usize, err: io::Error) { fn error(&mut self, id: usize, err: io::Error) {
self.inner.with(|streams| { self.inner.with(|streams| {
if let Some(item) = streams.get_mut(id) { if let Some(item) = streams.get_mut(id) {
log::debug!("FD is failed ({}) {:?}, err: {:?}", id, item.fd, err); log::debug!(
"{}: FD is failed ({}) {:?}, err: {:?}",
item.tag(),
id,
item.fd,
err
);
item.context.stopped(Some(err)); item.context.stopped(Some(err));
if item.io.take().is_some() { if item.io.take().is_some() {
close(id as u32, item, &self.inner.api); close(id as u32, item, &self.inner.api);

View file

@ -33,6 +33,12 @@ struct StreamItem<T> {
wr_op: Option<NonZeroU32>, wr_op: Option<NonZeroU32>,
} }
impl<T> StreamItem<T> {
fn tag(&self) -> &'static str {
self.context.tag()
}
}
enum Operation { enum Operation {
Recv { Recv {
id: usize, id: usize,
@ -249,7 +255,7 @@ impl<T> Handler for StreamOpsHandler<T> {
if storage.streams[id].ref_count == 0 { if storage.streams[id].ref_count == 0 {
let mut item = storage.streams.remove(id); let mut item = storage.streams.remove(id);
log::debug!("{}: Drop io ({}), {:?}", item.context.tag(), id, item.fd); log::debug!("{}: Drop io ({}), {:?}", item.tag(), id, item.fd);
if let Some(io) = item.io.take() { if let Some(io) = item.io.take() {
mem::forget(io); mem::forget(io);
@ -273,7 +279,7 @@ impl<T> StreamOpsStorage<T> {
if let Poll::Ready(mut buf) = item.context.get_read_buf() { if let Poll::Ready(mut buf) = item.context.get_read_buf() {
log::debug!( log::debug!(
"{}: Recv resume ({}), {:?} rem: {:?}", "{}: Recv resume ({}), {:?} rem: {:?}",
item.context.tag(), item.tag(),
id, id,
item.fd, item.fd,
buf.remaining_mut() buf.remaining_mut()
@ -306,7 +312,7 @@ impl<T> StreamOpsStorage<T> {
if let Poll::Ready(buf) = item.context.get_write_buf() { if let Poll::Ready(buf) = item.context.get_write_buf() {
log::debug!( log::debug!(
"{}: Send resume ({}), {:?} len: {:?}", "{}: Send resume ({}), {:?} len: {:?}",
item.context.tag(), item.tag(),
id, id,
item.fd, item.fd,
buf.len() buf.len()
@ -396,12 +402,7 @@ impl<T> StreamCtl<T> {
if let Some(rd_op) = item.rd_op { if let Some(rd_op) = item.rd_op {
if !item.flags.contains(Flags::RD_CANCELING) { if !item.flags.contains(Flags::RD_CANCELING) {
log::debug!( log::debug!("{}: Recv to pause ({}), {:?}", item.tag(), self.id, item.fd);
"{}: Recv to pause ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
item.flags.insert(Flags::RD_CANCELING); item.flags.insert(Flags::RD_CANCELING);
self.inner.api.cancel(rd_op.get()); self.inner.api.cancel(rd_op.get());
} }
@ -426,12 +427,7 @@ impl<T> Drop for StreamCtl<T> {
if storage.streams[self.id].ref_count == 0 { if storage.streams[self.id].ref_count == 0 {
let mut item = storage.streams.remove(self.id); let mut item = storage.streams.remove(self.id);
if let Some(io) = item.io.take() { if let Some(io) = item.io.take() {
log::debug!( log::debug!("{}: Close io ({}), {:?}", item.tag(), self.id, item.fd);
"{}: Close io ({}), {:?}",
item.context.tag(),
self.id,
item.fd
);
mem::forget(io); mem::forget(io);
let id = storage.ops.insert(Operation::Close { tx: None }); let id = storage.ops.insert(Operation::Close { tx: None });

View file

@ -184,10 +184,8 @@ impl<F: ServerConfiguration> HandleCmdState<F> {
if self.next > self.workers.len() { if self.next > self.workers.len() {
self.next = self.workers.len() - 1; self.next = self.workers.len() - 1;
} }
log::debug!("--------- SENDING ITEM");
match self.workers[self.next].send(item) { match self.workers[self.next].send(item) {
Ok(()) => { Ok(()) => {
log::debug!("--------- ITEM SENT");
self.next = (self.next + 1) % self.workers.len(); self.next = (self.next + 1) % self.workers.len();
break; break;
} }