Use new values api (#521)

This commit is contained in:
Nikolay Kim 2025-03-13 17:42:59 +05:00 committed by GitHub
parent cfc32ed74f
commit ab5fb624b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 55 additions and 79 deletions

View file

@ -45,7 +45,8 @@ ntex-util = { path = "ntex-util" }
ntex-compio = { path = "ntex-compio" } ntex-compio = { path = "ntex-compio" }
ntex-tokio = { path = "ntex-tokio" } ntex-tokio = { path = "ntex-tokio" }
ntex-neon = { git = "https://github.com/ntex-rs/neon.git" } #ntex-neon = { git = "https://github.com/ntex-rs/neon.git" }
#ntex-neon = { path = "../dev/neon" }
[workspace.dependencies] [workspace.dependencies]
async-task = "4.5.0" async-task = "4.5.0"

View file

@ -90,27 +90,21 @@ struct ConnectOpsInner {
impl ConnectOps { impl ConnectOps {
pub(crate) fn current() -> Self { pub(crate) fn current() -> Self {
Runtime::with_current(|rt| { Runtime::value(|rt| {
if let Some(s) = rt.get::<Self>() { let mut inner = None;
s rt.driver().register(|api| {
} else { let ops = Rc::new(ConnectOpsInner {
let mut inner = None; api,
rt.driver().register(|api| { connects: RefCell::new(Slab::new()),
let ops = Rc::new(ConnectOpsInner {
api,
connects: RefCell::new(Slab::new()),
});
inner = Some(ops.clone());
Box::new(ConnectOpsBatcher {
inner: ops,
feed: VecDeque::new(),
})
}); });
inner = Some(ops.clone());
Box::new(ConnectOpsBatcher {
inner: ops,
feed: VecDeque::new(),
})
});
let s = ConnectOps(inner.unwrap()); ConnectOps(inner.unwrap())
rt.insert(s.clone());
s
}
}) })
} }

View file

@ -52,28 +52,22 @@ struct StreamOpsInner<T> {
impl<T: AsRawFd + 'static> StreamOps<T> { impl<T: AsRawFd + 'static> StreamOps<T> {
pub(crate) fn current() -> Self { pub(crate) fn current() -> Self {
Runtime::with_current(|rt| { Runtime::value(|rt| {
if let Some(s) = rt.get::<Self>() { let mut inner = None;
s rt.driver().register(|api| {
} else { let ops = Rc::new(StreamOpsInner {
let mut inner = None; api,
rt.driver().register(|api| { feed: Cell::new(Some(VecDeque::new())),
let ops = Rc::new(StreamOpsInner { streams: Cell::new(Some(Box::new(Slab::new()))),
api,
feed: Cell::new(Some(VecDeque::new())),
streams: Cell::new(Some(Box::new(Slab::new()))),
});
inner = Some(ops.clone());
Box::new(StreamOpsHandler {
inner: ops,
feed: VecDeque::new(),
})
}); });
inner = Some(ops.clone());
Box::new(StreamOpsHandler {
inner: ops,
feed: VecDeque::new(),
})
});
let s = StreamOps(inner.unwrap()); StreamOps(inner.unwrap())
rt.insert(s.clone());
s
}
}) })
} }

View file

@ -85,24 +85,17 @@ struct ConnectOpsInner {
impl ConnectOps { impl ConnectOps {
pub(crate) fn current() -> Self { pub(crate) fn current() -> Self {
Runtime::with_current(|rt| { Runtime::value(|rt| {
if let Some(s) = rt.get::<Self>() { let mut inner = None;
s rt.driver().register(|api| {
} else { let ops = Rc::new(ConnectOpsInner {
let mut inner = None; api,
rt.driver().register(|api| { ops: RefCell::new(Slab::new()),
let ops = Rc::new(ConnectOpsInner {
api,
ops: RefCell::new(Slab::new()),
});
inner = Some(ops.clone());
Box::new(ConnectOpsHandler { inner: ops })
}); });
inner = Some(ops.clone());
let s = ConnectOps(inner.unwrap()); Box::new(ConnectOpsHandler { inner: ops })
rt.insert(s.clone()); });
s ConnectOps(inner.unwrap())
}
}) })
} }

View file

@ -59,31 +59,25 @@ struct StreamOpsStorage<T> {
impl<T: AsRawFd + 'static> StreamOps<T> { impl<T: AsRawFd + 'static> StreamOps<T> {
pub(crate) fn current() -> Self { pub(crate) fn current() -> Self {
Runtime::with_current(|rt| { Runtime::value(|rt| {
if let Some(s) = rt.get::<Self>() { let mut inner = None;
s rt.driver().register(|api| {
} else { let mut ops = Slab::new();
let mut inner = None; ops.insert(Operation::Nop);
rt.driver().register(|api| {
let mut ops = Slab::new();
ops.insert(Operation::Nop);
let ops = Rc::new(StreamOpsInner { let ops = Rc::new(StreamOpsInner {
api, api,
feed: RefCell::new(Vec::new()), feed: RefCell::new(Vec::new()),
storage: RefCell::new(StreamOpsStorage { storage: RefCell::new(StreamOpsStorage {
ops, ops,
streams: Slab::new(), streams: Slab::new(),
}), }),
});
inner = Some(ops.clone());
Box::new(StreamOpsHandler { inner: ops })
}); });
inner = Some(ops.clone());
Box::new(StreamOpsHandler { inner: ops })
});
let s = StreamOps(inner.unwrap()); StreamOps(inner.unwrap())
rt.insert(s.clone());
s
}
}) })
} }