Skip to content

Commit

Permalink
Fix up infer_from_stream on compressed streams
Browse files Browse the repository at this point in the history
  • Loading branch information
mobiusklein committed Jan 13, 2024
1 parent daae3d4 commit c213497
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 24 deletions.
5 changes: 5 additions & 0 deletions examples/averaging_writer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*!
* Demo how to use the deferred spectrum averaging iterator with `rayon`
* to quickly average over an LC-MS run and write the averaged spectra
* out to an mzML file on disk.
*/
use std::env;
use std::io;
use std::path;
Expand Down
45 changes: 45 additions & 0 deletions examples/compressed_mzml.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*!
* Demo how to read an mzML file that is compressed
*/
use std::env;
use std::fs;
use std::io;
use std::process::exit;
use std::time::Instant;

use mzdata::io::PreBufferedStream;
use mzdata::io::{MzMLReader, RestartableGzDecoder};
use mzdata::prelude::*;

/// Read a gzip-compressed mzML file from the path given as the first
/// command line argument, or from STDIN when that argument is `-`, collect
/// all spectrum groups, and report how many groups and spectra were read
/// along with the elapsed wall-clock time.
///
/// Exits with status 1 if no argument is supplied, returns an `io::Error`
/// if the input cannot be opened or pre-buffered, and panics if no spectra
/// were read.
fn main() -> io::Result<()> {
    let input = env::args().nth(1).unwrap_or_else(|| {
        eprintln!("Please provide a file path or '-' for STDIN");
        exit(1)
    });
    let start = Instant::now();
    // The two branches build readers over different stream types, so each
    // collects into the common `Vec` of groups before leaving the branch.
    let groups: Vec<_> = if input == "-" {
        // STDIN is wrapped in `PreBufferedStream` before being handed to the
        // gzip decoder, mirroring how the file branch wraps a `File`.
        let stream =
            RestartableGzDecoder::new(io::BufReader::new(PreBufferedStream::new(io::stdin())?));
        MzMLReader::new(stream).into_groups().collect()
    } else {
        let stream = RestartableGzDecoder::new(io::BufReader::new(fs::File::open(input)?));
        MzMLReader::new(stream).into_groups().collect()
    };
    // Only the number of spectra is reported, so count them lazily instead
    // of materializing a vector of references.
    let spectra_count = groups
        .iter()
        .flat_map(|g| g.precursor.iter().chain(g.products.iter()))
        .count();
    let elapsed = start.elapsed();
    eprintln!(
        "Read {} groups with {} spectra in {:0.3?}",
        groups.len(),
        spectra_count,
        elapsed
    );
    assert!(spectra_count > 0, "no spectra were read from the input");

    Ok(())
}
55 changes: 38 additions & 17 deletions examples/from_stdin.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,54 @@
use std::io;
/*!
* Demo how to read a data file from STDIN, and then batch collect its spectra
*/
use std::io::{self, Seek};
use std::time::Instant;

use mzdata::{MzMLReader, MGFReader};
use mzdata::io::{PreBufferedStream, ScanSource, infer_from_stream, MassSpectrometryFormat};
use mzdata::io::{
infer_from_stream, MassSpectrometryFormat, PreBufferedStream, RestartableGzDecoder, ScanSource,
};
use mzdata::{MGFReader, MzMLReader};

fn main() -> io::Result<()> {
let start = Instant::now();
let stream = io::stdin();
let mut stream = PreBufferedStream::new(stream)?;
let (fmt, compressed) = infer_from_stream(&mut stream)?;
if compressed {
panic!("Compression not supported!")
}
stream.seek(io::SeekFrom::Start(0))?;
let groups: Vec<_> = match fmt {
MassSpectrometryFormat::MGF => {
MGFReader::new(stream).into_groups().collect()
},
if compressed {
MGFReader::new(RestartableGzDecoder::new(io::BufReader::new(stream)))
.into_groups()
.collect()
} else {
MGFReader::new(stream).into_groups().collect()
}
}
MassSpectrometryFormat::MzML => {
MzMLReader::new(stream).into_groups().collect()
},
_ => {
panic!("Cannot identify file format")
if compressed {
MzMLReader::new(RestartableGzDecoder::new(io::BufReader::new(stream)))
.into_groups()
.collect()
} else {
MzMLReader::new(stream).into_groups().collect()
}
}
x => {
panic!("Cannot identify file format ({:?})", x)
}
};
let spectra: Vec<_> = groups.iter().flat_map(|g| {
g.precursor.iter().chain(g.products.iter())
}).collect();
let spectra: Vec<_> = groups
.iter()
.flat_map(|g| g.precursor.iter().chain(g.products.iter()))
.collect();
let end = Instant::now();
eprintln!("Read {} groups with {} spectra in {:0.3?}", groups.len(), spectra.len(), end - start);
eprintln!(
"Read {} groups with {} spectra in {:0.3?}",
groups.len(),
spectra.len(),
end - start
);
assert!(spectra.len() > 0);
Ok(())
}
}
27 changes: 27 additions & 0 deletions examples/infer_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*!
* Demo the minimum code needed to infer the input file format from a path
* or STDIN using `infer_format` and `infer_from_stream`
*/
use std::process::exit;
use std::env;
use std::io;

use mzdata::io::{infer_format, infer_from_stream, PreBufferedStream};



/// Infer the mass spectrometry file format of the input named by the first
/// command line argument and print it together with whether the input is
/// gzip-compressed.
///
/// When the argument is `-`, STDIN is wrapped in a `PreBufferedStream` and
/// inspected with `infer_from_stream`; otherwise the argument is treated as
/// a path and passed to `infer_format`. Exits with status 1 if no argument
/// is supplied.
fn main() -> io::Result<()> {
    let input = env::args().nth(1).unwrap_or_else(|| {
        eprintln!("Please provide a file path or '-' for STDIN");
        exit(1)
    });

    let (inferred, gzipped) = if input == "-" {
        // `infer_from_stream` needs a seekable stream, which STDIN is not on
        // its own, hence the pre-buffering wrapper.
        let mut stream = PreBufferedStream::new(io::stdin())?;
        infer_from_stream(&mut stream)?
    } else {
        infer_format(input)?
    };
    println!("{:?} (gzip: {})", inferred, gzipped);
    Ok(())
}
4 changes: 3 additions & 1 deletion examples/mzcat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time;

use rayon::prelude::*;

use mzdata::io::{mzml, mzmlb};
use mzdata::io::mzml;
#[cfg(feature = "mzmlb")]
use mzdata::io::mzmlb;
use mzdata::prelude::*;
use mzdata::spectrum::MultiLayerSpectrum;

Expand Down
1 change: 1 addition & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ pub use crate::io::traits::{
SpectrumGrouping, SpectrumIterator, StreamingSpectrumIterator,
};
pub use crate::io::utils::{DetailLevel, PreBufferedStream};
pub use compression::RestartableGzDecoder;
19 changes: 14 additions & 5 deletions src/io/infer_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,24 @@ pub fn infer_from_path<P: Into<path::PathBuf>,>(path: P) -> (MassSpectrometryFor
/// Given a stream of bytes, infer the file format and whether or not the
/// stream is GZIP compressed. This assumes the stream is seekable.
pub fn infer_from_stream<R: Read + Seek>(stream: &mut R) -> io::Result<(MassSpectrometryFormat, bool)> {
let mut buf = Vec::with_capacity(100);
buf.resize(100, b'\0');
// We need to read in at least enough bytes to span a complete XML head plus the
// end of an opening tag
let mut buf = Vec::with_capacity(500);
buf.resize(500, b'\0');
let current_pos = stream.stream_position()?;
stream.read_exact(buf.as_mut_slice())?;
// record how many bytes were actually read so we know the upper bound
let bytes_read = stream.read(buf.as_mut_slice())?;
buf.shrink_to(bytes_read);
let is_stream_gzipped = is_gzipped(buf.as_slice());
if is_stream_gzipped {
let mut decoder = GzDecoder::new(buf.as_slice());
let mut decompressed_buf = Vec::new();
decoder.read_to_end(&mut decompressed_buf)?;
// In the worst case, we can't have fewer bytes than those that were read in (minus the size of the gzip header)
// and we assume the compression ratio means we have recouped that. We read in only that many bytes
// decompressed because the decompressor treats an incomplete segment as an error and thus using
// io::Read::read_to_end is not an option.
decompressed_buf.resize(bytes_read, b'\0');
let mut decoder = GzDecoder::new(io::Cursor::new(buf));
decoder.read(&mut decompressed_buf)?;
buf = decompressed_buf;
}
stream.seek(io::SeekFrom::Start(current_pos))?;
Expand Down
8 changes: 7 additions & 1 deletion src/spectrum/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,13 @@ impl<
return match self.queue.len() {
d if d > 1 => self.deque_group(false),
1 => self.deque_group(true),
_ => None,
_ => {
if self.product_scan_mapping.len() > 0 {
Some(self.deque_group_without_precursor())
} else {
None
}
},
};
}
}
Expand Down
Binary file added test/data/small.mzML.gz
Binary file not shown.

0 comments on commit c213497

Please sign in to comment.