Skip to content

Commit 79ef595

Browse files
authored
Merge pull request #13 from akhilles/fix-encoder
Fix Encoder AsyncRead, implement AsyncBufRead
2 parents 50db988 + bf6a2d1 commit 79ef595

File tree

1 file changed

+39
-40
lines changed

1 file changed

+39
-40
lines changed

src/encoder.rs

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use async_std::io::Read as AsyncRead;
1+
use async_std::io::{BufRead as AsyncBufRead, Read as AsyncRead};
22
use async_std::prelude::*;
33
use async_std::task::{ready, Context, Poll};
44

@@ -10,72 +10,71 @@ pin_project_lite::pin_project! {
1010
/// An SSE protocol encoder.
1111
#[derive(Debug)]
1212
pub struct Encoder {
13-
buf: Option<Vec<u8>>,
13+
buf: Box<[u8]>,
14+
cursor: usize,
1415
#[pin]
1516
receiver: async_channel::Receiver<Vec<u8>>,
16-
cursor: usize,
1717
}
1818
}
1919

2020
impl AsyncRead for Encoder {
2121
fn poll_read(
22-
mut self: Pin<&mut Self>,
22+
self: Pin<&mut Self>,
2323
cx: &mut Context<'_>,
2424
buf: &mut [u8],
2525
) -> Poll<io::Result<usize>> {
26-
// Request a new buffer if we don't have one yet.
27-
if let None = self.buf {
28-
self.buf = match ready!(Pin::new(&mut self.receiver).poll_next(cx)) {
26+
let mut this = self.project();
27+
// Request a new buffer if current one is exhausted.
28+
if this.buf.len() <= *this.cursor {
29+
match ready!(this.receiver.as_mut().poll_next(cx)) {
2930
Some(buf) => {
3031
log::trace!("> Received a new buffer with len {}", buf.len());
31-
Some(buf)
32+
*this.buf = buf.into_boxed_slice();
33+
*this.cursor = 0;
3234
}
3335
None => {
3436
log::trace!("> Encoder done reading");
3537
return Poll::Ready(Ok(0));
3638
}
3739
};
38-
};
40+
}
3941

4042
// Write the current buffer to completion.
41-
let local_buf = self.buf.as_mut().unwrap();
42-
let local_len = local_buf.len();
43+
let local_buf = &this.buf[*this.cursor..];
4344
let max = buf.len().min(local_buf.len());
4445
buf[..max].clone_from_slice(&local_buf[..max]);
45-
46-
self.cursor += max;
47-
48-
// Reset values if we're done reading.
49-
if self.cursor == local_len {
50-
self.buf = None;
51-
self.cursor = 0;
52-
};
46+
*this.cursor += max;
5347

5448
// Return bytes read.
5549
Poll::Ready(Ok(max))
5650
}
5751
}
5852

59-
// impl AsyncBufRead for Encoder {
60-
// fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
61-
// match ready!(self.project().receiver.poll_next(cx)) {
62-
// Some(buf) => match &self.buf {
63-
// None => self.project().buf = &mut Some(buf),
64-
// Some(local_buf) => local_buf.extend(buf),
65-
// },
66-
// None => {
67-
// if let None = self.buf {
68-
// self.project().buf = &mut Some(vec![]);
69-
// };
70-
// }
71-
// };
72-
// Poll::Ready(Ok(self.buf.as_ref().unwrap()))
73-
// }
74-
75-
// fn consume(self: Pin<&mut Self>, amt: usize) {
76-
// Pin::new(self).cursor += amt;
77-
// }
78-
// }
53+
impl AsyncBufRead for Encoder {
54+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
55+
let mut this = self.project();
56+
// Request a new buffer if current one is exhausted.
57+
if this.buf.len() <= *this.cursor {
58+
match ready!(this.receiver.as_mut().poll_next(cx)) {
59+
Some(buf) => {
60+
log::trace!("> Received a new buffer with len {}", buf.len());
61+
*this.buf = buf.into_boxed_slice();
62+
*this.cursor = 0;
63+
}
64+
None => {
65+
log::trace!("> Encoder done reading");
66+
return Poll::Ready(Ok(&[]));
67+
}
68+
};
69+
}
70+
Poll::Ready(Ok(&this.buf[*this.cursor..]))
71+
}
72+
73+
fn consume(self: Pin<&mut Self>, amt: usize) {
74+
let this = self.project();
75+
*this.cursor += amt;
76+
}
77+
}
7978

8079
/// The sending side of the encoder.
8180
#[derive(Debug, Clone)]
@@ -86,7 +85,7 @@ pub fn encode() -> (Sender, Encoder) {
8685
let (sender, receiver) = async_channel::bounded(1);
8786
let encoder = Encoder {
8887
receiver,
89-
buf: None,
88+
buf: Box::default(),
9089
cursor: 0,
9190
};
9291
(Sender(sender), encoder)

0 commit comments

Comments
 (0)