From c26db8954fbc112f44643effda235eb5393daae8 Mon Sep 17 00:00:00 2001 From: Joshua Klein Date: Sun, 7 Jan 2024 18:01:12 -0500 Subject: [PATCH] Clean up group iteration --- Cargo.toml | 3 + examples/random_access_iter.rs | 55 +++++ src/io/mzmlb/reader.rs | 14 +- src/io/traits.rs | 53 ++--- src/meta/file_description.rs | 31 ++- src/params.rs | 78 ++++++- src/prelude.rs | 5 +- src/spectrum.rs | 23 +- src/spectrum/bindata/array.rs | 11 + src/spectrum/group.rs | 408 +++++++++++++++++++++++---------- 10 files changed, 504 insertions(+), 177 deletions(-) create mode 100644 examples/random_access_iter.rs diff --git a/Cargo.toml b/Cargo.toml index 8a7f899..525bff7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,9 @@ required-features = ["parallelism"] name = "averaging_writer" required-features = ["parallelism", "mzsignal", "nalgebra"] +[[example]] +name = "random_access_iter" +required-features = ["nalgebra"] [lib] name = "mzdata" diff --git a/examples/random_access_iter.rs b/examples/random_access_iter.rs new file mode 100644 index 0000000..ca825c5 --- /dev/null +++ b/examples/random_access_iter.rs @@ -0,0 +1,55 @@ +use std::{env, io, path}; + +use mzdata::io::mzml; +use mzdata::prelude::*; + +fn main() -> io::Result<()> { + let path = path::PathBuf::from( + env::args() + .nth(1) + .expect("Please pass an MS data file path"), + // "test/data/batching_test.mzML" + ); + + let mut reader = mzml::MzMLReader::open_path(path)?; + + let n_spectra = reader.len(); + + // Find the spectrum at the midpoint of the run + let spec = reader.get_spectrum_by_index(n_spectra / 2).unwrap(); + eprintln!( + "Midpoint spectrum {} (level {}) at time {}", + spec.id(), + spec.ms_level(), + spec.start_time() + ); + + // Jump the iterator to that point in time + reader.start_from_time(spec.start_time())?; + let s = reader.next().unwrap(); + eprintln!("Resuming at {} (level {}) at time {}", s.id(), s.ms_level(), s.start_time()); + + // Convert the iterator into a group iterator + let mut group_iter = reader.into_groups(); + // Jump the group iterator to that point in time (If an MSn spectrum was found, the next MS1 may be shown instead) + group_iter.start_from_time(spec.start_time())?; + let g = group_iter.next().unwrap(); + eprintln!( + "Resuming at group having {:?} at time {:?}", + g.earliest_spectrum().and_then(|s| Some(s.id())), + g.earliest_spectrum().and_then(|s| Some(s.start_time())) + ); + + // Convert the group iterator into an averaging group iterator + let mut avg_iter = group_iter.averaging(1, 200.0, 2200.0, 0.005); + // Jump the group iterator to that point in time (If an MSn spectrum was found, the next MS1 may be shown instead) + avg_iter.start_from_time(spec.start_time())?; + let g = avg_iter.next().unwrap(); + eprintln!( + "Resuming at group having {:?} at time {:?}", + g.earliest_spectrum().and_then(|s| Some(s.id())), + g.earliest_spectrum().and_then(|s| Some(s.start_time())) + ); + + Ok(()) +} diff --git a/src/io/mzmlb/reader.rs b/src/io/mzmlb/reader.rs index 16fd8be..3f0faeb 100644 --- a/src/io/mzmlb/reader.rs +++ b/src/io/mzmlb/reader.rs @@ -687,9 +687,9 @@ pub struct MzMLbReaderType< } impl<'a, 'b: 'a, C: CentroidPeakAdapting + BuildFromArrayMap, D: DeconvolutedPeakAdapting + BuildFromArrayMap> MzMLbReaderType { - /// Create a new `[MzMLbReader]` with an internal cache size of `chunk_size` elements + /// Create a new [`MzMLbReader`] with an internal cache size of `chunk_size` elements /// per data array to reduce the number of disk reads needed to populate spectra, and - /// sets the `[DetailLevel]`. + /// sets the [`DetailLevel`]. pub fn with_chunk_size_and_detail_level>( path: &P, chunk_size: usize, @@ -733,7 +733,7 @@ impl<'a, 'b: 'a, C: CentroidPeakAdapting + BuildFromArrayMap, D: DeconvolutedPea Ok(inst) } - /// Create a new `[MzMLbReader]` with an internal cache size of `chunk_size` elements + /// Create a new [`MzMLbReader`] with an internal cache size of `chunk_size` elements /// per data array to reduce the number of disk reads needed to populate spectra. /// /// The default chunk size is 2**20 elements, which can use as much as 8.4 MB for 64-bit @@ -772,7 +772,7 @@ impl<'a, 'b: 'a, C: CentroidPeakAdapting + BuildFromArrayMap, D: DeconvolutedPea Ok(buf) } - /// Create a new `[MzMLbReader]` with the default caching behavior. + /// Create a new [`MzMLbReader`] with the default caching behavior. pub fn new>(path: &P) -> io::Result { Self::with_chunk_size(path, ExternalDataRegistry::default_chunk_size()) } @@ -870,7 +870,9 @@ impl<'a, 'b: 'a, C: CentroidPeakAdapting + BuildFromArrayMap, D: DeconvolutedPea } } -/// [`MzMLReaderType`] instances are [`Iterator`]s over [`MultiLayerSpectrum`] +/// [`MzMLbReaderType`] instances are [`Iterator`]s over [`MultiLayerSpectrum`], like all +/// file format readers. This involves advancing the position of the internal mzML file +/// reader in-place without seeking. impl Iterator for MzMLbReaderType { type Item = MultiLayerSpectrum; @@ -992,7 +994,7 @@ impl Self { diff --git a/src/io/traits.rs b/src/io/traits.rs index 5850edf..59d21d3 100644 --- a/src/io/traits.rs +++ b/src/io/traits.rs @@ -368,6 +368,27 @@ pub enum SpectrumAccessError { IOError(#[source] Option), } +impl From for io::Error { + fn from(value: SpectrumAccessError) -> Self { + let s = value.to_string(); + match value { + SpectrumAccessError::SpectrumNotFound => io::Error::new(io::ErrorKind::NotFound, s), + SpectrumAccessError::SpectrumIdNotFound(_) => io::Error::new(io::ErrorKind::NotFound, s), + SpectrumAccessError::SpectrumIndexNotFound(_) => io::Error::new(io::ErrorKind::NotFound, s), + SpectrumAccessError::IOError(e) => { + match e { + Some(e) => { + e + }, + None => { + io::Error::new(io::ErrorKind::Other, s) + }, + } + }, + } + } +} + /// An extension of [`ScanSource`] that supports relocatable iteration relative to a /// specific spectrum coordinate or identifier. pub trait RandomAccessSpectrumIterator< @@ -667,6 +688,7 @@ pub trait SpectrumGrouping< } } +/// Analogous to to [`RandomAccessSpectrumIterator`], but for [`SpectrumGrouping`] implementations. pub trait RandomAccessSpectrumGroupingIterator< C: CentroidLike + Default = CentroidPeak, D: DeconvolutedCentroidLike + Default = DeconvolutedPeak, @@ -677,30 +699,9 @@ pub trait RandomAccessSpectrumGroupingIterator< fn start_from_id(&mut self, id: &str) -> Result<&Self, SpectrumAccessError>; fn start_from_index(&mut self, index: usize) -> Result<&Self, SpectrumAccessError>; fn start_from_time(&mut self, time: f64) -> Result<&Self, SpectrumAccessError>; + fn reset_state(&mut self); } -impl< - R: RandomAccessSpectrumIterator, - C: CentroidLike + Default, - D: DeconvolutedCentroidLike + Default, - S: SpectrumLike, - G: SpectrumGrouping, - > RandomAccessSpectrumGroupingIterator for SpectrumGroupingIterator -{ - fn start_from_id(&mut self, id: &str) -> Result<&Self, SpectrumAccessError> { - self.start_from_id(id) - } - - fn start_from_index(&mut self, index: usize) -> Result<&Self, SpectrumAccessError> { - self.start_from_index(index) - } - - fn start_from_time(&mut self, time: f64) -> Result<&Self, SpectrumAccessError> { - self.start_from_time(time) - } -} - - /// A collection of spectra held in memory but providing an interface /// identical to a data file. This structure owns its data, so in order @@ -711,7 +712,7 @@ pub struct MemoryScanSource< D: DeconvolutedCentroidLike + Default = DeconvolutedPeak, S: SpectrumLike = MultiLayerSpectrum, > { - spectra: Vec, + spectra: VecDeque, position: usize, offsets: OffsetIndex, _c: PhantomData, @@ -724,7 +725,7 @@ impl< S: SpectrumLike + Clone, > MemoryScanSource { - pub fn new(spectra: Vec) -> Self { + pub fn new(spectra: VecDeque) -> Self { let mut offsets = OffsetIndex::new("spectrum".to_string()); spectra.iter().enumerate().for_each(|(i, s)| { offsets.insert(s.id().to_string(), i as u64); @@ -834,9 +835,9 @@ impl< C: CentroidLike + Default, D: DeconvolutedCentroidLike + Default, S: SpectrumLike + Clone, - > From> for MemoryScanSource + > From> for MemoryScanSource { - fn from(value: Vec) -> Self { + fn from(value: VecDeque) -> Self { Self::new(value) } } diff --git a/src/meta/file_description.rs b/src/meta/file_description.rs index 2ac904b..0114cbe 100644 --- a/src/meta/file_description.rs +++ b/src/meta/file_description.rs @@ -1,5 +1,5 @@ use crate::impl_param_described; -use crate::params::{Param, ParamDescribed, ParamList}; +use crate::params::{Param, ParamDescribed, ParamList, CURIE, ControlledVocabulary}; #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct SourceFile { @@ -17,6 +17,35 @@ pub struct FileDescription { pub source_files: Vec, } +impl FileDescription { + pub fn new(contents: ParamList, source_files: Vec) -> Self { + Self { + contents, + source_files, + } + } + + /// Checks to see if the "MS1 spectrum" term is present in the file contents + /// + /// **Note**: This does not actually inspect the spectra in the file, only the metadata, + /// which may be incorrect/missing. + pub fn has_ms1_spectra(&self) -> bool { + self.get_param_by_curie(&CURIE::new(ControlledVocabulary::MS, 1000579)).is_some() + } + + /// Checks to see if the "MSn spectrum" term is present in the file contents. + /// + /// **Note**: This does not actually inspect the spectra in the file, only the metadata, + /// which may be incorrect/missing. + pub fn has_msn_spectra(&self) -> bool { + self.get_param_by_curie(&CURIE::new(ControlledVocabulary::MS, 1000580)).is_some() + } + + pub fn has_contents(&self) -> bool { + self.contents.len() > 0 + } +} + impl_param_described!(SourceFile); impl ParamDescribed for FileDescription { diff --git a/src/params.rs b/src/params.rs index 8636625..f8c10b3 100644 --- a/src/params.rs +++ b/src/params.rs @@ -4,8 +4,11 @@ use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Display; +use std::num; use std::str::{self, FromStr}; +use thiserror::Error; + #[doc(hidden)] #[derive(Debug, Clone, PartialEq, PartialOrd)] pub enum ValueType { @@ -15,6 +18,61 @@ pub enum ValueType { Other(Box>) } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct CURIE { + controlled_vocabulary: ControlledVocabulary, + accession: u32 +} + +impl CURIE { + pub const fn new(cv_id: ControlledVocabulary, accession: u32) -> Self { Self { controlled_vocabulary: cv_id, accession } } +} + +impl PartialEq for CURIE { + fn eq(&self, other: &T) -> bool { + if other.is_controlled() { + false + } else { + if other.controlled_vocabulary().unwrap() != self.controlled_vocabulary { + false + } else if other.accession().unwrap() != self.accession { + false + } else { + true + } + } + } +} + +#[derive(Debug, Error)] +pub enum CURIEParsingError { + #[error("{0} is not a recognized controlled vocabulary")] + UnknownControlledVocabulary(#[from] #[source] ControlledVocabularyResolutionError), + #[error("Failed to parse accession number {0}")] + AccessionParsingError(#[from] #[source] num::ParseIntError), + #[error("Did not detect a namespace separator ':' token")] + MissingNamespaceSeparator +} + +impl FromStr for CURIE { + type Err = CURIEParsingError; + + fn from_str(s: &str) -> Result { + let mut tokens = s.split(":"); + let cv = tokens.next().unwrap(); + let accession = tokens.next(); + if accession.is_none() { + Err(CURIEParsingError::MissingNamespaceSeparator) + } else{ + + let cv: ControlledVocabulary = cv.parse::()?; + + let accession = accession.unwrap().parse()?; + Ok(CURIE::new(cv, accession)) + } + } +} + pub fn curie_to_num(curie: &str) -> (Option, Option) { let mut parts = curie.split(':'); let prefix = match parts.next() { @@ -46,10 +104,6 @@ pub trait ParamLike { } } - // fn parse(&self) -> Result { - // self.value().parse::() - // } - fn parse(&self) -> Result { self.value().parse::() } @@ -353,15 +407,13 @@ impl ControlledVocabulary { } #[doc(hidden)] -#[derive(Debug, Clone)] -pub enum ControlledVocabularyResolutionError {} - -impl Display for ControlledVocabularyResolutionError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(format!("{:?}", self).as_str()) - } +#[derive(Debug, Clone, Error)] +pub enum ControlledVocabularyResolutionError { + #[error("Unrecognized controlled vocabulary {0}")] + UnknownControlledVocabulary(String) } + impl FromStr for ControlledVocabulary { type Err = ControlledVocabularyResolutionError; @@ -394,6 +446,10 @@ pub trait ParamDescribed { self.params().iter().find(|¶m| param.name == name) } + fn get_param_by_curie(&self, curie: &CURIE) -> Option<&Param> { + self.params().iter().find(|¶m| curie == param) + } + fn get_param_by_accession(&self, accession: &str) -> Option<&Param> { let (cv, acc_num) = curie_to_num(accession); return self diff --git a/src/prelude.rs b/src/prelude.rs index efa0fdc..dd8301c 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,7 +1,7 @@ //! A set of foundational traits used throughout the library. pub use crate::io::traits::{ MZFileReader, RandomAccessSpectrumIterator, SpectrumAccessError, ScanSource, ScanWriter, SeekRead, - SpectrumGrouping, SpectrumIterator, + SpectrumGrouping, SpectrumIterator, RandomAccessSpectrumGroupingIterator, }; pub use crate::meta::MSDataFileMetadata; @@ -9,6 +9,9 @@ pub use crate::params::{ParamDescribed, ParamLike}; pub use crate::spectrum::{IonProperties, PrecursorSelection, SpectrumLike}; pub use crate::spectrum::bindata::{ByteArrayView, ByteArrayViewMut, BuildArrayMapFrom, BuildFromArrayMap}; +#[cfg(feature = "mzsignal")] +pub use crate::spectrum::group::SpectrumGroupAveraging; + #[doc(hidden)] pub use std::convert::TryInto; #[doc(hidden)] diff --git a/src/spectrum.rs b/src/spectrum.rs index d9bf3d4..5a20b8c 100644 --- a/src/spectrum.rs +++ b/src/spectrum.rs @@ -52,23 +52,26 @@ for spectrum in reader { */ -pub(crate) mod scan_properties; pub mod bindata; +pub(crate) mod chromatogram; pub(crate) mod group; +pub(crate) mod scan_properties; pub(crate) mod spectrum; -pub(crate) mod chromatogram; pub mod utils; -pub use crate::spectrum::scan_properties::*; pub use crate::spectrum::bindata::{ArrayType, BinaryArrayMap, BinaryDataArrayType, DataArray}; -pub use crate::spectrum::spectrum::{ - SpectrumLike, PeakDataLevel, - SpectrumConversionError, SpectrumProcessingError, - CentroidSpectrum, RawSpectrum, Spectrum, DeconvolutedSpectrum, - MultiLayerSpectrum, CentroidSpectrumType, DeconvolutedSpectrumType}; pub use crate::spectrum::chromatogram::{Chromatogram, ChromatogramLike}; +pub use crate::spectrum::scan_properties::*; +pub use crate::spectrum::spectrum::{ + CentroidSpectrum, CentroidSpectrumType, DeconvolutedSpectrum, DeconvolutedSpectrumType, + MultiLayerSpectrum, PeakDataLevel, RawSpectrum, Spectrum, SpectrumConversionError, + SpectrumLike, SpectrumProcessingError, +}; -pub use group::{SpectrumGroup, SpectrumGroupIter, SpectrumGroupingIterator}; +pub use group::{SpectrumGroup, SpectrumGroupIter, SpectrumGroupingIterator, SpectrumGroupIntoIter}; #[cfg(feature = "mzsignal")] -pub use group::{average_spectra, SpectrumAveragingIterator, DeferredSpectrumAveragingIterator}; \ No newline at end of file +pub use group::{ + average_spectra, DeferredSpectrumAveragingIterator, SpectrumAveragingIterator, + SpectrumGroupAveraging +}; diff --git a/src/spectrum/bindata/array.rs b/src/spectrum/bindata/array.rs index 426d997..572659d 100644 --- a/src/spectrum/bindata/array.rs +++ b/src/spectrum/bindata/array.rs @@ -419,4 +419,15 @@ mod test { assert_eq!(view.len(), 19800); Ok(()) } + + #[test] + fn test_decode_empty() { + let mut da = DataArray::wrap(&ArrayType::MZArray, BinaryDataArrayType::Float64, Vec::new()); + da.compression = BinaryCompressionType::Zlib; + + assert_eq!(da.data.len(), 0); + assert_eq!(da.data_len().unwrap(), 0); + assert_eq!(da.decode().unwrap().len(), 0); + assert_eq!(da.to_f64().unwrap().len(), 0); + } } \ No newline at end of file diff --git a/src/spectrum/group.rs b/src/spectrum/group.rs index f298878..7f122d3 100644 --- a/src/spectrum/group.rs +++ b/src/spectrum/group.rs @@ -1,12 +1,16 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - marker::PhantomData, mem, + marker::PhantomData, + mem, }; use mzpeaks::{CentroidLike, CentroidPeak, DeconvolutedCentroidLike, DeconvolutedPeak}; use crate::{ - io::{traits::SpectrumGrouping, RandomAccessSpectrumIterator, ScanSource, SpectrumAccessError}, + io::{ + traits::{RandomAccessSpectrumGroupingIterator, SpectrumGrouping}, + RandomAccessSpectrumIterator, SpectrumAccessError, + }, SpectrumLike, }; @@ -31,13 +35,17 @@ where deconvoluted_type: PhantomData, } -impl SpectrumGroup +impl IntoIterator for SpectrumGroup where C: CentroidLike + Default, D: DeconvolutedCentroidLike + Default, S: SpectrumLike + Default, { - pub fn into_iter(self) -> SpectrumGroupIntoIter { + type Item = S; + + type IntoIter = SpectrumGroupIntoIter; + + fn into_iter(self) -> Self::IntoIter { SpectrumGroupIntoIter::new(self) } } @@ -61,21 +69,26 @@ enum SpectrumGroupIterState { Done, } - pub struct SpectrumGroupIntoIter< C: CentroidLike + Default = CentroidPeak, D: DeconvolutedCentroidLike + Default = DeconvolutedPeak, S: SpectrumLike + Default = MultiLayerSpectrum, - G: SpectrumGrouping = SpectrumGroup + G: SpectrumGrouping = SpectrumGroup, > { group: G, state: SpectrumGroupIterState, _c: PhantomData, _d: PhantomData, - _s: PhantomData + _s: PhantomData, } -impl + Default, G: SpectrumGrouping> Iterator for SpectrumGroupIntoIter { +impl< + C: CentroidLike + Default, + D: DeconvolutedCentroidLike + Default, + S: SpectrumLike + Default, + G: SpectrumGrouping, + > Iterator for SpectrumGroupIntoIter +{ type Item = S; fn next(&mut self) -> Option { @@ -136,7 +149,6 @@ impl< _c: PhantomData, _d: PhantomData, _s: PhantomData, - } } @@ -145,20 +157,19 @@ impl< } } - /// Iterate over the spectra in [`SpectrumGroup`] pub struct SpectrumGroupIter< 'a, C: CentroidLike + Default = CentroidPeak, D: DeconvolutedCentroidLike + Default = DeconvolutedPeak, S: SpectrumLike = MultiLayerSpectrum, - G: SpectrumGrouping = SpectrumGroup + G: SpectrumGrouping = SpectrumGroup, > { group: &'a G, state: SpectrumGroupIterState, _c: PhantomData, _d: PhantomData, - _s: PhantomData + _s: PhantomData, } impl< @@ -166,7 +177,7 @@ impl< C: CentroidLike + Default, D: DeconvolutedCentroidLike + Default, S: SpectrumLike + 'a, - G: SpectrumGrouping + G: SpectrumGrouping, > Iterator for SpectrumGroupIter<'a, C, D, S, G> { type Item = &'a S; @@ -229,7 +240,6 @@ impl< _c: PhantomData, _d: PhantomData, _s: PhantomData, - } } @@ -284,7 +294,7 @@ where } } -#[derive(Default)] +#[derive(Default, Debug)] pub(crate) struct GenerationTracker { generation_to_id: HashMap>, id_to_generation: HashMap, @@ -366,6 +376,9 @@ impl GenerationTracker { } } + +const MAX_GROUP_DEPTH: u32 = 512u32; + /** A wrapper for [`Iterator`]-implementors that will batch together all MSn spectra with their associated MS1 spectrum, producing [`SpectrumGroup`] @@ -374,6 +387,7 @@ instances. This type emulates the same interface that [`Iterator`] exposes, save that instead of yield individual [`Spectrum`](crate::spectrum::spectrum::Spectrum), it yields [`SpectrumGroup`] instead. */ +#[derive(Debug)] pub struct SpectrumGroupingIterator< R: Iterator, C: CentroidLike + Default = CentroidPeak, @@ -388,6 +402,7 @@ pub struct SpectrumGroupingIterator< buffering: usize, highest_ms_level: u8, generation: usize, + depth: u32, passed_first_ms1: bool, phantom: PhantomData, centroid_type: PhantomData, @@ -420,11 +435,13 @@ impl< queue: VecDeque::new(), highest_ms_level: 0, generation: 0, + depth: 0, passed_first_ms1: false, } } fn add_product(&mut self, scan: S) { + self.depth += 1; if let Some(precursor) = scan.precursor() { match precursor.precursor_id.as_ref() { Some(prec_id) => { @@ -541,6 +558,14 @@ impl< } } + fn deque_group_without_precursor(&mut self) -> G { + let mut group = G::default(); + self.flush_all_products(&mut group); + self.generation += 1; + self.depth = 0; + group + } + fn deque_group(&mut self, flush_all: bool) -> Option { let mut group = G::default(); if let Some(precursor) = self.queue.pop_front() { @@ -565,6 +590,7 @@ impl< if flush_all { self.flush_all_products(&mut group); } + self.depth = 0; Some(group) } else { None @@ -575,6 +601,9 @@ impl< self.product_scan_mapping.clear(); self.queue.clear(); self.generation_tracker.clear(); + self.generation = 0; + self.depth = 0; + self.passed_first_ms1 = false; } /** @@ -583,31 +612,36 @@ impl< full. */ pub fn next_group(&mut self) -> Option { - if let Some(spectrum) = self.source.next() { - let level = spectrum.ms_level(); - if level > self.highest_ms_level { - self.highest_ms_level = level; - } - if level > 1 { - self.add_product(spectrum); - self.next_group() - } else if self.add_precursor(spectrum) { - self.deque_group(false) + loop { + if let Some(spectrum) = self.source.next() { + let level = spectrum.ms_level(); + if level > self.highest_ms_level { + self.highest_ms_level = level; + } + if level > 1 { + self.add_product(spectrum); + if self.depth > MAX_GROUP_DEPTH { + return Some(self.deque_group_without_precursor()); + + } + } else if self.add_precursor(spectrum) { + return self.deque_group(false); + + } } else { - self.next_group() - } - } else { - match self.queue.len() { - d if d > 1 => self.deque_group(false), - 1 => self.deque_group(true), - _ => None, + return match self.queue.len() { + d if d > 1 => self.deque_group(false), + 1 => self.deque_group(true), + _ => None, + }; + } } } } impl< - R: ScanSource, + R: Iterator, C: CentroidLike + Default, D: DeconvolutedCentroidLike + Default, S: SpectrumLike, @@ -660,8 +694,34 @@ impl< } } +impl< + R: RandomAccessSpectrumIterator, + C: CentroidLike + Default, + D: DeconvolutedCentroidLike + Default, + S: SpectrumLike, + G: SpectrumGrouping + Default, + > RandomAccessSpectrumGroupingIterator for SpectrumGroupingIterator +{ + fn start_from_id(&mut self, id: &str) -> Result<&Self, SpectrumAccessError> { + self.start_from_id(id) + } + + fn start_from_index(&mut self, index: usize) -> Result<&Self, SpectrumAccessError> { + self.start_from_index(index) + } + + fn start_from_time(&mut self, time: f64) -> Result<&Self, SpectrumAccessError> { + self.start_from_time(time) + } + + fn reset_state(&mut self) { + self.clear() + } +} + #[cfg(feature = "mzsignal")] mod mzsignal_impl { + use std::borrow::Cow; use std::sync::Arc; use crate::spectrum::bindata::{to_bytes, BuildArrayMapFrom, BuildFromArrayMap}; @@ -791,29 +851,31 @@ mod mzsignal_impl { } } - #[derive(Debug)] + #[derive(Debug, Default)] pub struct SpectrumAveragingContext< C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - G: SpectrumGrouping> + G: SpectrumGrouping>, > { pub group: G, ms1_context: Vec, _c: PhantomData, - _d: PhantomData + _d: PhantomData, } impl< C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - G: SpectrumGrouping> + G: SpectrumGrouping>, > SpectrumAveragingContext { - pub fn new( - group: G, - ms1_context: Vec, - ) -> Self { - Self { group, ms1_context, _c: PhantomData, _d: PhantomData} + pub fn new(group: G, ms1_context: Vec) -> Self { + Self { + group, + ms1_context, + _c: PhantomData, + _d: PhantomData, + } } pub fn reprofile_with(mut self, reprofiler: &PeakSetReprofiler, fwhm: f32) -> Self { @@ -861,13 +923,7 @@ mod mzsignal_impl { (self.group, pair_avgd) } - pub fn average_with( - self, - averager: &mut SignalAverager, - ) -> ( - G, - ArrayPair<'static>, - ) { + pub fn average_with(self, averager: &mut SignalAverager) -> (G, ArrayPair<'static>) { averager.array_pairs.clear(); for arrays in self.ms1_context.iter() { @@ -907,10 +963,37 @@ mod mzsignal_impl { } } + impl< + C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, + D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, + G: SpectrumGrouping>, + > SpectrumGrouping> for SpectrumAveragingContext + { + fn precursor(&self) -> Option<&MultiLayerSpectrum> { + self.group.precursor() + } + + fn precursor_mut(&mut self) -> Option<&mut MultiLayerSpectrum> { + self.group.precursor_mut() + } + + fn set_precursor(&mut self, prec: MultiLayerSpectrum) { + self.group.set_precursor(prec); + } + + fn products(&self) -> &[MultiLayerSpectrum] { + self.group.products() + } + + fn products_mut(&mut self) -> &mut Vec> { + self.group.products_mut() + } + } + pub struct DeferredSpectrumAveragingIterator< C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: Iterator, + R: Iterator, G: SpectrumGrouping>, > { source: R, @@ -923,7 +1006,7 @@ mod mzsignal_impl { impl< C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: Iterator, + R: Iterator, G: SpectrumGrouping>, > Iterator for DeferredSpectrumAveragingIterator { @@ -937,14 +1020,11 @@ mod mzsignal_impl { impl< C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: Iterator, - G: SpectrumGrouping> + R: Iterator, + G: SpectrumGrouping>, > DeferredSpectrumAveragingIterator { - pub fn new( - source: R, - averaging_width_index: usize, - ) -> Self { + pub fn new(source: R, averaging_width_index: usize) -> Self { let mut inst = Self { source, averaging_width_index, @@ -956,6 +1036,13 @@ mod mzsignal_impl { inst } + fn reset_buffers(&mut self) { + self.buffer.clear(); + self.context_buffer.clear(); + self.output_buffer.clear(); + self.initial_feed(); + } + pub fn into_inner(self) -> R { self.source } @@ -972,10 +1059,7 @@ mod mzsignal_impl { self.averaging_width_index * 2 + 1 } - fn copy_out_arrays( - &self, - group: &G, - ) -> Option { + fn copy_out_arrays(&self, group: &G) -> Option { group.precursor().and_then(|scan| { if scan.signal_continuity() == SignalContinuity::Profile { if let Some(array_map) = scan.raw_arrays() { @@ -1013,6 +1097,8 @@ mod mzsignal_impl { }) } + /// Populate the context tracking buffers and pre-fill the output queue + /// with the first productions fn initial_feed(&mut self) { for _ in 0..self.capacity() { let group_opt = self.source.next(); @@ -1080,36 +1166,29 @@ mod mzsignal_impl { 'lifespan, C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: ScanSource>, + G: SpectrumGrouping>, + R: Iterator, > { - source: SpectrumGroupingIterator< - R, - C, - D, - MultiLayerSpectrum, - SpectrumGroup>, - >, + source: R, averaging_width_index: usize, - buffer: VecDeque>>, - output_buffer: VecDeque>>, + buffer: VecDeque, + output_buffer: VecDeque, averager: SignalAverager<'lifespan>, + reprofiler: PeakSetReprofiler, + _c: PhantomData, + _d: PhantomData, } impl< 'lifespan, C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: ScanSource>, - > SpectrumAveragingIterator<'lifespan, C, D, R> + G: SpectrumGrouping>, + R: Iterator, + > SpectrumAveragingIterator<'lifespan, C, D, G, R> { pub fn new( - source: SpectrumGroupingIterator< - R, - C, - D, - MultiLayerSpectrum, - SpectrumGroup>, - >, + source: R, averaging_width_index: usize, mz_start: f64, mz_end: f64, @@ -1121,16 +1200,25 @@ mod mzsignal_impl { buffer: VecDeque::with_capacity(averaging_width_index * 2 + 1), output_buffer: VecDeque::with_capacity(averaging_width_index * 2 + 1), averager: SignalAverager::new(mz_start, mz_end, dx), + reprofiler: PeakSetReprofiler::new(mz_start, mz_end, dx), + _c: PhantomData, + _d: PhantomData, }; inst.initial_feed(); inst } - pub fn into_inner(self) -> SpectrumGroupingIterator { + fn reset_buffers(&mut self) { + self.buffer.clear(); + self.output_buffer.clear(); + self.initial_feed(); + } + + pub fn into_inner(self) -> R { self.source } - pub fn get_ref(&self) -> &SpectrumGroupingIterator { + pub const fn get_ref(&self) -> &R { &self.source } @@ -1142,10 +1230,7 @@ mod mzsignal_impl { self.averaging_width_index * 2 + 1 } - fn copy_out_arrays( - &self, - group: &SpectrumGroup>, - ) -> Option> { + fn copy_out_arrays(&self, group: &G) -> Option> { group.precursor().and_then(|scan| { if scan.signal_continuity() == SignalContinuity::Profile { if let Some(array_map) = scan.raw_arrays() { @@ -1160,9 +1245,15 @@ mod mzsignal_impl { if let Some(peaks) = scan.peaks.as_ref() { let fpeaks: Vec<_> = peaks .iter() - .map(|p| FittedPeak::from(p.as_centroid())) + .map(|p| { + let fp = FittedPeak::from(p.as_centroid()); + PeakShapeModel { + peak: Cow::Owned(fp), + shape: PeakShape::Gaussian, + } + }) .collect(); - let signal = reprofile(fpeaks.iter(), self.averager.dx); + let signal = self.reprofiler.reprofile(&fpeaks); Some(ArrayPair::from(( signal.mz_array.to_vec(), signal.intensity_array.to_vec(), @@ -1178,6 +1269,8 @@ mod mzsignal_impl { }) } + /// Populate the context tracking buffers and pre-fill the output queue + /// with the first productions fn initial_feed(&mut self) { for _ in 0..self.capacity() { let group_opt = self.source.next(); @@ -1206,10 +1299,10 @@ mod mzsignal_impl { } } - fn _next_group_no_pop(&mut self) -> Option>> { + fn _next_group_no_pop(&mut self) -> Option { let array_map = self.average_spectra(); if let Some(mut group) = self.buffer.pop_front() { - group.precursor.as_mut().and_then(|precursor| { + group.precursor_mut().and_then(|precursor| { precursor.arrays = Some(array_map); Some(()) }); @@ -1219,9 +1312,9 @@ mod mzsignal_impl { } } - fn update_group(&mut self) -> Option>> { + fn update_group(&mut self) -> Option { let result = self._next_group_no_pop(); - if let Some(group) = self.source.next_group() { + if let Some(group) = self.source.next() { self.averager.pop(); if let Some(pair) = self.copy_out_arrays(&group) { self.averager.push(pair); @@ -1252,7 +1345,7 @@ mod mzsignal_impl { array_map } - fn next_group(&mut self) -> Option>> { + fn next_group(&mut self) -> Option { if let Some(group) = self.output_buffer.pop_front() { Some(group) } else { @@ -1265,10 +1358,11 @@ mod mzsignal_impl { 'lifespan, C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: RandomAccessSpectrumIterator>, - > Iterator for SpectrumAveragingIterator<'lifespan, C, D, R> + G: SpectrumGrouping>, + R: Iterator, + > Iterator for SpectrumAveragingIterator<'lifespan, C, D, G, R> { - type Item = SpectrumGroup>; + type Item = G; fn next(&mut self) -> Option { self.next_group() @@ -1279,39 +1373,40 @@ mod mzsignal_impl { 'lifespan, C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: RandomAccessSpectrumIterator>, - > SpectrumAveragingIterator<'lifespan, C, D, R> + G: SpectrumGrouping>, + R: RandomAccessSpectrumGroupingIterator, G>, + > RandomAccessSpectrumGroupingIterator, G> + for SpectrumAveragingIterator<'lifespan, C, D, G, R> { - pub fn start_from_id(&mut self, id: &str) -> Result<&Self, SpectrumAccessError> { + fn reset_state(&mut self) { + self.source.reset_state(); + self.reset_buffers() + } + + fn start_from_id(&mut self, id: &str) -> Result<&Self, SpectrumAccessError> { match self.source.start_from_id(id) { Ok(_) => { - self.buffer.clear(); - self.output_buffer.clear(); - self.source.clear(); + self.reset_state(); Ok(self) } Err(err) => Err(err), } } - pub fn start_from_index(&mut self, index: usize) -> Result<&Self, SpectrumAccessError> { + fn start_from_index(&mut self, index: usize) -> Result<&Self, SpectrumAccessError> { match self.source.start_from_index(index) { Ok(_) => { - self.buffer.clear(); - self.output_buffer.clear(); - self.source.clear(); + self.reset_state(); Ok(self) } Err(err) => Err(err), } } - pub fn start_from_time(&mut self, time: f64) -> Result<&Self, SpectrumAccessError> { + fn start_from_time(&mut self, time: f64) -> Result<&Self, SpectrumAccessError> { match self.source.start_from_time(time) { Ok(_) => { - self.buffer.clear(); - self.output_buffer.clear(); - self.source.clear(); + self.reset_state(); Ok(self) } Err(err) => Err(err), @@ -1323,17 +1418,62 @@ mod mzsignal_impl { 'lifespan, C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, - R: ScanSource>, + R: RandomAccessSpectrumGroupingIterator, G>, + G: SpectrumGrouping>, > - SpectrumGroupingIterator< - R, + RandomAccessSpectrumGroupingIterator< C, D, MultiLayerSpectrum, - SpectrumGroup>, - > + SpectrumAveragingContext, + > for DeferredSpectrumAveragingIterator + { + fn start_from_id(&mut self, id: &str) -> Result<&Self, SpectrumAccessError> { + match self.source.start_from_id(id) { + Ok(_) => { + self.reset_state(); + Ok(self) + } + Err(err) => Err(err), + } + } + + fn start_from_index(&mut self, index: usize) -> Result<&Self, SpectrumAccessError> { + match self.source.start_from_index(index) { + Ok(_) => { + self.reset_state(); + Ok(self) + } + Err(err) => Err(err), + } + } + + fn start_from_time(&mut self, time: f64) -> Result<&Self, SpectrumAccessError> { + match self.source.start_from_time(time) { + Ok(_) => { + self.reset_state(); + Ok(self) + } + Err(err) => Err(err), + } + } + + fn reset_state(&mut self) { + self.source.reset_state(); + self.reset_buffers(); + } + } + + /// Adds signal averaging to an [`Iterator`] that produces [`SpectrumGrouping`] implementations + /// of the appropriate type. + pub trait SpectrumGroupAveraging< + 'lifespan, + C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, + D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, + G: SpectrumGrouping>, + >: Iterator + Sized { - /// Average MS1 spectra across `[SpectrumGroup]` from this iterator + /// Average MS1 spectra across [`SpectrumGrouping`](crate::io::traits::SpectrumGrouping) from this iterator /// /// # Arguments /// @@ -1342,25 +1482,34 @@ mod mzsignal_impl { /// * `mz_end` - The maximum m/z to average up to /// * `dx` - The m/z spacing in the averaged spectra /// - /// - pub fn averaging( + fn averaging( self, averaging_width_index: usize, mz_start: f64, mz_end: f64, dx: f64, - ) -> SpectrumAveragingIterator<'lifespan, C, D, R> { + ) -> SpectrumAveragingIterator<'lifespan, C, D, G, Self> { SpectrumAveragingIterator::new(self, averaging_width_index, mz_start, mz_end, dx) } - pub fn averaging_deferred( + /// Create an iterator that defers averaging MS1 spectra across [`SpectrumGrouping`](crate::io::traits::SpectrumGrouping) + /// to a future point by separately returning the machinery for consuming the prepared data, i.e. on another thread. + /// + /// # Arguments + /// + /// * `averaging_width_index` - The number of groups before and after the current group to average MS1 scans across + /// * `mz_start` - The minimum m/z to average from + /// * `mz_end` - The maximum m/z to average up to + /// * `dx` - The m/z spacing in the averaged spectra + /// + fn averaging_deferred( self, averaging_width_index: usize, mz_start: f64, mz_end: f64, dx: f64, ) -> ( - DeferredSpectrumAveragingIterator>>, + DeferredSpectrumAveragingIterator, SignalAverager<'lifespan>, PeakSetReprofiler, ) { @@ -1370,10 +1519,25 @@ mod mzsignal_impl { (iter, averager, reprofiler) } } + + impl< + 'lifespan, + T, + C: CentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, + D: DeconvolutedCentroidLike + Default + BuildArrayMapFrom + BuildFromArrayMap, + G: SpectrumGrouping>, + > SpectrumGroupAveraging<'lifespan, C, D, G> for T + where + T: Iterator, + { + } } #[cfg(feature = "mzsignal")] -pub use mzsignal_impl::{average_spectra, SpectrumAveragingIterator, DeferredSpectrumAveragingIterator}; +pub use mzsignal_impl::{ + average_spectra, DeferredSpectrumAveragingIterator, SpectrumAveragingIterator, + SpectrumGroupAveraging, +}; #[cfg(test)] mod test {