Skip to content

Commit

Permalink
Draft pre-buffered stream
Browse files Browse the repository at this point in the history
  • Loading branch information
mobiusklein committed Dec 21, 2023
1 parent 4a48b6a commit 2c72a42
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 8 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,3 @@ harness = false
[package.metadata.docs.rs]
features = ["openblas", "parallelism"]
no-default-features = true

[patch.crates-io]
mzsignal = { path = "../mzsignal/" }
2 changes: 1 addition & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod infer_format;

pub(crate) mod compression;

pub use crate::io::utils::DetailLevel;
pub use crate::io::utils::{DetailLevel, PreBufferedStream};
pub use crate::io::mgf::{MGFReader, MGFError, MGFWriter};
pub use crate::io::mzml::{MzMLReader, MzMLParserError, MzMLWriter};
#[cfg(feature = "async")]
Expand Down
150 changes: 146 additions & 4 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@ use md5::Digest;

type ByteBuffer = io::Cursor<Vec<u8>>;

#[derive(Debug, Clone)]
#[derive(Default)]
#[derive(Debug, Clone, Default)]
pub enum FileWrapper<T: io::Read> {
FileSystem(path::PathBuf),
Stream(T),
#[default]
Empty,
}


/// Controls the level of spectral detail read from an MS data file
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
pub enum DetailLevel {
Expand All @@ -28,7 +26,7 @@ pub enum DetailLevel {
/// Read all spectral data, including peak data but defer decoding until later if possible
Lazy,
/// Read only the metadata of spectra, ignoring peak data entirely
MetadataOnly
MetadataOnly,
}

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -150,6 +148,124 @@ impl<T: io::Seek + io::Write> io::Seek for MD5HashingStream<T> {
}
}

pub struct PreBufferedStream<R: io::Read> {
stream: R,
buffer: io::Cursor<Vec<u8>>,
buffer_size: usize,
position: usize,
}

impl<R: io::Read> io::Seek for PreBufferedStream<R> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
match pos {
io::SeekFrom::Start(offset) => {
if self.position > self.buffer_size {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Seeking after leaving buffered prefix",
));
} else if self.position + offset as usize > self.buffer_size {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot seeking beyond buffered prefix",
));
}
self.position = offset as usize;
let r = self.buffer.seek(pos);
eprintln!("Position {0} -> {1}: {r:?}", self.position, self.buffer.stream_position().unwrap());
r
}
io::SeekFrom::End(_) => {
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"Cannot seek relative the end of PreBufferedStream",
))
}
io::SeekFrom::Current(offset) => {
if self.position > self.buffer_size {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Seeking after leaving buffered prefix",
));
}
if offset < 0 {
if offset.abs() as usize > self.position {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot seek to negative position",
));
} else {
self.position = self.position.saturating_sub(offset.abs() as usize);
let r = self.buffer.seek(io::SeekFrom::Start(self.position as u64));
eprintln!("Position {0} -> {1}: {r:?}", self.position, self.buffer.stream_position().unwrap());
r
}
} else {
if offset as usize + self.position > self.buffer_size {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot seeking beyond buffered prefix",
));
} else {
let r = self.buffer.seek(io::SeekFrom::Current(offset));
eprintln!("Position {0} -> {1}: {r:?}", self.position, self.buffer.stream_position().unwrap());
r
}
}
}
}
}
}

impl<R: io::Read> io::Read for PreBufferedStream<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let n_total = buf.len();
let before = self.position;
let n_remaining = if self.position < self.buffer_size {
let n_from_buffer = self.buffer.read(buf)?;
self.position += n_from_buffer;
n_total.saturating_sub(n_from_buffer)
} else {
n_total
};
if n_remaining > 0 {
let n_rest = self.stream.read(buf)?;
self.position += n_rest;
}
let total_read = self.position - before;
Ok(total_read)
}
}

const BUFFER_SIZE: usize = 2usize.pow(16);

impl<R: io::Read> PreBufferedStream<R> {
pub fn new(stream: R) -> io::Result<Self> {
Self::new_with_buffer_size(stream, BUFFER_SIZE)
}

pub fn new_with_buffer_size(stream: R, buffer_size: usize) -> io::Result<Self> {
let buffer = io::Cursor::new(Vec::with_capacity(buffer_size));
let mut inst = Self {
stream,
buffer_size,
buffer,
position: 0,
};
inst.prefill_buffer()?;
Ok(inst)
}

fn prefill_buffer(&mut self) -> io::Result<usize> {
let mut buffer = self.buffer.get_mut();
buffer.resize(self.buffer_size, 0);
let bytes_read = self.stream.read(&mut buffer)?;
buffer.shrink_to(bytes_read);
self.buffer_size = bytes_read;
Ok(bytes_read)
}
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -168,4 +284,30 @@ mod test {
assert_eq!(out, b"foobar");
}
}

#[test]
fn test_prebuffering() -> io::Result<()> {
let mut fh = fs::File::open("./test/data/batching_test.mzML")?;
let mut data = Vec::new();
fh.read_to_end(&mut data)?;
let content = io::Cursor::new(data);
let mut stream = PreBufferedStream::new_with_buffer_size(content, 512)?;

assert_eq!(stream.buffer_size, 512);

let mut buffer = [0u8; 128];
stream.read(&mut buffer)?;
assert_eq!(buffer.len(), 128);
assert!(buffer.starts_with(b"<?xml version=\"1.0\" encoding=\"utf-8\"?>"));

let mut buffer2 = [0u8; 128];
stream.seek(io::SeekFrom::Start(0))?;
stream.read(&mut buffer2)?;

assert_eq!(buffer, buffer2);

assert!(stream.seek(io::SeekFrom::Start(556)).is_err());

Ok(())
}
}

0 comments on commit 2c72a42

Please sign in to comment.