mirror of
https://github.com/ntex-rs/ntex.git
synced 2025-04-03 04:47:39 +03:00
cleanup codec::Framed
This commit is contained in:
parent
20a404ed1e
commit
a440ebb345
10 changed files with 93 additions and 126 deletions
|
@ -17,9 +17,9 @@ path = "src/lib.rs"
|
|||
|
||||
[dependencies]
|
||||
bitflags = "1.2.1"
|
||||
bytes = "0.5.2"
|
||||
futures-core = "0.3.1"
|
||||
futures-sink = "0.3.1"
|
||||
bytes = "0.5.4"
|
||||
futures-core = "0.3.4"
|
||||
futures-sink = "0.3.4"
|
||||
tokio = { version = "0.2.4", default-features=false }
|
||||
tokio-util = { version = "0.2.0", default-features=false, features=["codec"] }
|
||||
log = "0.4"
|
|
@ -195,16 +195,17 @@ impl<T, U> Framed<T, U> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, U> Framed<T, U> {
|
||||
impl<T, U> Framed<T, U>
|
||||
where
|
||||
T: AsyncWrite + Unpin,
|
||||
U: Encoder,
|
||||
{
|
||||
#[inline]
|
||||
/// Serialize item and Write to the inner buffer
|
||||
pub fn write(
|
||||
&mut self,
|
||||
item: <U as Encoder>::Item,
|
||||
) -> Result<(), <U as Encoder>::Error>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
U: Encoder,
|
||||
{
|
||||
) -> Result<(), <U as Encoder>::Error> {
|
||||
let remaining = self.write_buf.capacity() - self.write_buf.len();
|
||||
if remaining < LW {
|
||||
self.write_buf.reserve(HW - remaining);
|
||||
|
@ -214,6 +215,7 @@ impl<T, U> Framed<T, U> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Check if framed is able to write more data.
|
||||
///
|
||||
/// `Framed` object considers ready if there is free space in write buffer.
|
||||
|
@ -221,6 +223,48 @@ impl<T, U> Framed<T, U> {
|
|||
self.write_buf.len() < HW
|
||||
}
|
||||
|
||||
/// Flush write buffer to underlying I/O stream.
|
||||
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>> {
|
||||
log::trace!("flushing framed transport");
|
||||
|
||||
while !self.write_buf.is_empty() {
|
||||
log::trace!("writing; remaining={}", self.write_buf.len());
|
||||
|
||||
let n = ready!(Pin::new(&mut self.io).poll_write(cx, &self.write_buf))?;
|
||||
if n == 0 {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::WriteZero,
|
||||
"failed to write frame to transport",
|
||||
)
|
||||
.into()));
|
||||
}
|
||||
|
||||
// remove written data
|
||||
self.write_buf.advance(n);
|
||||
}
|
||||
|
||||
// Try flushing the underlying IO
|
||||
ready!(Pin::new(&mut self.io).poll_flush(cx))?;
|
||||
|
||||
log::trace!("framed transport flushed");
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Flush write buffer and shutdown underlying I/O stream.
|
||||
pub fn close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>> {
|
||||
ready!(Pin::new(&mut self.io).poll_flush(cx))?;
|
||||
ready!(Pin::new(&mut self.io).poll_shutdown(cx))?;
|
||||
log::trace!("framed transport flushed and closed");
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Framed<T, U>
|
||||
where
|
||||
T: AsyncRead + Unpin,
|
||||
U: Decoder,
|
||||
{
|
||||
/// Try to read underlying I/O stream and decode item.
|
||||
pub fn next_item(
|
||||
&mut self,
|
||||
|
@ -267,9 +311,8 @@ impl<T, U> Framed<T, U> {
|
|||
if remaining < LW {
|
||||
self.read_buf.reserve(HW - remaining)
|
||||
}
|
||||
let cnt = match unsafe {
|
||||
Pin::new_unchecked(&mut self.io).poll_read_buf(cx, &mut self.read_buf)
|
||||
} {
|
||||
let cnt = match Pin::new(&mut self.io).poll_read_buf(cx, &mut self.read_buf)
|
||||
{
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
|
||||
Poll::Ready(Ok(cnt)) => cnt,
|
||||
|
@ -281,62 +324,16 @@ impl<T, U> Framed<T, U> {
|
|||
self.flags.insert(Flags::READABLE);
|
||||
}
|
||||
}
|
||||
|
||||
/// Flush write buffer to underlying I/O stream.
|
||||
pub fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
U: Encoder,
|
||||
{
|
||||
log::trace!("flushing framed transport");
|
||||
|
||||
while !self.write_buf.is_empty() {
|
||||
log::trace!("writing; remaining={}", self.write_buf.len());
|
||||
|
||||
let n = ready!(unsafe {
|
||||
Pin::new_unchecked(&mut self.io).poll_write(cx, &self.write_buf)
|
||||
})?;
|
||||
|
||||
if n == 0 {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::WriteZero,
|
||||
"failed to write frame to transport",
|
||||
)
|
||||
.into()));
|
||||
}
|
||||
|
||||
// remove written data
|
||||
self.write_buf.advance(n);
|
||||
}
|
||||
|
||||
// Try flushing the underlying IO
|
||||
ready!(unsafe { Pin::new_unchecked(&mut self.io).poll_flush(cx) })?;
|
||||
|
||||
log::trace!("framed transport flushed");
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
/// Flush write buffer and shutdown underlying I/O stream.
|
||||
pub fn close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
U: Encoder,
|
||||
{
|
||||
unsafe {
|
||||
ready!(Pin::new_unchecked(&mut self.io).poll_flush(cx))?;
|
||||
ready!(Pin::new_unchecked(&mut self.io).poll_shutdown(cx))?;
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Stream for Framed<T, U>
|
||||
where
|
||||
T: AsyncRead,
|
||||
T: AsyncRead + Unpin,
|
||||
U: Decoder,
|
||||
{
|
||||
type Item = Result<U::Item, U::Error>;
|
||||
|
||||
#[inline]
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
|
@ -347,12 +344,13 @@ where
|
|||
|
||||
impl<T, U> Sink<U::Item> for Framed<T, U>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
T: AsyncWrite + Unpin,
|
||||
U: Encoder,
|
||||
U::Error: From<io::Error>,
|
||||
{
|
||||
type Error = U::Error;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
|
@ -364,6 +362,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn start_send(
|
||||
mut self: Pin<&mut Self>,
|
||||
item: <U as Encoder>::Item,
|
||||
|
@ -371,6 +370,7 @@ where
|
|||
self.write(item)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn poll_flush(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
|
@ -378,6 +378,7 @@ where
|
|||
self.flush(cx)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn poll_close(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue