Skip to content

Commit

Permalink
feature: add chromatogram data reader to bruker_tdf
Browse files Browse the repository at this point in the history
  • Loading branch information
mobiusklein committed Feb 15, 2025
1 parent 49b7ff0 commit 87132fa
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/io/tdf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ mod arrays;
mod sql;
mod reader;

pub use reader::{TDFFrameReader, TDFFrameReaderType, TDFSpectrumReader, TDFSpectrumReaderType, is_tdf};
pub use reader::{TDFFrameReader, TDFFrameReaderType, TDFSpectrumReader, TDFSpectrumReaderType, is_tdf};
pub use sql::{ChromatographyData, SQLTrace};
32 changes: 22 additions & 10 deletions src/io/tdf/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ use timsrust::{
Metadata, TimsRustError,
};

use super::arrays::consolidate_peaks;
pub use super::arrays::FrameToArraysMapper;
use super::constants::{InstrumentSource, MsMsType};
use super::sql::{
FromSQL, PasefPrecursor, RawTDFSQLReader, SQLDIAFrameMsMsWindow, SQLFrame, SQLPasefFrameMsMs,
SQLPrecursor, TDFMSnFacet,
};
use super::{arrays::consolidate_peaks, sql::ChromatographyData};

const PEAK_MERGE_TOLERANCE: Tolerance = Tolerance::Da(0.01);

Expand Down Expand Up @@ -174,10 +174,8 @@ pub struct TDFFrameReaderType<
_d: PhantomData<D>,
}


type DIAFrameWindowMap = HashMap<u32, Vec<Arc<SQLDIAFrameMsMsWindow>>, BuildIdentityHasher<u32>>;


impl<C: FeatureLike<MZ, IonMobility>, D: FeatureLike<Mass, IonMobility> + KnownCharge>
TDFFrameReaderType<C, D>
{
Expand Down Expand Up @@ -316,10 +314,7 @@ impl<C: FeatureLike<MZ, IonMobility>, D: FeatureLike<Mass, IonMobility> + KnownC
Ok((entry_count_guess, frame_types))
}

fn build_window_index(
&self,
) -> Result<DIAFrameWindowMap, Error>
{
fn build_window_index(&self) -> Result<DIAFrameWindowMap, Error> {
let conn = self.tdf_reader.connection();
let dia_windows = SQLDIAFrameMsMsWindow::read_from(&conn, [])?;
let mut window_groups: HashMap<u32, Vec<Arc<SQLDIAFrameMsMsWindow>>, _> =
Expand Down Expand Up @@ -450,8 +445,7 @@ impl<C: FeatureLike<MZ, IonMobility>, D: FeatureLike<Mass, IonMobility> + KnownC
.extend_from_slice(&entry.frame.summed_intensities.to_le_bytes());
}
ChromatogramType::BasePeakChromatogram => {
intensity_array
.extend_from_slice(&entry.frame.max_intensity.to_le_bytes());
intensity_array.extend_from_slice(&entry.frame.max_intensity.to_le_bytes());
}
_ => {
unimplemented!()
Expand Down Expand Up @@ -597,6 +591,21 @@ impl<C: FeatureLike<MZ, IonMobility>, D: FeatureLike<Mass, IonMobility> + KnownC
Ok(None)
}
}

pub fn get_trace_reader(&self) -> Result<ChromatographyData, TimsRustError> {
let path = self
.metadata
.path
.parent()
.expect(".tdf file did not have an enclosing directory")
.join(super::sql::ChromatographyData::FILE_NAME);
let handle = super::sql::ChromatographyData::new(&path).map_err(|e| {
TimsRustError::MetadataReaderError(timsrust::readers::MetadataReaderError::SqlError(
e.into(),
))
})?;
Ok(handle)
}
}

// Metadata construction routine
Expand Down Expand Up @@ -1249,6 +1258,10 @@ impl<
/// Mutable access to the mass error tolerance used when merging peaks.
pub fn peak_merging_tolerance_mut(&mut self) -> &mut Tolerance {
    &mut self.peak_merging_tolerance
}

/// Open the companion chromatography trace database, delegating to the
/// wrapped frame reader's `get_trace_reader`.
pub fn get_trace_reader(&self) -> Result<ChromatographyData, TimsRustError> {
    self.frame_reader.get_trace_reader()
}
}

pub type TDFSpectrumReader = TDFSpectrumReaderType<
Expand Down Expand Up @@ -1277,7 +1290,6 @@ fn index_to_precursor(

let im = metadata.im_converter.convert(prec.scan_average);


let p = inverse_reduce_ion_mobility_param(im);
ion.add_param(p);
let mut act = Activation::default();
Expand Down
199 changes: 199 additions & 0 deletions src/io/tdf/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,202 @@ impl TDFMSnFacet {
}
}
}

/// A chromatogram trace row from the `TraceSources` table of a
/// `chromatography-data.sqlite` database, with lazily loaded time and
/// intensity arrays (see `load_trace_data`).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct SQLTrace {
    /// Primary key of the trace (`Id` column).
    pub id: usize,
    /// Human-readable trace description (`Description` column).
    pub description: String,
    /// Instrument name (`Instrument` column).
    pub instrument: String,
    /// Instrument identifier (`InstrumentId` column).
    pub instrument_id: String,
    /// Numeric trace type code (`Type` column); not interpreted in this module.
    pub trace_type: usize,
    /// Numeric unit code (`Unit` column); not interpreted in this module.
    pub unit: usize,
    /// `TimeOffset` column value; presumably an offset of the time axis —
    /// not applied anywhere in this module.
    pub time_offset: f64,
    /// Time axis values; `None` until `load_trace_data` is called.
    pub times: Option<Vec<f64>>,
    /// Intensity values parallel to `times`; `None` until `load_trace_data` is called.
    pub intensities: Option<Vec<f32>>,
}

impl SQLTrace {
    /// Construct a trace record from its `TraceSources` metadata columns.
    /// `times`/`intensities` are typically `None` until
    /// [`SQLTrace::load_trace_data`] is called.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: usize,
        description: String,
        instrument: String,
        instrument_id: String,
        trace_type: usize,
        unit: usize,
        time_offset: f64,
        times: Option<Vec<f64>>,
        intensities: Option<Vec<f32>>,
    ) -> Self {
        Self {
            id,
            description,
            instrument,
            instrument_id,
            trace_type,
            unit,
            time_offset,
            times,
            intensities,
        }
    }

    /// Decode a little-endian `f64` blob as stored in `TraceChunks.Times`.
    /// Trailing bytes that do not fill a full 8-byte word are ignored.
    fn decode_f64_blob(bytes: &[u8]) -> Vec<f64> {
        bytes
            .chunks_exact(8)
            // chunks_exact guarantees each chunk is exactly 8 bytes
            .map(|chunk| f64::from_le_bytes(chunk.try_into().expect("8-byte chunk")))
            .collect()
    }

    /// Decode a little-endian `f32` blob as stored in `TraceChunks.Intensities`.
    /// Trailing bytes that do not fill a full 4-byte word are ignored.
    fn decode_f32_blob(bytes: &[u8]) -> Vec<f32> {
        bytes
            .chunks_exact(4)
            // chunks_exact guarantees each chunk is exactly 4 bytes
            .map(|chunk| f32::from_le_bytes(chunk.try_into().expect("4-byte chunk")))
            .collect()
    }

    /// Load and decode this trace's time and intensity arrays from the
    /// `TraceChunks` table, concatenating all chunks and co-sorting the two
    /// arrays by time if the chunks were not already in chronological order.
    ///
    /// NOTE(review): row-level SQL errors are silently skipped (`flatten`),
    /// preserving the original best-effort behavior; only statement
    /// preparation/execution errors are propagated.
    pub fn load_trace_data(&mut self, connection: &Connection) -> Result<&Self, Error> {
        let mut q =
            connection.prepare("SELECT Times, Intensities FROM TraceChunks WHERE Id = ?")?;
        let (mut times, mut intensities) = q
            .query([self.id])?
            .mapped(|row| {
                let time_acc = Self::decode_f64_blob(&row.get::<usize, Vec<u8>>(0)?);
                let intensity_acc = Self::decode_f32_blob(&row.get::<usize, Vec<u8>>(1)?);
                Ok((time_acc, intensity_acc))
            })
            .flatten()
            .reduce(|(mut ta, mut ia), (t, i)| {
                ta.extend_from_slice(&t);
                ia.extend_from_slice(&i);
                (ta, ia)
            })
            .unwrap_or_default();

        // Chunks may come back out of chronological order; apply a stable
        // index sort and gather both arrays through it so the time and
        // intensity values stay paired.
        if !times.is_sorted() {
            let mut order: Vec<usize> = (0..times.len()).collect();
            order.sort_by(|&a, &b| times[a].total_cmp(&times[b]));
            times = order.iter().map(|&i| times[i]).collect();
            intensities = order.iter().map(|&i| intensities[i]).collect();
        }

        self.times = Some(times);
        self.intensities = Some(intensities);
        Ok(self)
    }

    /// Number of loaded time points; 0 if the data has not been loaded yet.
    pub fn len(&self) -> usize {
        self.times.as_ref().map(|s| s.len()).unwrap_or_default()
    }

    /// Whether no time points are loaded (also true before loading).
    pub fn is_empty(&self) -> bool {
        self.times
            .as_ref()
            .map(|s| s.is_empty())
            .unwrap_or_default()
    }

    /// Iterate over `(time, intensity)` pairs; empty if data is not loaded.
    pub fn iter(&self) -> impl Iterator<Item = (f64, f32)> + '_ {
        self.times
            .as_ref()
            .into_iter()
            .flatten()
            .copied()
            .zip(self.intensities.as_ref().into_iter().flatten().copied())
    }
}

impl FromSQL for SQLTrace {
    /// Build an [`SQLTrace`] from one `TraceSources` row; the time and
    /// intensity arrays start out unloaded (`None`).
    fn from_row(row: &Row<'_>) -> Result<Self, Error> {
        Ok(Self {
            id: row.get(0)?,
            description: row.get(1)?,
            instrument: row.get(2)?,
            instrument_id: row.get(3)?,
            trace_type: row.get(4)?,
            unit: row.get(5)?,
            time_offset: row.get(6)?,
            times: None,
            intensities: None,
        })
    }

    fn get_sql() -> String {
        String::from(
            "SELECT Id, Description, Instrument, InstrumentId, Type, Unit, TimeOffset FROM TraceSources",
        )
    }
}

/// Handle to the auxiliary `chromatography-data.sqlite` database that sits
/// alongside a Bruker `.tdf` file and stores chromatogram traces
/// (`TraceSources` / `TraceChunks` tables).
pub struct ChromatographyData {
    // Connection is wrapped in a reentrant lock; `connection()` acquires it
    // with try_lock, so access is serialized per call site.
    pub connection: ReentrantMutex<Connection>,
}

impl ChromatographyData {
pub const FILE_NAME: &str = "chromatography-data.sqlite";

pub fn new(data_path: &Path) -> Result<Self, Error> {
let connection = ReentrantMutex::new(Connection::open(data_path)?);
let this = Self { connection };
this.valid_schema()?;
Ok(this)
}

fn valid_schema(&self) -> Result<bool, Error> {
Ok(self.has_table("TraceSources")? && self.has_table("TraceChunks")?)
}

pub fn connection(&self) -> ReentrantMutexGuard<'_, Connection> {
self.connection.try_lock().unwrap()
}

fn has_table(&self, table: &str) -> Result<bool, Error> {
self.connection().query_row(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
[table],
|_| Ok(true),
)
}

pub fn traces(&mut self, include_data: bool) -> Result<Vec<SQLTrace>, Error> {
let mut traces = SQLTrace::read_from(&self.connection(), [])?;
if include_data {
for t in traces.iter_mut() {
t.load_trace_data(&self.connection())?;
}
}
Ok(traces)
}

pub fn get_by_name(
&mut self,
name: &str,
include_data: bool,
) -> Result<Option<SQLTrace>, Error> {
let mut trace = SQLTrace::read_from_where(&self.connection(), [name], "Description = ?")?
.into_iter()
.next();
if include_data {
if let Some(t) = trace.as_mut() {
t.load_trace_data(&self.connection())?;
}
}
Ok(trace)
}
}

0 comments on commit 87132fa

Please sign in to comment.