diff --git a/src/io/tdf/mod.rs b/src/io/tdf/mod.rs
index df85a8d..44c8d3d 100644
--- a/src/io/tdf/mod.rs
+++ b/src/io/tdf/mod.rs
@@ -10,4 +10,5 @@
 mod arrays;
 mod sql;
 mod reader;
-pub use reader::{TDFFrameReader, TDFFrameReaderType, TDFSpectrumReader, TDFSpectrumReaderType, is_tdf};
\ No newline at end of file
+pub use reader::{TDFFrameReader, TDFFrameReaderType, TDFSpectrumReader, TDFSpectrumReaderType, is_tdf};
+pub use sql::{ChromatographyData, SQLTrace};
\ No newline at end of file
diff --git a/src/io/tdf/reader.rs b/src/io/tdf/reader.rs
index be4f756..966965c 100644
--- a/src/io/tdf/reader.rs
+++ b/src/io/tdf/reader.rs
@@ -46,13 +46,13 @@ use timsrust::{
     Metadata, TimsRustError,
 };
 
-use super::arrays::consolidate_peaks;
 pub use super::arrays::FrameToArraysMapper;
 use super::constants::{InstrumentSource, MsMsType};
 use super::sql::{
     FromSQL, PasefPrecursor, RawTDFSQLReader, SQLDIAFrameMsMsWindow, SQLFrame, SQLPasefFrameMsMs,
     SQLPrecursor, TDFMSnFacet,
 };
+use super::{arrays::consolidate_peaks, sql::ChromatographyData};
 
 const PEAK_MERGE_TOLERANCE: Tolerance = Tolerance::Da(0.01);
 
@@ -174,10 +174,8 @@ pub struct TDFFrameReaderType<
     _d: PhantomData<D>,
 }
 
-
 type DIAFrameWindowMap = HashMap<u32, Vec<Arc<SQLDIAFrameMsMsWindow>>, BuildIdentityHasher<u32>>;
 
-
 impl<C: CentroidLike + From<CentroidPeak>, D: FeatureLike<MZ, IonMobility> + KnownCharge>
     TDFFrameReaderType<C, D>
 {
@@ -316,10 +314,7 @@ impl<C: CentroidLike + From<CentroidPeak>, D: FeatureLike<MZ, IonMobility> + KnownC
         Ok((entry_count_guess, frame_types))
     }
 
-    fn build_window_index(
-        &self,
-    ) -> Result<DIAFrameWindowMap, Error>
-    {
+    fn build_window_index(&self) -> Result<DIAFrameWindowMap, Error> {
         let conn = self.tdf_reader.connection();
         let dia_windows = SQLDIAFrameMsMsWindow::read_from(&conn, [])?;
         let mut window_groups: HashMap<u32, Vec<Arc<SQLDIAFrameMsMsWindow>>, _> =
@@ -450,8 +445,7 @@ impl<C: CentroidLike + From<CentroidPeak>, D: FeatureLike<MZ, IonMobility> + KnownC
                             .extend_from_slice(&entry.frame.summed_intensities.to_le_bytes());
                     }
                     ChromatogramType::BasePeakChromatogram => {
-                        intensity_array
-                            .extend_from_slice(&entry.frame.max_intensity.to_le_bytes());
+                        intensity_array.extend_from_slice(&entry.frame.max_intensity.to_le_bytes());
                     }
                     _ => {
                         unimplemented!()
@@ -597,6 +591,21 @@ impl<C: CentroidLike + From<CentroidPeak>, D: FeatureLike<MZ, IonMobility> + KnownC
             Ok(None)
         }
     }
+
+    pub fn get_trace_reader(&self) -> Result<ChromatographyData, TimsRustError> {
+        let path = self
+            .metadata
+            .path
+            .parent()
+            .expect(".tdf file did not have an enclosing directory")
+            .join(super::sql::ChromatographyData::FILE_NAME);
+        let handle = super::sql::ChromatographyData::new(&path).map_err(|e| {
+            TimsRustError::MetadataReaderError(timsrust::readers::MetadataReaderError::SqlError(
+                e.into(),
+            ))
+        })?;
+        Ok(handle)
+    }
 }
 
 // Metadata construction routine
@@ -1249,6 +1258,10 @@ impl<
     pub fn peak_merging_tolerance_mut(&mut self) -> &mut Tolerance {
         &mut self.peak_merging_tolerance
     }
+
+    pub fn get_trace_reader(&self) -> Result<ChromatographyData, TimsRustError> {
+        self.frame_reader.get_trace_reader()
+    }
 }
 
 pub type TDFSpectrumReader = TDFSpectrumReaderType<
@@ -1277,7 +1290,6 @@ fn index_to_precursor(
 
     let im = metadata.im_converter.convert(prec.scan_average);
-
     let p = inverse_reduce_ion_mobility_param(im);
     ion.add_param(p);
 
     let mut act = Activation::default();
diff --git a/src/io/tdf/sql.rs b/src/io/tdf/sql.rs
index 5f7a9af..2460bdd 100644
--- a/src/io/tdf/sql.rs
+++ b/src/io/tdf/sql.rs
@@ -494,3 +494,202 @@ impl TDFMSnFacet {
         }
     }
 }
+
+#[derive(Debug, Default, Clone, PartialEq)]
+pub struct SQLTrace {
+    pub id: usize,
+    pub description: String,
+    pub instrument: String,
+    pub instrument_id: String,
+    pub trace_type: usize,
+    pub unit: usize,
+    pub time_offset: f64,
+    pub times: Option<Vec<f64>>,
+    pub intensities: Option<Vec<f32>>,
+}
+
+impl SQLTrace {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        id: usize,
+        description: String,
+        instrument: String,
+        instrument_id: String,
+        trace_type: usize,
+        unit: usize,
+        time_offset: f64,
+        times: Option<Vec<f64>>,
+        intensities: Option<Vec<f32>>,
+    ) -> Self {
+        Self {
+            id,
+            description,
+            instrument,
+            instrument_id,
+            trace_type,
+            unit,
+            time_offset,
+            times,
+            intensities,
+        }
+    }
+
+    pub fn load_trace_data(&mut self, connection: &Connection) -> Result<&Self, Error> {
+        let mut q =
+            connection.prepare("SELECT Times, Intensities FROM TraceChunks WHERE Id = ?")?;
+        let (mut times, mut intensities) = q
+            .query([self.id])?
+            .mapped(|row| {
+                let times = row.get::<usize, Vec<u8>>(0)?;
+                let mut time_acc = Vec::new();
+                let mut buf = [0u8; 8];
+                for chunk in times.chunks_exact(8) {
+                    buf.copy_from_slice(chunk);
+                    time_acc.push(f64::from_le_bytes(buf));
+                }
+
+                let ints = row.get::<usize, Vec<u8>>(1)?;
+                let mut intensity_acc = Vec::new();
+                let mut buf = [0u8; 4];
+                for chunk in ints.chunks_exact(4) {
+                    buf.copy_from_slice(chunk);
+                    intensity_acc.push(f32::from_le_bytes(buf));
+                }
+
+                Ok((time_acc, intensity_acc))
+            })
+            .flatten()
+            .reduce(|(mut ta, mut ia), (t, i)| {
+                ta.extend_from_slice(&t);
+                ia.extend_from_slice(&i);
+                (ta, ia)
+            })
+            .unwrap_or_default();
+
+        if !times.is_sorted() {
+            let mut indices: Vec<_> = (0..times.len()).collect();
+            indices.sort_by(|a, b| times[*a].total_cmp(&times[*b]));
+            const TOMBSTONE: usize = usize::MAX;
+
+            for idx in 0..times.len() {
+                if indices[idx] != TOMBSTONE {
+                    let mut current_idx = idx;
+                    loop {
+                        let next_idx = indices[current_idx];
+                        indices[current_idx] = TOMBSTONE;
+                        if indices[next_idx] == TOMBSTONE {
+                            break;
+                        }
+                        times.swap(current_idx, next_idx);
+                        intensities.swap(current_idx, next_idx);
+                        current_idx = next_idx;
+                    }
+                }
+            }
+        }
+
+        self.times = Some(times);
+        self.intensities = Some(intensities);
+        Ok(self)
+    }
+
+    pub fn len(&self) -> usize {
+        self.times.as_ref().map(|s| s.len()).unwrap_or_default()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.times
+            .as_ref()
+            .map(|s| s.is_empty())
+            .unwrap_or_default()
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = (f64, f32)> + '_ {
+        self.times
+            .as_ref()
+            .into_iter()
+            .flatten()
+            .copied()
+            .zip(self.intensities.as_ref().into_iter().flatten().copied())
+    }
+}
+
+impl FromSQL for SQLTrace {
+    fn from_row(row: &Row<'_>) -> Result<Self, Error> {
+        let this = Self::new(
+            row.get(0)?,
+            row.get(1)?,
+            row.get(2)?,
+            row.get(3)?,
+            row.get(4)?,
+            row.get(5)?,
+            row.get(6)?,
+            None,
+            None,
+        );
+        Ok(this)
+    }
+
+    fn get_sql() -> String {
+        "SELECT Id, Description, Instrument, InstrumentId, Type, Unit, TimeOffset FROM TraceSources"
+            .into()
+    }
+}
+
+// chromatography-data.sqlite
+pub struct ChromatographyData {
+    pub connection: ReentrantMutex<Connection>,
+}
+
+impl ChromatographyData {
+    pub const FILE_NAME: &str = "chromatography-data.sqlite";
+
+    pub fn new(data_path: &Path) -> Result<Self, Error> {
+        let connection = ReentrantMutex::new(Connection::open(data_path)?);
+        let this = Self { connection };
+        this.valid_schema()?;
+        Ok(this)
+    }
+
+    fn valid_schema(&self) -> Result<bool, Error> {
+        Ok(self.has_table("TraceSources")? && self.has_table("TraceChunks")?)
+    }
+
+    pub fn connection(&self) -> ReentrantMutexGuard<'_, Connection> {
+        self.connection.try_lock().unwrap()
+    }
+
+    fn has_table(&self, table: &str) -> Result<bool, Error> {
+        self.connection().query_row(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
+            [table],
+            |_| Ok(true),
+        )
+    }
+
+    pub fn traces(&mut self, include_data: bool) -> Result<Vec<SQLTrace>, Error> {
+        let mut traces = SQLTrace::read_from(&self.connection(), [])?;
+        if include_data {
+            for t in traces.iter_mut() {
+                t.load_trace_data(&self.connection())?;
+            }
+        }
+        Ok(traces)
+    }
+
+    pub fn get_by_name(
+        &mut self,
+        name: &str,
+        include_data: bool,
+    ) -> Result<Option<SQLTrace>, Error> {
+        let mut trace = SQLTrace::read_from_where(&self.connection(), [name], "Description = ?")?
+            .into_iter()
+            .next();
+        if include_data {
+            if let Some(t) = trace.as_mut() {
+                t.load_trace_data(&self.connection())?;
+            }
+        }
+        Ok(trace)
+    }
+}