Fix ws::stream::StreamDecoder, decodes buffer before reading from io #27

This commit is contained in:
Nikolay Kim 2020-09-22 18:09:10 +06:00
parent bf3e8a3de5
commit 9b88f7d0ff
3 changed files with 36 additions and 17 deletions

View file

@ -1,6 +1,8 @@
# Changes
## [0.1.24] - Unreleased
## [0.1.24] - 2020-09-22
* Fix ws::stream::StreamDecoder, decodes buffer before reading from io #27
* Drop deprecated ntex::framed mod

View file

@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.1.24-dev"
version = "0.1.24"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
@ -39,11 +39,11 @@ cookie = ["coo-kie", "coo-kie/percent-encode"]
ntex-codec = "0.1.2"
ntex-rt = "0.1.1"
ntex-rt-macros = "0.1"
ntex-router = "0.3.5"
ntex-router = "0.3.7"
ntex-service = "0.1.3"
ntex-macros = "0.1"
actix-threadpool = "0.3.1"
actix-threadpool = "0.3.3"
base64 = "0.12"
bitflags = "1.2.1"
bytes = "0.5.6"
@ -70,7 +70,7 @@ sha-1 = "0.9.1"
slab = "0.4.2"
serde = { version = "1.0", features=["derive"] }
serde_json = "1.0"
serde_urlencoded = "0.6.1"
serde_urlencoded = "0.7.0"
socket2 = "0.3.12"
url = "2.1"
time = { version = "0.2.11", default-features = false, features = ["std"] }

View file

@ -62,22 +62,31 @@ where
#[inline]
fn poll_next(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let mut this = self.as_mut().project();
match ready!(this.stream.poll_next(cx)) {
Some(Ok(buf)) => {
this.buf.extend(&buf);
loop {
if !this.buf.is_empty() {
match this.codec.decode(&mut this.buf) {
Ok(Some(item)) => Poll::Ready(Some(Ok(item))),
Ok(None) => Poll::Pending,
Err(err) => Poll::Ready(Some(Err(err.into()))),
Ok(Some(item)) => return Poll::Ready(Some(Ok(item))),
Ok(None) => (),
Err(err) => return Poll::Ready(Some(Err(err.into()))),
}
}
Some(Err(err)) => Poll::Ready(Some(Err(StreamError::Stream(err)))),
None => Poll::Ready(None),
match this.stream.poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok(buf))) => {
this.buf.extend(&buf);
this = self.as_mut().project();
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Some(Err(StreamError::Stream(err))))
}
Poll::Ready(None) => return Poll::Ready(None),
}
}
}
}
@ -180,13 +189,21 @@ mod tests {
let mut buf = BytesMut::new();
let mut codec = Codec::new().client_mode();
codec
.encode(Message::Text("test".to_string()), &mut buf)
.encode(Message::Text("test1".to_string()), &mut buf)
.unwrap();
codec
.encode(Message::Text("test2".to_string()), &mut buf)
.unwrap();
tx.send(Ok::<_, ()>(buf.split().freeze())).unwrap();
let frame = StreamExt::next(&mut decoder).await.unwrap().unwrap();
match frame {
Frame::Text(data) => assert_eq!(data, b"test"[..]),
Frame::Text(data) => assert_eq!(data, b"test1"[..]),
_ => panic!(),
}
let frame = StreamExt::next(&mut decoder).await.unwrap().unwrap();
match frame {
Frame::Text(data) => assert_eq!(data, b"test2"[..]),
_ => panic!(),
}
}