diff --git a/CHANGELOG.md b/CHANGELOG.md
index d752be9..6d3cbc0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,7 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [0.1.4] - TBD
+## [0.1.4] - 2024-07-29
 
 ### Changed
 
@@ -18,10 +18,17 @@ not work well for very large datasets with millions of variants and it causes ov
 - Fixed in-place `fillna` in `from_plink_table` in `LDMatrix` to conform to latest `pandas` API.
 - Update `run_shell_script` to check for and capture errors.
 - Refactored code to slightly reduce import/load times.
+- Cleaned up the `load_data` method of `LDMatrix` and subsumed the functionality of `load_rows` into it.
 
 ### Added
 
 - Added extra validation checks in `LDMatrix` to ensure that the index pointer is formatted correctly.
+- Added `LDLinearOperator` class to allow for efficient linear algebra operations on the LD matrix without
+representing the full symmetric matrix in memory.
+- Added utility methods to the `LDMatrix` class for computing eigenvalues, performing SVD, etc.
+- Added `Min eigenvalue` to the attributes of LD matrices.
+- Added support for slicing/retrieving entries of the LD matrix using SNP rsIDs.
+- Added support for reading LD matrices from AWS S3 storage.
 
 ## [0.1.3] - 2024-05-21
 
diff --git a/bin/magenpy_ld b/bin/magenpy_ld
index 892f7e5..a8dd20f 100644
--- a/bin/magenpy_ld
+++ b/bin/magenpy_ld
@@ -85,6 +85,13 @@ parser.add_argument('--storage-dtype', dest='storage_dtype', type=str,
                     default='int8',
                     help='The data type for the entries of the LD matrix.',
                     choices={'float32', 'float64', 'int16', 'int8'})
 
+# Other options:
+parser.add_argument('--compute-spectral-properties', dest='compute_spectral',
+                    default=False,
+                    action='store_true',
+                    help='Compute and store the spectral properties of the '
+                         'LD matrix (e.g. 
eigenvalues, eigenvectors).') + # Add arguments for the compressor: parser.add_argument('--compressor', dest='compressor', type=str, default='zstd', help='The compressor name or compression algorithm to use for the LD matrix.', @@ -226,6 +233,7 @@ ld_mat = g.compute_ld(args.estimator, dtype=args.storage_dtype, compressor_name=args.compressor, compression_level=args.compression_level, + compute_spectral_properties=args.compute_spectral, **ld_kwargs) # Store metadata (if provided): diff --git a/docs/commandline/magenpy_ld.md b/docs/commandline/magenpy_ld.md index 31e3ff2..a59e54a 100644 --- a/docs/commandline/magenpy_ld.md +++ b/docs/commandline/magenpy_ld.md @@ -17,36 +17,36 @@ Which outputs the following help message: ```bash -********************************************** - _ __ ___ __ _ __ _ ___ _ __ _ __ _ _ -| '_ ` _ \ / _` |/ _` |/ _ \ '_ \| '_ \| | | | -| | | | | | (_| | (_| | __/ | | | |_) | |_| | -|_| |_| |_|\__,_|\__, |\___|_| |_| .__/ \__, | - |___/ |_| |___/ -Modeling and Analysis of Genetics data in python -Version: 0.1.0 | Release date: April 2024 -Author: Shadi Zabad, McGill University -********************************************** -< Compute LD matrix and output in Zarr format > + ********************************************** + _ __ ___ __ _ __ _ ___ _ __ _ __ _ _ + | '_ ` _ \ / _` |/ _` |/ _ \ '_ \| '_ \| | | | + | | | | | | (_| | (_| | __/ | | | |_) | |_| | + |_| |_| |_|\__,_|\__, |\___|_| |_| .__/ \__, | + |___/ |_| |___/ + Modeling and Analysis of Genetics data in python + Version: 0.1.4 | Release date: June 2024 + Author: Shadi Zabad, McGill University + ********************************************** + < Compute LD matrix and store in Zarr format > -usage: magenpy_ld [-h] [--estimator {shrinkage,windowed,block,sample}] --bfile BED_FILE [--keep KEEP_FILE] [--extract EXTRACT_FILE] - [--backend {plink,xarray}] [--temp-dir TEMP_DIR] --output-dir OUTPUT_DIR [--min-maf MIN_MAF] [--min-mac MIN_MAC] - [--genome-build GENOME_BUILD] [--metadata METADATA] [--storage-dtype {int8,float32,int16,float64}] - [--compressor {zstd,lz4,zlib,gzip}] [--compression-level COMPRESSION_LEVEL] [--ld-window LD_WINDOW] [--ld-window-kb LD_WINDOW_KB] - [--ld-window-cm LD_WINDOW_CM] [--ld-blocks LD_BLOCKS] [--genmap-Ne GENMAP_NE] [--genmap-sample-size GENMAP_SS] - [--shrinkage-cutoff SHRINK_CUTOFF] +usage: magenpy_ld [-h] [--estimator {shrinkage,block,windowed,sample}] --bfile BED_FILE [--keep KEEP_FILE] [--extract EXTRACT_FILE] + [--backend {xarray,plink}] [--temp-dir TEMP_DIR] --output-dir OUTPUT_DIR [--min-maf MIN_MAF] [--min-mac MIN_MAC] + [--genome-build GENOME_BUILD] [--metadata METADATA] [--storage-dtype {float64,float32,int16,int8}] + [--compute-spectral-properties] [--compressor {lz4,zlib,zstd,gzip}] [--compression-level COMPRESSION_LEVEL] + [--ld-window LD_WINDOW] [--ld-window-kb LD_WINDOW_KB] [--ld-window-cm LD_WINDOW_CM] [--ld-blocks LD_BLOCKS] + [--genmap-Ne GENMAP_NE] [--genmap-sample-size GENMAP_SS] [--shrinkage-cutoff SHRINK_CUTOFF] -Commandline arguments for LD matrix computation +Commandline arguments for LD matrix computation and storage options: -h, --help show this help message and exit - --estimator {shrinkage,windowed,block,sample} + --estimator {shrinkage,block,windowed,sample} The LD estimator (windowed, shrinkage, block, sample) --bfile BED_FILE The path to a plink BED file. --keep KEEP_FILE A plink-style keep file to select a subset of individuals to compute the LD matrices. 
--extract EXTRACT_FILE A plink-style extract file to select a subset of SNPs to compute the LD matrix for. - --backend {plink,xarray} + --backend {xarray,plink} The backend software used to compute the Linkage-Disequilibrium between variants. --temp-dir TEMP_DIR The temporary directory where we store intermediate files. --output-dir OUTPUT_DIR @@ -55,12 +55,15 @@ options: --min-mac MIN_MAC The minimum minor allele count for variants included in the LD matrix. --genome-build GENOME_BUILD The genome build for the genotype data (recommend storing as metadata). - --metadata METADATA A comma-separated string with metadata keys and values. This is used to store information about the genotype data from which - the LD matrix was computed, such as the biobank/samples, cohort characteristics (e.g. ancestry), etc. Keys and values should - be separated by =, such that inputs are in the form of:--metadata Biobank=UKB,Ancestry=EUR,Date=April2024 - --storage-dtype {int8,float32,int16,float64} + --metadata METADATA A comma-separated string with metadata keys and values. This is used to store information about the genotype data + from which the LD matrix was computed, such as the biobank/samples, cohort characteristics (e.g. ancestry), etc. + Keys and values should be separated by =, such that inputs are in the form of:--metadata + Biobank=UKB,Ancestry=EUR,Date=April2024 + --storage-dtype {float64,float32,int16,int8} The data type for the entries of the LD matrix. - --compressor {zstd,lz4,zlib,gzip} + --compute-spectral-properties + Compute and store the spectral properties of the LD matrix (e.g. eigenvalues, eigenvectors). + --compressor {lz4,zlib,zstd,gzip} The compressor name or compression algorithm to use for the LD matrix. --compression-level COMPRESSION_LEVEL The compression level to use for the entries of the LD matrix (1-9). @@ -78,5 +81,6 @@ options: The sample size for the dataset used to infer the genetic map. --shrinkage-cutoff SHRINK_CUTOFF The cutoff value below which we assume that the correlation between variants is zero. 
+ ``` \ No newline at end of file diff --git a/docs/commandline/magenpy_simulate.md b/docs/commandline/magenpy_simulate.md index 9ef0440..b160f7b 100644 --- a/docs/commandline/magenpy_simulate.md +++ b/docs/commandline/magenpy_simulate.md @@ -18,22 +18,22 @@ Which outputs the following help message: ```bash -********************************************** - _ __ ___ __ _ __ _ ___ _ __ _ __ _ _ -| '_ ` _ \ / _` |/ _` |/ _ \ '_ \| '_ \| | | | -| | | | | | (_| | (_| | __/ | | | |_) | |_| | -|_| |_| |_|\__,_|\__, |\___|_| |_| .__/ \__, | - |___/ |_| |___/ -Modeling and Analysis of Genetics data in python -Version: 0.1.0 | Release date: April 2024 -Author: Shadi Zabad, McGill University -********************************************** -< Simulate complex quantitative or case-control traits > + ******************************************************** + _ __ ___ __ _ __ _ ___ _ __ _ __ _ _ + | '_ ` _ \ / _` |/ _` |/ _ \ '_ \| '_ \| | | | + | | | | | | (_| | (_| | __/ | | | |_) | |_| | + |_| |_| |_|\__,_|\__, |\___|_| |_| .__/ \__, | + |___/ |_| |___/ + Modeling and Analysis of Genetics data in python + Version: 0.1.4 | Release date: June 2024 + Author: Shadi Zabad, McGill University + ******************************************************** + < Simulate complex quantitative or case-control traits > usage: magenpy_simulate [-h] --bfile BED_FILE [--keep KEEP_FILE] [--extract EXTRACT_FILE] [--backend {plink,xarray}] [--temp-dir TEMP_DIR] - --output-file OUTPUT_FILE [--output-simulated-beta] [--min-maf MIN_MAF] [--min-mac MIN_MAC] --h2 H2 [--mix-prop MIX_PROP] - [--prop-causal PROP_CAUSAL] [--var-mult VAR_MULT] [--phenotype-likelihood {binomial,gaussian}] [--prevalence PREVALENCE] - [--seed SEED] + --output-file OUTPUT_FILE [--output-simulated-beta] [--min-maf MIN_MAF] [--min-mac MIN_MAC] --h2 H2 + [--mix-prop MIX_PROP] [--prop-causal PROP_CAUSAL] [--var-mult VAR_MULT] + [--phenotype-likelihood {gaussian,binomial}] [--prevalence PREVALENCE] [--seed SEED] Commandline arguments for the complex trait simulator @@ -53,13 +53,13 @@ options: --min-maf MIN_MAF The minimum minor allele frequency for variants included in the simulation. --min-mac MIN_MAC The minimum minor allele count for variants included in the simulation. --h2 H2 Trait heritability. Ranges between 0. and 1., inclusive. - --mix-prop MIX_PROP Mixing proportions for the mixture density (comma separated). For example, for the spike-and-slab mixture density, with the - proportion of causal variants set to 0.1, you can specify: "--mix-prop 0.9,0.1 --var-mult 0,1". + --mix-prop MIX_PROP Mixing proportions for the mixture density (comma separated). For example, for the spike-and-slab mixture density, + with the proportion of causal variants set to 0.1, you can specify: "--mix-prop 0.9,0.1 --var-mult 0,1". --prop-causal PROP_CAUSAL, -p PROP_CAUSAL The proportion of causal variants in the simulation. See --mix-prop for more complex architectures specification. --var-mult VAR_MULT, -d VAR_MULT Multipliers on the variance for each mixture component. - --phenotype-likelihood {binomial,gaussian} + --phenotype-likelihood {gaussian,binomial} The likelihood for the simulated trait: gaussian (e.g. quantitative) or binomial (e.g. case-control). --prevalence PREVALENCE The prevalence of cases (or proportion of positives) for binary traits. Ranges between 0. and 1. 
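The help text above documents the new `--compute-spectral-properties` flag for `magenpy_ld`; the same option is threaded through the Python API in the `GWADataLoader` and `GenotypeMatrix` changes below. A minimal sketch of the Python-side equivalent (the BED file path is a placeholder, and estimator-specific keyword arguments are omitted for brevity):

```python
import magenpy as mgp

# Hypothetical path to a plink BED file prefix for a single chromosome:
gdl = mgp.GWADataLoader(bed_files="data/genotypes/chr_22")

# Equivalent to passing --compute-spectral-properties on the command line;
# stores `Min eigenvalue` information alongside the LD matrix attributes:
gdl.compute_ld(estimator='windowed',
               output_dir='output/ld/',
               compute_spectral_properties=True)
```
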
diff --git a/magenpy/GWADataLoader.py b/magenpy/GWADataLoader.py index a6a81fb..a3853e4 100644 --- a/magenpy/GWADataLoader.py +++ b/magenpy/GWADataLoader.py @@ -84,7 +84,7 @@ def __init__(self, :param drop_duplicated: If True, drop SNPs with duplicated rsID. :param phenotype_likelihood: The likelihood of the phenotype (e.g. `gaussian`, `binomial`). :param sumstats_files: The path to the summary statistics file(s). The path may be a wildcard. - :param sumstats_format: The format for the summary statistics. Currently supports the following + :param sumstats_format: The format for the summary statistics. Currently, supports the following formats: `plink1.9`, `plink2`, `magenpy`, `fastGWA`, `COJO`, `SAIGE`, or `GWASCatalog` for the standard summary statistics format (also known as `ssf` or `gwas-ssf`). :param ld_store_files: The path to the LD matrices. This may be a wildcard to accommodate reading data @@ -592,7 +592,11 @@ def read_ld(self, ld_store_paths): return if not iterable(ld_store_paths): - ld_store_files = get_filenames(ld_store_paths, extension='.zgroup') + if 's3://' in ld_store_paths: + from .utils.system_utils import glob_s3_path + ld_store_files = glob_s3_path(ld_store_paths) + else: + ld_store_files = get_filenames(ld_store_paths, extension='.zgroup') else: ld_store_files = ld_store_paths @@ -634,6 +638,7 @@ def compute_ld(self, dtype='int8', compressor_name='zstd', compression_level=7, + compute_spectral_properties=False, **ld_kwargs): """ Compute the Linkage-Disequilibrium (LD) matrix or SNP-by-SNP Pearson @@ -650,6 +655,7 @@ def compute_ld(self, and integer quantized data types int8 and int16). :param compressor_name: The name of the compression algorithm to use for the LD matrix. :param compression_level: The compression level to use for the entries of the LD matrix (1-9). + :param compute_spectral_properties: If True, compute the spectral properties of the LD matrix. :param ld_kwargs: keyword arguments for the various LD estimators. Consult the implementations of `WindowedLD`, `ShrinkageLD`, and `BlockLD` for details. """ @@ -663,6 +669,7 @@ def compute_ld(self, dtype=dtype, compressor_name=compressor_name, compression_level=compression_level, + compute_spectral_properties=compute_spectral_properties, **ld_kwargs) for c, g in tqdm(sorted(self.genotype.items(), key=lambda x: x[0]), total=len(self.genotype), @@ -672,7 +679,8 @@ def compute_ld(self, def get_ld_matrices(self): """ - :return: The LD matrices computed for each chromosome. + :return: A dictionary containing the chromosome ID as key and corresponding LD matrices + as value. """ return self.ld @@ -849,6 +857,7 @@ def to_individual_table(self): :return: A plink-style dataframe of individual IDs, in the form of Family ID (FID) and Individual ID (IID). """ + assert self.sample_table is not None return self.sample_table.get_individual_table() @@ -858,6 +867,8 @@ def to_phenotype_table(self): Individual ID (IID), and phenotype value. 
""" + assert self.sample_table is not None + return self.sample_table.get_phenotype_table() def to_snp_table(self, col_subset=None, per_chromosome=False, resource='auto'): @@ -879,6 +890,7 @@ def to_snp_table(self, col_subset=None, per_chromosome=False, resource='auto'): # Sanity checks: assert resource in ('auto', 'genotype', 'ld', 'sumstats') + if resource != 'auto': if resource == 'genotype' and self.genotype is None: raise ValueError("Genotype matrix is not available!") diff --git a/magenpy/GenotypeMatrix.py b/magenpy/GenotypeMatrix.py index 1c9b511..9ada03f 100644 --- a/magenpy/GenotypeMatrix.py +++ b/magenpy/GenotypeMatrix.py @@ -343,6 +343,7 @@ def compute_ld(self, dtype='int8', compressor_name='zstd', compression_level=7, + compute_spectral_properties=False, **ld_kwargs): """ @@ -359,6 +360,8 @@ def compute_ld(self, :param compression_level: The compression level for the Zarr array (1-9) :param ld_kwargs: keyword arguments for the various LD estimators. Consult the implementations of `WindowedLD`, `ShrinkageLD`, and `BlockLD` for details. + :param compute_spectral_properties: If True, compute and store information about the eigenvalues of + the LD matrix. """ from .stats.ld.estimator import SampleLD, WindowedLD, ShrinkageLD, BlockLD @@ -382,7 +385,8 @@ def compute_ld(self, temp_dir=tmp_ld_dir.name, dtype=dtype, compressor_name=compressor_name, - compression_level=compression_level) + compression_level=compression_level, + compute_spectral_properties=compute_spectral_properties) def set_sample_table(self, sample_table): """ diff --git a/magenpy/LDLinearOperator.py b/magenpy/LDLinearOperator.py new file mode 100644 index 0000000..9060faf --- /dev/null +++ b/magenpy/LDLinearOperator.py @@ -0,0 +1,130 @@ +from scipy.sparse.linalg import LinearOperator +import numpy as np + + +class LDLinearOperator(LinearOperator): + """ + A class that represents a LinearOperator to facilitate performing matrix-vector + multiplication with a Linkage-Disequilibrium (LD) matrix without representing the + entire matrix in memory. The object is initialized with the data array of an LD matrix, + the index pointer array, and other parameters to specify the behavior of the operator. + + For instance, you may customize the operator to exclude the diagonal entries of the LD + matrix by setting `include_diag=False`, or exclude the lower triangle by setting + `lower_triangle=False`. + + .. note:: + The LD linear operator assumes that the underlying LD matrix is square and symmetric. + Currently, does not support non-square matrices (which may arise in certain situations). + This is left for future work. + + :ivar ld_indptr: The index pointer array for the CSR matrix. + :ivar ld_data: The data array for the CSR matrix (supported data types are float32, float64, int8, int16). + :param diag_add: A scalar or vector of the same shape as the matrix with quantities to add to + the diagonal of the matrix being represented by the linear operator. This is used to support + cases where a diagonal matrix needs to be added before the matrix-vector multiplication. + :ivar dtype: The data type for the entries of the LD matrix. + :ivar shape: The shape of the LD matrix. + :ivar include_diag: If True, the diagonal entries of the LD matrix are included in the computation. + :ivar lower_triangle: If True, the lower triangle of the LD matrix is included in the computation. + :ivar upper_triangle: If True, the upper triangle of the LD matrix is included in the computation. 
:ivar threads: The number of threads to use for the computation (experimental).
+
+    """
+    def __init__(self,
+                 ld_indptr,
+                 ld_data,
+                 diag_add=None,
+                 dtype='float32',
+                 include_diag=True,
+                 lower_triangle=True,
+                 upper_triangle=True,
+                 threads=1):
+        """
+        Initialize an LDLinearOperator object by passing the index pointer array and data array.
+
+        :param ld_indptr: The index pointer array for the CSR matrix.
+        :param ld_data: The data array for the CSR matrix.
+        :param diag_add: A scalar or vector of the same shape as the matrix with quantities to add to
+        the diagonal of the matrix being represented by the linear operator.
+        :param dtype: The data type for the entries of the LD matrix (supported data types are float32, float64,
+        and integer quantized data types int8 and int16).
+        :param include_diag: If True, the diagonal entries of the LD matrix are included in the computation.
+        :param lower_triangle: If True, the lower triangle of the LD matrix is included in the computation.
+        :param upper_triangle: If True, the upper triangle of the LD matrix is included in the computation.
+        :param threads: The number of threads to use for the computation (experimental).
+
+        """
+
+        self.ld_indptr = ld_indptr
+        self.ld_data = ld_data
+        self.shape = (ld_indptr.shape[0] - 1, ld_indptr.shape[0] - 1)
+        self.dtype = np.dtype(dtype)
+
+        self.diag_add = None
+        self.set_diag_add(diag_add)
+
+        self.include_diag = include_diag
+        self.upper_triangle = upper_triangle
+        self.lower_triangle = lower_triangle
+        self.threads = threads
+
+    @property
+    def dequantization_scale(self):
+        """
+        :return: The dequantization scale for the LD data (if quantized to integer data types).
+        If the data is not quantized, returns 1.
+        """
+        if np.issubdtype(self.ld_data.dtype, np.integer):
+            return 1./np.iinfo(self.ld_data.dtype).max
+        else:
+            return 1.
+
+    def set_diag_add(self, diag_add):
+        """
+        Set the elements of the diagonal matrix to be added to the LD matrix.
+
+        :param diag_add: A scalar or vector of the same shape as the matrix with quantities to add to
+        the diagonal of the matrix being represented by the linear operator.
+
+        """
+
+        if diag_add is not None:
+            assert np.isscalar(diag_add) or diag_add.shape[0] == self.shape[0]
+
+        self.diag_add = diag_add
+
+    def _matvec(self, x):
+        """
+        Perform matrix-vector multiplication with the LD matrix.
+
+        :param x: The input vector to multiply with the LD matrix.
+        :return: A numpy array representing the result of the matrix-vector multiplication.
+        """
+
+        assert x.shape[0] == self.shape[0]
+
+        from .stats.ld.c_utils import ld_dot
+
+        # Column vectors of shape (n, 1) are flattened for the computation
+        # and reshaped on the way out:
+        flatten = len(x.shape) > 1
+        x_flat = x.flatten() if flatten else x
+
+        out_vec = ld_dot(self.ld_indptr,
+                         self.ld_data,
+                         x_flat,
+                         x.dtype.type(self.dequantization_scale),
+                         self.lower_triangle,
+                         self.upper_triangle,
+                         self.include_diag,
+                         self.threads
+                         )
+
+        if self.diag_add is not None:
+            out_vec += x_flat*self.diag_add
+
+        if flatten:
+            return out_vec.reshape(-1, 1)
+        else:
+            return out_vec
+
+    def _rmatvec(self, x):
+        return self._matvec(x)
diff --git a/magenpy/LDMatrix.py b/magenpy/LDMatrix.py
index b58aced..89507d3 100644
--- a/magenpy/LDMatrix.py
+++ b/magenpy/LDMatrix.py
@@ -35,7 +35,7 @@ class LDMatrix(object):
     * `a2`: The array containing the reference alleles.
     * `maf`: The array containing the minor allele frequencies.
     * `bp`: The array containing the base pair positions.
-    * `cm`: The array containing the centi Morgan positions.
+    * `cm`: The array containing the centi Morgan distance along the chromosome.
* `ldscore`: The array containing the LD scores.
     * `attrs`: A JSON-style metadata object containing general information about how the LD matrix
     was calculated, including the chromosome number, sample size, genome build, LD estimator,
@@ -81,34 +81,75 @@ def __init__(self, zarr_group, symmetric=False):
 
     @classmethod
     def from_path(cls, ld_store_path):
         """
-        Initialize an `LDMatrix` object from a pre-computed Zarr group store.
-        :param ld_store_path: The path to the Zarr array store on the filesystem.
+        Initialize an `LDMatrix` object from a pre-computed Zarr group store. This is a generic method
+        that works with both cloud-based stores (e.g. AWS S3) and local filesystems.
+
+        :param ld_store_path: The path to the Zarr array store.
 
         !!! seealso "See Also"
-            * [from_dir][magenpy.LDMatrix.LDMatrix.from_dir]
+            * [from_directory][magenpy.LDMatrix.LDMatrix.from_directory]
+            * [from_s3][magenpy.LDMatrix.LDMatrix.from_s3]
+
+        :return: An `LDMatrix` object.
         """
 
-        for level in range(2):
-            try:
-                ld_group = zarr.open_group(ld_store_path, mode='r')
-                return cls(ld_group)
-            except zarr.hierarchy.GroupNotFoundError as e:
-                if level < 1:
-                    ld_store_path = osp.dirname(ld_store_path)
-                else:
-                    raise e
+        if 's3://' in ld_store_path:
+            return cls.from_s3(ld_store_path)
+        else:
+            return cls.from_directory(ld_store_path)
 
     @classmethod
-    def from_dir(cls, ld_store_path):
+    def from_s3(cls, s3_path, cache_size=None):
+        """
+        Initialize an `LDMatrix` object from a Zarr group store hosted on AWS S3.
+
+        :param s3_path: The path to the Zarr group store on AWS S3. S3 paths are expected
+        to be of the form `s3://bucket-name/path/to/zarr-store`.
+        :param cache_size: The size of the cache for the Zarr store (in bytes).
+
+        .. note::
+            Requires installing the `s3fs` package to access the Zarr store on AWS S3.
+
+        !!! seealso "See Also"
+            * [from_path][magenpy.LDMatrix.LDMatrix.from_path]
+            * [from_directory][magenpy.LDMatrix.LDMatrix.from_directory]
+
+        :return: An `LDMatrix` object.
+
+        """
+
+        import s3fs
+
+        # Access the bucket anonymously (note that the region is currently fixed to us-east-2):
+        s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name='us-east-2'))
+        store = s3fs.S3Map(root=s3_path.replace('s3://', ''), s3=s3, check=False)
+        cache = zarr.LRUStoreCache(store, max_size=cache_size)
+        ld_group = zarr.open_group(store=cache, mode='r')
+
+        return cls(ld_group)
+
+    @classmethod
+    def from_directory(cls, dir_path):
         """
         Initialize an `LDMatrix` object from a Zarr array store.
-        :param ld_store_path: The path to the Zarr array store on the filesystem.
+
+        :param dir_path: The path to the Zarr array store on the local filesystem.
 
         !!! seealso "See Also"
+            * [from_s3][magenpy.LDMatrix.LDMatrix.from_s3]
             * [from_path][magenpy.LDMatrix.LDMatrix.from_path]
+
+        :return: An `LDMatrix` object.
         """
-        return cls.from_path(ld_store_path)
+
+        for level in range(2):
+            try:
+                ld_group = zarr.open_group(dir_path, mode='r')
+                return cls(ld_group)
+            except zarr.hierarchy.GroupNotFoundError as e:
+                if level < 1:
+                    dir_path = osp.dirname(dir_path)
+                else:
+                    raise e
 
     @classmethod
     def from_csr(cls,
@@ -486,11 +527,16 @@ def from_ragged_zarr_matrix(cls,
     @property
     def n_snps(self):
         """
-        :return: The number of variants in the LD matrix. If the matrix is loaded and filtered,
-        we return the number of variants remaining after applying the filter.
+        :return: The number of variants in the LD matrix. If a mask is set, we return the
+        number of variants included in the mask.
""" if self._mat is not None: return self._mat.shape[0] + elif self._mask is not None: + if self._mask.dtype == bool: + return self._mask.sum() + else: + return len(self._mask) else: return self.stored_n_snps @@ -602,6 +648,16 @@ def sample_size(self): """ return self.get_store_attr('Sample size') + @property + def dequantization_scale(self): + """ + :return: The dequantization scale for the quantized LD matrix. If the matrix is not quantized, returns 1. + """ + if np.issubdtype(self.stored_dtype, np.integer): + return 1./np.iinfo(self.stored_dtype).max + else: + return 1. + @property def genome_build(self): """ @@ -718,7 +774,14 @@ def window_size(self): :return: The number of variants in the LD window for each SNP. """ - return np.diff(self.indptr) + + if self.in_memory and self.is_symmetric: + indptr = self.indptr + else: + from .stats.ld.c_utils import get_symmetrized_indptr + indptr, _ = get_symmetrized_indptr(self.indptr[:]) + + return np.diff(indptr) @property def n_neighbors(self): @@ -781,12 +844,8 @@ def row_indices(self): """ :return: The row indices of the non-zero elements of the sparse, CSR representation of the LD matrix """ - if self.in_memory: - # TODO: Check that this behaves correctly if some entries are zero but not eliminated. - return self.csr_matrix.nonzero()[0] - else: - indptr = self.indptr - return np.repeat(np.arange(len(indptr) - 1), np.diff(indptr)) + indptr = self.indptr + return np.repeat(np.arange(len(indptr) - 1), np.diff(indptr)) @property def indptr(self): @@ -975,28 +1034,26 @@ def compute_ld_scores(self, """ if chunk_size is None: - chunk_size = self.stored_n_snps + chunk_size = self.n_snps if annotation_matrix is None: annotation_matrix = np.ones((self.n_snps, 1), dtype=np.float32) ld_scores = np.zeros((self.n_snps, annotation_matrix.shape[1])) - for chunk_idx in range(int(np.ceil(self.stored_n_snps / chunk_size))): + for chunk_idx in range(int(np.ceil(self.n_snps / chunk_size))): start_row = chunk_idx*chunk_size end_row = (chunk_idx + 1)*chunk_size - csr_mat = self.load_rows(start_row=start_row, + csr_mat = self.load_data(start_row=start_row, end_row=end_row, return_symmetric=False, - fill_diag=False, + return_square=False, + keep_original_shape=True, + return_as_csr=True, dtype=np.float32) - # If a mask is set, apply it to the matrix: - if self._mask is not None: - csr_mat = csr_mat[self._mask, :][:, self._mask] - mat_sq = csr_mat.power(2) if corrected: @@ -1022,17 +1079,26 @@ def multiply(self, vec): """ Multiply the LD matrix with an input vector `vec`. + :param vec: The input vector to multiply with the LD matrix. + !!! seealso "See Also" * [dot][magenpy.LDMatrix.LDMatrix.dot] :return: The product of the LD matrix with the input vector. """ - return self.csr_matrix.dot(vec) + + if self.in_memory: + return self.csr_matrix.dot(vec) + else: + ld_opr = self.get_linear_operator() + return ld_opr.dot(vec) def dot(self, vec): """ Multiply the LD matrix with an input vector `vec`. + :param vec: The input vector to multiply with the LD matrix. + !!! seealso "See Also" * [multiply][magenpy.LDMatrix.LDMatrix.multiply] @@ -1041,11 +1107,247 @@ def dot(self, vec): """ return self.multiply(vec) + def perform_svd(self, **svds_kwargs): + """ + Perform the Singular Value Decomposition (SVD) on the LD matrix. + This method is a wrapper around the `scipy.sparse.linalg.svds` function and provides + utilities to perform SVD with a LinearOperator for large-scale LD matrix, so that + the matrices don't need to be fully represented in memory. 
+
+        :param svds_kwargs: Additional keyword arguments to pass to the `scipy.sparse.linalg.svds` function.
+
+        :return: The result of the SVD decomposition of the LD matrix.
+        """
+
+        from scipy.sparse.linalg import svds
+
+        if self.in_memory and not np.issubdtype(self.dtype, np.integer):
+            mat = self.csr_matrix
+        else:
+            mat = self.get_linear_operator()
+
+        return svds(mat, **svds_kwargs)
+
+    def estimate_min_eigenvalue(self,
+                                block_size=None,
+                                block_size_cm=None,
+                                block_size_kb=None,
+                                blocks=None,
+                                return_block_boundaries=False,
+                                assign_to_variants=False):
+        """
+        Estimate the smallest eigenvalue of the LD matrix. This is useful for
+        understanding the spectral properties of the LD matrix and detecting potential
+        issues for downstream applications that leverage the LD matrix. For instance, many LD
+        matrices are not positive semi-definite (PSD) and this manifests in having negative eigenvalues.
+        This function can be used to detect such issues.
+
+        To perform this computation efficiently, we leverage fast ARPACK routines provided by `scipy` to
+        compute only the smallest eigenvalue of the LD matrix. Another advantage of this implementation
+        is that it doesn't require symmetric or dequantized LD matrices. The `LDLinearOperator` class
+        can be used to perform all the computations without symmetrizing or dequantizing the matrix beforehand,
+        which should make it more efficient in terms of memory and CPU resources.
+
+        Furthermore, this function supports computing eigenvalues for sub-blocks of the LD matrix,
+        by simply providing one of the following parameters:
+        * `block_size`: Number of variants per block
+        * `block_size_cm`: Block size in centi-Morgans
+        * `block_size_kb`: Block size in kilobases
+
+        :param block_size: An integer specifying the block size or number of variants to
+        compute the minimum eigenvalue for. If provided, we compute minimum eigenvalues for each block in the
+        LD matrix separately, instead of the minimum eigenvalue for the entire matrix. This can be useful for
+        large LD matrices that don't fit in memory or in cases where information about local blocks is needed.
+        :param block_size_cm: The block size in centi-Morgans (cM) to compute the minimum eigenvalue for.
+        :param block_size_kb: The block size in kilo-base pairs (kb) to compute the minimum eigenvalue for.
+        :param blocks: An array or list specifying the block boundaries to compute the minimum eigenvalue for.
+        If there are B blocks, then the array should be of size B + 1, with the entries specifying the start position
+        of each block.
+        :param return_block_boundaries: If True, return the block boundaries used to compute the minimum eigenvalue.
+        :param assign_to_variants: If True, assign the minimum eigenvalue to the variants used to compute it.
+
+        :return: The smallest eigenvalue(s) of the LD matrix or sub-blocks of the LD matrix. If `assign_to_variants`
+        is set to True, then return an array of size `n_snps` mapping the minimum eigenvalue to each variant.
+        """
+
+        from scipy.sparse.linalg import eigsh, ArpackNoConvergence
+
+        n_snps = self.stored_n_snps
+
+        if blocks is not None:
+            block_iter = blocks
+        elif block_size is not None:
+
+            # Generate block boundaries in steps of `block_size`, making sure
+            # that the final boundary lands exactly on `n_snps`:
+            block_iter = np.clip(np.arange(0, n_snps + block_size, block_size),
+                                 a_min=0, a_max=n_snps)
+
+            # If the last remaining block is too small, merge it with the previous block:
+            if block_iter[-1] - block_iter[-2] < 50:
+                block_iter[-2] = block_iter[-1]
+                block_iter = block_iter[:-1]
+
+        elif block_size_cm is not None or block_size_kb is not None:
+
+            if block_size_cm is not None:
+                dist = self.get_metadata('cm', apply_mask=False)
+                block_dist = block_size_cm
+            else:
+                dist = self.get_metadata('bp', apply_mask=False) / 1000
+                block_dist = block_size_kb
+
+            dist -= dist[0]
+
+            block_iter = np.concatenate([[0],
+                                         np.where((dist[1:] // block_dist) !=
+                                                  (dist[:-1] // block_dist))[0] + 1,
+                                         ])
+
+            # If the last remaining block is too small, merge it with the previous block:
+            if n_snps - block_iter[-1] < 50:
+                block_iter[-1] = n_snps
+            else:
+                block_iter = np.concatenate([block_iter, [n_snps]])
+
+        else:
+            block_iter = [0, n_snps]
+
+        if assign_to_variants:
+            eigs_per_var = np.zeros(n_snps, dtype=np.float32)
+        else:
+            eigs = []
+
+        def fast_min_eig(lop, **eigsh_kwargs):
+            """
+            A helper function to compute the smallest eigenvalue of the LD matrix efficiently.
+            It first computes the largest-magnitude eigenvalue, shifts the spectrum down by
+            that amount, and then computes the largest-magnitude eigenvalue of the shifted
+            matrix; the sum of the two recovers the smallest eigenvalue. This is a more stable
+            and efficient approach for computing the smallest eigenvalue of the LD matrix.
+
+            :param lop: The LDLinearOperator object to compute the smallest eigenvalue for.
+
+            :return: The smallest eigenvalue of the LD matrix.
+
+            """
+            lop.set_diag_add(None)
+            lm_max = eigsh(lop, k=1, which='LM', return_eigenvectors=False, **eigsh_kwargs)[0]
+            lop.set_diag_add(-lm_max)
+            lm_max_hat = eigsh(lop, k=1, which='LM', return_eigenvectors=False, **eigsh_kwargs)[0]
+            lop.set_diag_add(None)
+
+            return lm_max + lm_max_hat
+
+        for bidx in range(len(block_iter) - 1):
+
+            start = block_iter[bidx]
+            end = block_iter[bidx + 1]
+
+            mat = self.get_linear_operator(start_row=start, end_row=end)
+
+            try:
+                eig = fast_min_eig(mat)
+            except ArpackNoConvergence:
+                # If there are convergence issues, try to re-run by increasing
+                # the number of iterations (for smaller blocks) or relaxing
+                # the tolerance threshold (for larger blocks):
+                if mat.shape[0]*10 < 10000:
+                    eig = fast_min_eig(mat, maxiter=10000)
+                else:
+                    eig = fast_min_eig(mat, tol=1e-4)
+
+            if assign_to_variants:
+                eigs_per_var[start:end] = eig
+            else:
+                eigs.append(eig)
+
+        if assign_to_variants:
+
+            if self._mask is not None:
+                eigs_per_var = eigs_per_var[self._mask]
+
+            if return_block_boundaries:
+                return eigs_per_var, block_iter
+            else:
+                return eigs_per_var
+        elif return_block_boundaries:
+            return eigs, block_iter
+        else:
+            if len(eigs) == 1:
+                return eigs[0]
+            else:
+                return eigs
+
+    def get_lambda_min(self, aggregate=None):
+        """
+        A utility method to compute the `lambda_min` value for the LD matrix. `lambda_min` is the smallest
+        eigenvalue of the LD matrix, and this quantity can be useful in some downstream applications.
+        The function retrieves the pre-computed minimum eigenvalue per block (if stored) and maps it
+        to each variant in the corresponding block. If minimum eigenvalues per block are not available,
+        we use the global minimum eigenvalue (either read from the matrix attributes or computed on the spot).
+ + Before returning the `lambda_min` value to the user, we apply the following transformation: + + abs(min(lambda_min, 0.)) + + This implies that if the minimum eigenvalue is positive, we just return 0. for `lambda_min`. We are mainly + interested in negative eigenvalues here (if they exist). + + :param aggregate: A summary of the minimum eigenvalue across variants or across blocks (if available). + Supported aggregation functions are `mean_block`, `median_block`, `min_block`, and `min`. If `min` is selected, + we return the minimum eigenvalue for the entire matrix (rather than sub-blocks of it). + + :return: The `lambda_min` value for the LD matrix. + """ + + if aggregate is not None: + assert aggregate in ('mean_block', 'median_block', 'min_block', 'min') + + # Get the attributes of the LD store: + store_attrs = self.list_store_attributes() + + if aggregate in ('mean_block', 'median_block', 'min_block'): + assert 'Min eigenvalue per block' in store_attrs, ( + 'Aggregating lambda_min across blocks ' + 'requires that these blocks are pre-defined.') + + lambda_min = 0. + + if aggregate == 'min' or 'Min eigenvalue per block' not in store_attrs: + + if 'Min eigenvalue' in store_attrs: + lambda_min = self.get_store_attr('Min eigenvalue') + else: + lambda_min = self.estimate_min_eigenvalue() + + elif 'Min eigenvalue per block' in store_attrs: + + # If we have eigenvalues per block, map the block value to each variant: + block_eigs = self.get_store_attr('Min eigenvalue per block') + + if aggregate is None: + + indices = np.arange(block_eigs['Block boundaries'][-1]) + # Use searchsorted to find which boundary each index belongs to: + value_indices = np.searchsorted(block_eigs['Block boundaries'], indices, side='right') - 1 + lambda_min = np.array(block_eigs['Min eigenvalue'])[value_indices] + + if self._mask is not None: + lambda_min = lambda_min[self._mask] + + elif aggregate == 'mean_block': + lambda_min = np.mean(block_eigs['Min eigenvalue']) + elif aggregate == 'median_block': + lambda_min = np.median(block_eigs['Min eigenvalue']) + elif aggregate == 'min_block': + lambda_min = np.min(block_eigs['Min eigenvalue']) + + return np.abs(np.minimum(lambda_min, 0.)) + def estimate_uncompressed_size(self, dtype=None): """ Provide an estimate of size of the uncompressed LD matrix in megabytes (MB). - This is only a rough estimate. Depending on how the LD matrix is loaded, the actual size - may be much larger than this estimate. + This is only a rough estimate. Depending on how the LD matrix is loaded, actual memory + usage may be larger than this estimate. + + :param dtype: The data type for the entries of the LD matrix. If None, the stored data type is used + to determine the size of the data in memory. :return: The estimated size of the uncompressed LD matrix in MB. @@ -1059,6 +1361,7 @@ def estimate_uncompressed_size(self, dtype=None): def get_metadata(self, key, apply_mask=True): """ Get the metadata associated with each variant in the LD matrix. + :param key: The key for the metadata item. :param apply_mask: If True, apply the mask (e.g. filter) to the metadata. @@ -1081,11 +1384,14 @@ def get_store_attr(self, attr): :return: The value for the attribute. :raises KeyError: if the attribute is not set. """ - try: - return self._zg.attrs[attr] - except KeyError: - print(f"Warning: Attribute '{attr}' is not set!") - return None + return self._zg.attrs[attr] + + def list_store_attributes(self): + """ + Get all the attributes associated with the LD matrix. + :return: A list of all the attributes. 
+ """ + return list(self._zg.attrs.keys()) def set_store_attr(self, attr, value): """ @@ -1163,26 +1469,73 @@ def update_rows_inplace(self, new_csr, start_row=None, end_row=None): if np.issubdtype(self.stored_dtype, np.integer) and np.issubdtype(new_csr.dtype, np.floating): self._zg['matrix/data'][data_start:data_end] = quantize(new_csr.data, int_dtype=self.stored_dtype) else: - self._zg['matrix/data'][data_start:data_end] = new_csr.data.astype(self.stored_dtype) + self._zg['matrix/data'][data_start:data_end] = new_csr.data.astype(self.stored_dtype, copy=False) + + def get_linear_operator(self, + start_row=None, + end_row=None, + **linop_kwargs): + """ + Use `scipy.sparse` interface to construct a `LinearOperator` object representing the LD matrix. + This is useful for performing various linear algebra routines on the LD matrix without + necessarily symmetrizing or de-quantizing it beforehand. For instance, this operator can + be used to compute matrix-vector products with the LD matrix, solve linear systems, + perform SVD, compute eigenvalues, etc. + + !!! seealso "See Also" + * [LDLinearOperator][magenpy.LDMatrix.LDLinearOperator] + + .. note :: + For now, start and end row positions are always with reference to the original matrix + (i.e. without applying any masks or filters). + + :param start_row: The start row to load to memory (if loading a subset of the matrix). + :param end_row: The end row (not inclusive) to load to memory (if loading a subset of the matrix). + :param linop_kwargs: Keyword arguments to pass to the `LDLinearOperator` constructor. + + :return: A scipy `LinearOperator` object representing the LD matrix. + """ + + ld_data = self.load_data(start_row=start_row, end_row=end_row, return_square=True) + + from .LDLinearOperator import LDLinearOperator + + return LDLinearOperator(ld_data[1], ld_data[0], **linop_kwargs) def load_data(self, - return_symmetric=False, + start_row=None, + end_row=None, dtype=None, - return_as_csr=False): + return_square=True, + return_symmetric=False, + return_as_csr=False, + keep_original_shape=False): """ A utility function to load and process the LD matrix data. This function is particularly useful for filtering, symmetrizing, and dequantizing the LD matrix - after it's loaded into memory. + after it's loaded to memory. - TODO: Support loading subset of rows of the matrix, similar to `load_rows` (i.e. replace `load_rows` - with newer implementation). + .. note :: + Start and end row positions are always with reference to the stored on-disk LD matrix. - :param return_symmetric: If True, return a full symmetric representation of the LD matrix. + TODO: Implement caching mechanism for non-CSR-formatted data. + + :param start_row: The start row to load to memory (if loading a subset of the matrix). + :param end_row: The end row (not inclusive) to load to memory (if loading a subset of the matrix). :param dtype: The data type for the entries of the LD matrix. + :param return_square: If True, return a square representation of the LD matrix. This flag is used in + conjunction with the `start_row` and `end_row` parameters. In particular, if `end_row` is less than the + number of variants and `return_square=False`, then we return a rectangular slice of the LD matrix + corresponding to the rows requested by the user. + :param return_symmetric: If True, return a full symmetric representation of the LD matrix. :param return_as_csr: If True, return the data in the CSR format. 
+        :param keep_original_shape: This flag is used in conjunction with the `return_as_csr` flag. If set to
+        True, we return the CSR matrix with the same shape as the original LD matrix, where the only
+        non-zero entries are those in the requested `start_row:end_row` region. If set to False, we return
+        a sub-matrix with the requested data only.
+
+        :return: A tuple of the index pointer array, the data array, and the leftmost index array. If
+        `return_as_csr` is set to True, we return the data as a CSR matrix instead.
         """
 
         # -------------- Step 1: Preparing input data type --------------
@@ -1196,33 +1549,67 @@ def load_data(self,
         else:
             dequantize_data = False
 
+        # -------------- Pre-process data boundaries (if provided) --------------
+
+        n_snps = self.stored_n_snps
+
+        start_row = start_row or 0
+        end_row = end_row or n_snps
+
+        assert start_row >= 0
+        end_row = min(end_row, n_snps)
+
         # -------------- Step 2: Fetch the indptr array --------------
 
         # Get the index pointer array:
-        indptr = self._zg['matrix/indptr'][:]
+        indptr = self._zg['matrix/indptr'][start_row:end_row + 1]
 
-        # -------------- Step 3: Checking data masking --------------
+        # Determine the start and end positions in the data matrix
+        # based on the requested start and end rows:
+        data_start = indptr[0]
+        data_end = indptr[-1]
 
-        # Filter the index pointer array based on the mask:
-        if self._mask is not None:
+        # If the user is requesting a subset of the matrix, then we need to adjust
+        # the index pointer accordingly:
+        if start_row > 0:
+            # Zero out all index pointers before `start_row`:
+            indptr = np.clip(indptr - data_start, a_min=0, a_max=None)
 
-            if np.issubdtype(self._mask.dtype, np.integer):
-                mask = np.zeros(self.stored_n_snps, dtype=np.int8)
-                mask[self._mask] = 1
+        # -------------- Step 3: Loading and filtering data array --------------
+
+        data = self._zg['matrix/data'][data_start:data_end]
+
+        # Filter the data and index pointer arrays based on the mask (if set):
+        if self._mask is not None or (end_row < n_snps and return_square):
+
+            mask = np.zeros(n_snps, dtype=np.int8)
+
+            # Two cases to consider:
+
+            # 1) If the mask is not set:
+            if self._mask is None:
+
+                # If the returned matrix should be square:
+                if return_square:
+                    mask[start_row:end_row] = 1
+                else:
+                    mask[start_row:] = 1
+
+                new_size = end_row - start_row
             else:
-                mask = self._mask
+                # 2) If the mask is set:
 
-            from .stats.ld.c_utils import filter_ut_csr_matrix
+                mask[self._mask] = 1
 
-            data_mask, indptr = filter_ut_csr_matrix(indptr, mask)
-            # .oindex and .vindex are slow and convert to integer indices in the background,
-            # which unnecessarily increases memory usage. Unfortunately, here we have to load the entire
-            # data and index it using the boolean array afterward.
-            # Something to be improved in the future. 
:) - data = self._zg['matrix/data'][:][data_mask] - del data_mask - else: - data = self._zg['matrix/data'][:] + # If return square, ensure that elements after end row are set to 0 in the mask: + if return_square: + mask[end_row:] = 0 + + # Compute new size: + new_size = mask[start_row:end_row].sum() + + from .stats.ld.c_utils import filter_ut_csr_matrix_inplace + + data, indptr = filter_ut_csr_matrix_inplace(indptr, data, mask[start_row:], new_size) # -------------- Step 4: Symmetrizing input matrix -------------- @@ -1258,167 +1645,29 @@ def load_data(self, (np.diff(indptr) + leftmost_idx).astype(np.int32), data.shape[0]) + if keep_original_shape: + shape = (n_snps, n_snps) + indices += start_row + indptr = np.concatenate([np.zeros(start_row, dtype=indptr.dtype), + indptr, + np.ones(n_snps - end_row, dtype=indptr.dtype) * indptr[-1]]) + elif return_square or end_row == n_snps: + shape = (indptr.shape[0] - 1, indptr.shape[0] - 1) + else: + shape = (indptr.shape[0] - 1, n_snps) + return csr_matrix( ( data, indices, indptr ), + shape=shape, dtype=dtype ) else: return data, indptr, leftmost_idx - def load_rows(self, - start_row=None, - end_row=None, - return_symmetric=False, - fill_diag=False, - keep_shape=True, - dtype=None): - """ - A utility function to allow for loading a subset of the LD matrix. - By specifying `start_row` and `end_row`, the user can process or inspect small - blocks of the LD matrix without loading the whole thing into memory. - - !!! note - This method does not perform any filtering on the stored data. - To access the LD matrix with filtering, use `.load()` or `load_data`. - - !!! seealso "See Also" - * [load_data][magenpy.LDMatrix.LDMatrix.load_data] - * [load][magenpy.LDMatrix.LDMatrix.load] - - :param start_row: The start row to load to memory - :param end_row: The end row (not inclusive) to load to memory - :param return_symmetric: If True, return a full symmetric representation of the LD matrix. - :param fill_diag: If True, fill the diagonal of the LD matrix with ones. - :param keep_shape: If True, return the LD matrix with the same shape as the original. Here, - entries that are outside the requested start_row:end_row region will be zeroed out. - :param dtype: The data type for the entries of the LD matrix. - - :return: The requested sub-matrix of the LD matrix. - """ - - # Determine the final data type for the LD matrix entries - # and whether we need to perform dequantization or not depending on - # the stored data type and the requested data type. 
- if dtype is None: - dtype = self.stored_dtype - dequantize_data = False - else: - dtype = np.dtype(dtype) - if np.issubdtype(self.stored_dtype, np.integer) and np.issubdtype(dtype, np.floating): - dequantize_data = True - else: - dequantize_data = False - - # Sanity checking + forming the dimensions of the - # requested sub-matrix: - n_snps = self.stored_n_snps - - start_row = start_row or 0 - end_row = end_row or n_snps - - # Sanity checking: - assert start_row >= 0 - end_row = min(end_row, n_snps) - - # Load the index pointer from disk: - indptr = self._zg['matrix/indptr'][:] - - # Determine the start and end positions in the data matrix - # based on the requested start and end rows: - data_start = indptr[start_row] - data_end = indptr[end_row] - - # If the user is requesting a subset of the matrix, then we need to adjust - # the index pointer accordingly: - if start_row > 0 or end_row < n_snps: - # Zero out all index pointers before `start_row`: - indptr = np.clip(indptr - data_start, a_min=0, a_max=None) - # Adjust all index pointers after `end_row`: - indptr[end_row+1:] = (data_end - data_start) - - # Extract the data for the requested rows: - csr_data = self._zg['matrix/data'][data_start:data_end] - - # If we need to de-quantize the data, do it now: - if dequantize_data: - csr_data = dequantize(csr_data, float_dtype=dtype) - - # Construct a CSR matrix from the loaded data, updated indptr, and indices: - - # Get the indices array: - if self.in_memory: - # If the matrix (or a version of it) is already loaded, - # then set the `in_memory` flag to False before fetching the indices. - self.in_memory = False - indices = self.indices - self.in_memory = True - else: - indices = self.indices - - from scipy.sparse import csr_matrix, diags - - mat = csr_matrix( - ( - csr_data, - indices[data_start:data_end], - indptr - ), - shape=(n_snps, n_snps), - dtype=dtype - ) - - # Determine the "invalid" value for the purposes of reconstructing - # the symmetric matrix: - if np.issubdtype(dtype, np.integer): - # For integers, we don't use the minimum value during quantization - # because we would like to have the zero point at exactly zero. So, - # we can use this value as our alternative to `nan`. - invalid_value = np.iinfo(dtype).min - identity_val = np.iinfo(dtype).max - else: - invalid_value = np.nan - identity_val = 1 - - if return_symmetric: - - # First, replace explicit zeros with invalid value (this is a hack to prevent scipy - # from eliminating those zeros when making the matrix symmetric): - mat.data[mat.data == 0] = invalid_value - - # Add the matrix transpose to make it symmetric: - mat += mat.T - - # If the user requested filling the diagonals, do it here: - if fill_diag: - diag_vals = np.concatenate([np.zeros(start_row, dtype=dtype), - identity_val*np.ones(end_row - start_row, dtype=dtype), - np.zeros(n_snps - end_row, dtype=dtype)]) - mat += diags(diag_vals, dtype=dtype, shape=mat.shape) - - # Replace the invalid values with zeros again: - if np.isnan(invalid_value): - mat.data[np.isnan(mat.data)] = 0 - else: - mat.data[mat.data == invalid_value] = 0 - - return mat - elif fill_diag: - diag_vals = np.concatenate([np.zeros(start_row, dtype=dtype), - identity_val*np.ones(end_row - start_row, dtype=dtype), - np.zeros(n_snps - end_row, dtype=dtype)]) - mat += diags(diag_vals, dtype=dtype, shape=mat.shape) - - # If the shape remains the same, return the matrix as is. 
-        # Otherwise, return the requested sub-matrix:
-        if keep_shape:
-            return mat
-        else:
-            return mat[start_row:end_row, :]
-
     def load(self,
              force_reload=False,
              return_symmetric=True,
@@ -1428,46 +1677,55 @@
         Load the LD matrix from on-disk storage in the form of Zarr arrays to memory,
         in the form of sparse CSR matrices.
 
-        !!! seealso "See Also"
-            * [load_data][magenpy.LDMatrix.LDMatrix.load_data]
-            * [load_rows][magenpy.LDMatrix.LDMatrix.load_rows]
-
         :param force_reload: If True, it will reload the data even if it is already in memory.
         :param return_symmetric: If True, return a full symmetric representation of the LD matrix.
         :param dtype: The data type for the entries of the LD matrix.
 
-        :return: The LD matrix as a sparse CSR matrix.
+        !!! seealso "See Also"
+            * [load_data][magenpy.LDMatrix.LDMatrix.load_data]
+
+        :return: The LD matrix as a `scipy` sparse CSR matrix.
         """
 
         if dtype is not None:
             dtype = np.dtype(dtype)
+        else:
+            dtype = self.dtype
 
-        if self.in_memory:
-            # If the LD matrix is already in memory:
+        if not force_reload and self.in_memory and return_symmetric == self.is_symmetric:
+            # If the LD matrix is already in memory and the requested symmetry is the same,
+            # then we don't need to reload the matrix. Here, we only transform its entries to
+            # conform to the data type requested by the user:
 
-            if (return_symmetric == self.is_symmetric) and not force_reload:
-                # If the requested symmetry is the same as the one already loaded,
-                # and the user asked not to force a reload, then do nothing.
+            # If the requested data type differs from the stored one, we need to cast the data:
+            if dtype is not None and self._mat.data.dtype != dtype:
 
-                # If the currently loaded LD matrix has float entries and the user wants
-                # the return type to be another floating point, then just cast and return.
-            # Otherwise, we have to reload the matrix:
                 if np.issubdtype(self._mat.data.dtype, np.floating) and np.issubdtype(dtype, np.floating):
+                    # The user requested casting the data to a different floating point precision:
                     self._mat.data = self._mat.data.astype(dtype)
-                return
-            elif self._mat.data.dtype == dtype:
-                return
+                elif np.issubdtype(self._mat.data.dtype, np.integer) and np.issubdtype(dtype, np.integer):
+                    # The user requested casting the data to a different integer format:
+                    self._mat.data = quantize(dequantize(self._mat.data), int_dtype=dtype)
+                elif np.issubdtype(self._mat.data.dtype, np.floating) and np.issubdtype(dtype, np.integer):
+                    # The user requested quantizing the data from floats to integers:
+                    self._mat.data = quantize(self._mat.data, int_dtype=dtype)
+                else:
+                    # The user requested de-quantizing the data from integers to floats:
+                    self._mat.data = dequantize(self._mat.data, float_dtype=dtype)
 
-            # If we are re-loading the matrix, make sure to release the current one:
-            self.release()
+        else:
+            # If we are re-loading the matrix, make sure to release the current one:
+            self.release()
 
-        self._mat = self.load_data(return_symmetric=return_symmetric,
-                                   dtype=dtype,
-                                   return_as_csr=True)
+            self._mat = self.load_data(return_symmetric=return_symmetric,
+                                       dtype=dtype,
+                                       return_as_csr=True)
 
-        # Update the flags:
-        self.in_memory = True
-        self.is_symmetric = return_symmetric
+            # Update the flags:
+            self.in_memory = True
+            self.is_symmetric = return_symmetric
+
+        return self._mat
 
     def release(self):
         """
@@ -1498,8 +1756,10 @@ def get_row(self, index, return_indices=False):
         indptr = self.indptr[:]
         start_idx, end_idx = indptr[index], indptr[index + 1]
         if return_indices:
-            return self.data[start_idx:end_idx], np.arange(index + 1,
-                                                           index + 1 + (indptr[index + 1] - indptr[index]))
+            return self.data[start_idx:end_idx], np.arange(
+                index + 1,
+                index + 1 + (indptr[index + 1] - indptr[index])
+            )
         else:
             return self.data[start_idx:end_idx]
 
@@ -1570,8 +1830,67 @@ def __setstate__(self, state):
     def __len__(self):
         return self.n_snps
 
-    def __getitem__(self, index):
-        return self.get_row(index)
+    def __getitem__(self, item):
+        """
+        Access the LD matrix entries via the `[]` operator.
+        This implementation supports the following types of indexing:
+        * Accessing a single row of the LD matrix by specifying a numeric index or SNP rsID.
+        * Accessing a single entry of the LD matrix by specifying numeric indices or SNP rsIDs.
+
+        Example usages:
+
+            >>> ldm[0]
+            >>> ldm['rs123']
+            >>> ldm['rs123', 'rs456']
+        """
+
+        dq_scale = self.dequantization_scale
+
+        if isinstance(item, tuple):
+            assert len(item) == 2
+            assert type(item[0]) is type(item[1])
+
+            if isinstance(item[0], str):
+
+                # If they're the same variant:
+                if item[0] == item[1]:
+                    return 1.
+
+                # Extract the indices of the two variants:
+                snps = self.snps.tolist()
+
+                try:
+                    index_1 = snps.index(item[0])
+                except ValueError:
+                    raise ValueError(f"Invalid SNP ID: {item[0]}")
+
+                try:
+                    index_2 = snps.index(item[1])
+                except ValueError:
+                    raise ValueError(f"Invalid SNP ID: {item[1]}")
+
+            else:
+                index_1, index_2 = item
+
+            index_1, index_2 = sorted([index_1, index_2])
+
+            if index_1 == index_2:
+                return 1.
+            if index_2 - index_1 > self.window_size[index_1]:
+                return 0.
+ else: + row = self.get_row(index_1) + return dq_scale*row[index_2 - index_1 - 1] + + if isinstance(item, int): + return dq_scale*self.get_row(item) + elif isinstance(item, str): + try: + index = self.snps.tolist().index(item) + except ValueError: + raise ValueError(f"Invalid SNP ID: {item}") + + return dq_scale*self.get_row(index) def __iter__(self): """ diff --git a/magenpy/plot/ld.py b/magenpy/plot/ld.py index a146a55..4e7f1d4 100644 --- a/magenpy/plot/ld.py +++ b/magenpy/plot/ld.py @@ -4,38 +4,46 @@ def plot_ld_matrix(ldm: LDMatrix, - row_subset=None, - display='full', + variant_subset=None, + start_row=None, + end_row=None, + symmetric=False, cmap='OrRd', include_colorbar=True): """ Plot a heatmap representing the LD matrix or portions of it. :param ldm: An instance of `LDMatrix`. - :param row_subset: A boolean or integer index array for the subset of rows/columns to extract from the LD matrix. - :param display: A string indicating what part of the matrix to display. Can be 'full', 'upper', 'lower'. - If upper, only the upper triangle of the matrix will be displayed. - If lower, only the lower triangle will be displayed. + :param variant_subset: A list of variant rsIDs to subset the LD matrix. + :param start_row: The starting row index for the LD matrix plot. + :param end_row: The ending row index (not inclusive) for the LD matrix plot. + :param symmetric: If True, plot the symmetric version of the LD matrix. + Otherwise, plot the upper triangular part. :param cmap: The color map for the LD matrix plot. :param include_colorbar: If True, include a colorbar in the plot. """ - if row_subset is None: - row_subset = np.arange(ldm.shape[0]) + curr_mask = None - # TODO: Figure out a way to do this without loading the entire matrix: - ldm.load(return_symmetric=True, dtype='float32') + if variant_subset is not None: + curr_mask = ldm.get_mask() + ldm.reset_mask() + ldm.filter_snps(variant_subset) - mat = ldm.csr_matrix[row_subset, :][:, row_subset].toarray() + ld_mat = ldm.load_data(start_row=start_row, + end_row=end_row, + return_symmetric=symmetric, + return_square=True, + return_as_csr=True, + dtype=np.float32).todense() - if display == 'upper': - mat = np.triu(mat, k=1) - elif display == 'lower': - mat = np.tril(mat, k=1) - - plt.imshow(mat, cmap=cmap, vmin=-1., vmax=1.) + plt.imshow(ld_mat, cmap=cmap, vmin=-1., vmax=1.) 
     if include_colorbar:
         plt.colorbar()

+    # Reset the original mask:
+    if curr_mask is not None:
+        ldm.set_mask(curr_mask)
+
     plt.axis('off')
diff --git a/magenpy/stats/ld/c_utils.pyx b/magenpy/stats/ld/c_utils.pyx
index 3b04823..6d9d72c 100644
--- a/magenpy/stats/ld/c_utils.pyx
+++ b/magenpy/stats/ld/c_utils.pyx
@@ -26,6 +26,47 @@ ctypedef fused noncomplex_numeric:
     cnp.float64_t


+cdef extern from "ld_utils.hpp" nogil:
+    bint blas_supported() noexcept nogil
+    bint omp_supported() noexcept nogil
+
+    void ld_matrix_dot[T, U, I](int c_size,
+                                I* ld_indptr,
+                                U* ld_data,
+                                T* vec,
+                                T* out,
+                                T dq_scale,
+                                bint include_lower_triangle,
+                                bint include_upper_triangle,
+                                bint include_diag,
+                                int threads)
+
+
+cpdef ld_dot(integral[::1] ld_indptr,
+             noncomplex_numeric[::1] ld_data,
+             floating[::1] vec,
+             floating dq_scale = 1.,
+             bint include_lower_triangle = 1,
+             bint include_upper_triangle = 1,
+             bint include_diag = 1,
+             int threads = 1):
+
+    cdef:
+        floating[::1] out = np.zeros_like(vec)
+
+    ld_matrix_dot(vec.shape[0],
+                  &ld_indptr[0],
+                  &ld_data[0],
+                  &vec[0],
+                  &out[0],
+                  dq_scale,
+                  include_lower_triangle,
+                  include_upper_triangle,
+                  include_diag,
+                  threads)
+
+    return np.asarray(out)
+
 cpdef find_tagging_variants(int[::1] variant_indices,
                             integral[::1] indptr,
                             noncomplex_numeric[::1] data,
@@ -88,7 +129,7 @@ cpdef prune_ld_ut(integral[::1] indptr,

             if numeric_abs(data[curr_data_idx]) >= r_threshold:
                 keep[curr_row + i + 1] = 0

-    return np.asarray(keep, dtype=bool)
+    return np.asarray(keep).view(bool)


@cython.boundscheck(False)
@@ -319,19 +360,77 @@ cpdef symmetrize_ut_csr_matrix(integral[::1] indptr,

             curr_data_idx = indptr[curr_row] + curr_col - curr_row - 1

             new_idx_1 = new_indptr[curr_row] + curr_col - leftmost_col[curr_row]
-            new_idx_2 = new_indptr[curr_col] + curr_row - leftmost_col[curr_col]

             new_data[new_idx_1] = data[curr_data_idx]
-            new_data[new_idx_2] = data[curr_data_idx]
+
+            # To allow for non-square matrices, we need to check if the column index is within bounds:
+            if curr_col < curr_shape:
+                new_idx_2 = new_indptr[curr_col] + curr_row - leftmost_col[curr_col]
+                new_data[new_idx_2] = data[curr_data_idx]

     return np.asarray(new_data), np.asarray(new_idx[0]), np.asarray(new_idx[1])


+cpdef filter_ut_csr_matrix_inplace(integral[::1] indptr,
+                                   noncomplex_numeric[::1] data,
+                                   char[::1] bool_mask,
+                                   int new_size):
+    """
+    Given an upper triangular CSR matrix represented by the data and indptr arrays, this function filters
+    its entries with a boolean mask. The function assumes that:
+
+    1. The non-zero elements are contiguous along the diagonal of each row (starting from
+    the diagonal + 1).
+    2. The diagonal elements aren't present in the upper triangular matrix.
+
+    .. note::
+        This function modifies the input data array in place.
+
+    :param indptr: The index pointer array for the CSR matrix to be filtered.
+    :param data: The data array for the CSR matrix to be filtered.
+    :param bool_mask: A boolean mask indicating which elements (rows) of the matrix to keep.
+    :param new_size: The new size of the filtered matrix.
+
+    :return: A tuple with the new filtered data array and the new indptr array.
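+
+    Illustrative example (hypothetical 3-variant matrix, dropping the middle variant):
+    with `indptr = [0, 2, 3, 3]`, `data = [0.5, 0.3, 0.2]`, `bool_mask = [1, 0, 1]`
+    and `new_size = 2`, only the entry linking variants 0 and 2 survives, giving
+    `data = [0.3]` and `new_indptr = [0, 1, 1]`.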
+ """ + + cdef: + int64_t i, curr_row, row_bound, new_indptr_idx = 1, new_data_idx = 0, curr_shape=indptr.shape[0] - 1 + int64_t[::1] new_indptr = np.zeros(new_size + 1, dtype=np.int64) + + with nogil: + # For each row in the current matrix: + for curr_row in range(curr_shape): + + # If the row is to be included in the new matrix: + if bool_mask[curr_row]: + + # Useful quantity to convert the data array index `i` to the + # equivalent row index in the `bool` mask: + row_bound = curr_row - indptr[curr_row] + 1 + + # For the new indptr array, copy the value from the previous row: + new_indptr[new_indptr_idx] = new_indptr[new_indptr_idx - 1] + + # For each entry for this row in the data array + for i in range(indptr[curr_row], indptr[curr_row + 1]): + + # If the entry isn't filtered, make sure it's included in the new matrix + # And increase the `indptr` by one unit: + if bool_mask[row_bound + i]: + data[new_data_idx] = data[i] + new_data_idx += 1 + new_indptr[new_indptr_idx] += 1 + + new_indptr_idx += 1 + + return np.asarray(data)[:new_data_idx], np.asarray(new_indptr) + + @cython.boundscheck(False) @cython.wraparound(False) @cython.nonecheck(False) @cython.exceptval(check=False) -cpdef filter_ut_csr_matrix(integral[::1] indptr, char[::1] bool_mask): +cpdef generate_data_mask_ut_csr_matrix(integral[::1] indptr, char[::1] bool_mask): """ This is a utility function to generate a mask with the purpose of filtering the data array of upper-triangular CSR matrices. The function also generates a new @@ -377,7 +476,7 @@ cpdef filter_ut_csr_matrix(integral[::1] indptr, char[::1] bool_mask): new_indptr_idx += 1 - return np.asarray(data_mask).astype(bool, copy=False), np.asarray(new_indptr) + return np.asarray(data_mask).view(bool), np.asarray(new_indptr) @cython.boundscheck(False) @cython.wraparound(False) diff --git a/magenpy/stats/ld/estimator.py b/magenpy/stats/ld/estimator.py index 058e053..fdb6322 100644 --- a/magenpy/stats/ld/estimator.py +++ b/magenpy/stats/ld/estimator.py @@ -20,6 +20,7 @@ class SampleLD(object): * [BlockLD][magenpy.stats.ld.estimator.BlockLD] :ivar genotype_matrix: The genotype matrix, an instance of `GenotypeMatrix` or its children. + :ivar ld_boundaries: The LD boundaries for each variant in the LD matrix. """ @@ -30,6 +31,7 @@ def __init__(self, genotype_matrix): """ self.genotype_matrix = genotype_matrix + self.ld_boundaries = None # Ensure that the genotype matrix has data for a single chromosome only: if self.genotype_matrix.chromosome is None: @@ -53,8 +55,13 @@ def compute_ld_boundaries(self): :return: A 2xM matrix of LD boundaries. """ - m = self.genotype_matrix.n_snps - return np.array((np.zeros(m), np.ones(m)*m)).astype(np.int64) + + if self.ld_boundaries is None: + + m = self.genotype_matrix.n_snps + self.ld_boundaries = np.array((np.zeros(m), np.ones(m)*m)).astype(np.int64) + + return self.ld_boundaries def compute_plink_window_thresholds(self, ld_boundaries=None): """ @@ -117,7 +124,8 @@ def compute(self, delete_original=True, dtype='int8', compressor_name='zstd', - compression_level=7): + compression_level=7, + compute_spectral_properties=False): """ A utility method to compute the LD matrix and store in Zarr array format. The computes the LD matrix and stores it in Zarr array format, set its attributes, @@ -131,6 +139,8 @@ def compute(self, and integer quantized data types int8 and int16). :param compressor_name: The name of the compressor to use for the LD matrix. :param compression_level: The compression level to use for the LD matrix (1-9). 
+ :param compute_spectral_properties: If True, compute and store information about the eigenvalues of + the LD matrix. :return: An instance of `LDMatrix` containing the computed LD matrix. @@ -184,6 +194,7 @@ def compute(self, ld_mat.set_metadata('a1', self.genotype_matrix.a1, overwrite=overwrite) ld_mat.set_metadata('a2', self.genotype_matrix.a2, overwrite=overwrite) + try: ld_mat.set_metadata('cm', self.genotype_matrix.cm_pos, overwrite=overwrite) except KeyError: @@ -191,6 +202,9 @@ def compute(self, ld_mat.set_metadata('ldscore', ld_mat.compute_ld_scores(), overwrite=overwrite) + if compute_spectral_properties: + ld_mat.set_store_attr('Min eigenvalue', ld_mat.estimate_min_eigenvalue()) + if ld_mat.validate_ld_matrix(): return ld_mat @@ -217,6 +231,7 @@ class WindowedLD(SampleLD): * [BlockLD][magenpy.stats.ld.estimator.BlockLD] :ivar genotype_matrix: The genotype matrix, an instance of `GenotypeMatrix`. + :ivar ld_boundaries: The LD boundaries for each variant in the LD matrix. :ivar window_size: The number of neighboring SNPs to consider on each side when computing LD. :ivar kb_window_size: The maximum distance in kilobases to consider when computing LD. :ivar cm_window_size: The maximum distance in centi Morgan to consider when computing LD. @@ -256,41 +271,45 @@ def compute_ld_boundaries(self): :return: A 2xM matrix of LD boundaries. """ - bounds = [] + if self.ld_boundaries is None: - m = self.genotype_matrix.n_snps - indices = np.arange(m) + bounds = [] - if self.window_size is not None: - bounds.append( - np.clip(np.array( - [indices - self.window_size, - indices + self.window_size - ] - ), a_min=0, a_max=m) - ) + m = self.genotype_matrix.n_snps + indices = np.arange(m) - from .c_utils import find_windowed_ld_boundaries + if self.window_size is not None: + bounds.append( + np.clip(np.array( + [indices - self.window_size, + indices + self.window_size + ] + ), a_min=0, a_max=m) + ) - if self.kb_window_size is not None: - bounds.append( - find_windowed_ld_boundaries(.001*self.genotype_matrix.bp_pos, - self.kb_window_size) - ) + from .c_utils import find_windowed_ld_boundaries - if self.cm_window_size is not None: - bounds.append( - find_windowed_ld_boundaries(self.genotype_matrix.cm_pos, - self.cm_window_size) - ) + if self.kb_window_size is not None: + bounds.append( + find_windowed_ld_boundaries(.001*self.genotype_matrix.bp_pos, + self.kb_window_size) + ) - if len(bounds) == 1: - return bounds[0] - else: - return np.array([ - np.maximum.reduce([b[0, :] for b in bounds]), - np.minimum.reduce([b[1, :] for b in bounds]) - ]) + if self.cm_window_size is not None: + bounds.append( + find_windowed_ld_boundaries(self.genotype_matrix.cm_pos, + self.cm_window_size) + ) + + if len(bounds) == 1: + self.ld_boundaries = bounds[0] + else: + self.ld_boundaries = np.array([ + np.maximum.reduce([b[0, :] for b in bounds]), + np.minimum.reduce([b[1, :] for b in bounds]) + ]) + + return self.ld_boundaries def compute(self, output_dir, @@ -299,7 +318,8 @@ def compute(self, delete_original=True, dtype='int8', compressor_name='zstd', - compression_level=7): + compression_level=7, + compute_spectral_properties=False,): """ Compute the windowed LD matrix and store in Zarr array format. @@ -311,14 +331,17 @@ def compute(self, :param dtype: The data type for the entries of the LD matrix. :param compressor_name: The name of the compressor to use for the LD matrix. :param compression_level: The compression level to use for the LD matrix (1-9). 
+ :param compute_spectral_properties: If True, compute and store information about the eigenvalues of + the LD matrix. - :return: An instance of `LDMatrix` containing the computed LD matrix. + :return: An instance of `LDMatrix` encapsulating the computed LD matrix, its attributes, and metadata. """ ld_mat = super().compute(output_dir, temp_dir, overwrite=overwrite, delete_original=delete_original, + compute_spectral_properties=compute_spectral_properties, dtype=dtype, compressor_name=compressor_name, compression_level=compression_level) @@ -337,6 +360,17 @@ def compute(self, ld_mat.set_store_attr('Estimator properties', w_properties) + if compute_spectral_properties: + eigs, block_bounds = ld_mat.estimate_min_eigenvalue(block_size=self.window_size, + block_size_kb=self.kb_window_size, + block_size_cm=self.cm_window_size, + return_block_boundaries=True) + + ld_mat.set_store_attr('Min eigenvalue per block', { + 'Min eigenvalue': list(eigs), + 'Block boundaries': list(block_bounds) + }) + return ld_mat @@ -360,6 +394,7 @@ class ShrinkageLD(SampleLD): * [BlockLD][magenpy.stats.ld.estimator.BlockLD] :ivar genotype_matrix: The genotype matrix, an instance of `GenotypeMatrix`. + :ivar ld_boundaries: The LD boundaries for each variant in the LD matrix. :ivar genetic_map_ne: The effective population size (Ne) from which the genetic map is derived. :ivar genetic_map_sample_size: The sample size used to infer the genetic map. :ivar threshold: The shrinkage cutoff below which the LD is set to zero. @@ -394,11 +429,14 @@ def compute_ld_boundaries(self): :return: A 2xM matrix of LD boundaries. """ - from .c_utils import find_shrinkage_ld_boundaries - return find_shrinkage_ld_boundaries(self.genotype_matrix.cm_pos, - self.genetic_map_ne, - self.genetic_map_sample_size, - self.threshold) + if self.ld_boundaries is None: + from .c_utils import find_shrinkage_ld_boundaries + self.ld_boundaries = find_shrinkage_ld_boundaries(self.genotype_matrix.cm_pos, + self.genetic_map_ne, + self.genetic_map_sample_size, + self.threshold) + + return self.ld_boundaries def compute(self, output_dir, @@ -408,6 +446,7 @@ def compute(self, dtype='int8', compressor_name='zstd', compression_level=7, + compute_spectral_properties=False, chunk_size=1000): """ @@ -427,10 +466,12 @@ def compute(self, :param dtype: The data type for the entries of the LD matrix. :param compressor_name: The name of the compressor to use for the LD matrix. :param compression_level: The compression level to use for the LD matrix (1-9). + :param compute_spectral_properties: If True, compute and store information about the eigenvalues of + the LD matrix. :param chunk_size: An optional parameter that sets the maximum number of rows processed simultaneously. The smaller the `chunk_size`, the less memory requirements needed for the shrinkage step. - :return: An instance of `LDMatrix` containing the computed LD matrix. + :return: An instance of `LDMatrix` encapsulating the computed LD matrix, its attributes, and metadata. 
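+
+        A minimal usage sketch (illustrative; `g` is a single-chromosome `GenotypeMatrix`,
+        and the paths and genetic-map constants are placeholders):
+
+        >>> ld_mat = ShrinkageLD(g, genetic_map_ne=11400, genetic_map_sample_size=183).compute(
+        ...     'output/ld/', 'temp/', compute_spectral_properties=True)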
""" @@ -438,6 +479,7 @@ def compute(self, temp_dir, overwrite=overwrite, delete_original=delete_original, + compute_spectral_properties=False, # Compute after shrinkage dtype=dtype, compressor_name=compressor_name, compression_level=compression_level) @@ -460,6 +502,23 @@ def compute(self, 'Threshold': self.threshold }) + if compute_spectral_properties: + + ld_mat.set_store_attr('Min eigenvalue', ld_mat.estimate_min_eigenvalue()) + + cm_ld_bounds_start = self.genotype_matrix.cm_pos[self.ld_boundaries[0, :]] + cm_ld_bounds_end = self.genotype_matrix.cm_pos[self.ld_boundaries[1, :] - 1] + + median_dist_cm = np.median(cm_ld_bounds_end - cm_ld_bounds_start) + + eigs, block_bounds = ld_mat.estimate_min_eigenvalue(block_size_cm=median_dist_cm, + return_block_boundaries=True) + + ld_mat.set_store_attr('Min eigenvalue per block', { + 'Min eigenvalue': list(eigs), + 'Block boundaries': list(block_bounds) + }) + return ld_mat @@ -483,6 +542,7 @@ class BlockLD(SampleLD): * [ShrinkageLD][magenpy.stats.ld.estimator.ShrinkageLD] :ivar genotype_matrix: The genotype matrix, an instance of `GenotypeMatrix`. + :ivar ld_boundaries: The LD boundaries for each variant in the LD matrix. :ivar ld_blocks: The LD blocks, a Bx2 matrix where B is the number of blocks and the columns are the start and end of each block, respectively. @@ -497,7 +557,7 @@ def __init__(self, :param genotype_matrix: The genotype matrix, an instance of `GenotypeMatrix`. :param ld_blocks: The LD blocks, a Bx2 matrix where B is the number of blocks and the - columns are the start and end of each block, respectively. + columns are the start and end of each block in units of base pair, respectively. :param ld_blocks_file: The path to the LD blocks file """ @@ -508,6 +568,8 @@ def __init__(self, if ld_blocks is None: from ...parsers.misc_parsers import parse_ld_block_data self.ld_blocks = parse_ld_block_data(ld_blocks_file)[self.genotype_matrix.chromosome] + else: + self.ld_blocks = ld_blocks def compute_ld_boundaries(self): """ @@ -516,8 +578,11 @@ def compute_ld_boundaries(self): :return: A 2xM matrix of LD boundaries. """ - from .c_utils import find_ld_block_boundaries - return find_ld_block_boundaries(self.genotype_matrix.bp_pos, self.ld_blocks) + if self.ld_boundaries is None: + from .c_utils import find_ld_block_boundaries + self.ld_boundaries = find_ld_block_boundaries(self.genotype_matrix.bp_pos, self.ld_blocks) + + return self.ld_boundaries def compute(self, output_dir, @@ -526,6 +591,7 @@ def compute(self, delete_original=True, dtype='int8', compressor_name='zstd', + compute_spectral_properties=False, compression_level=7): """ @@ -535,17 +601,20 @@ def compute(self, :param temp_dir: A temporary directory to store intermediate files and results. :param overwrite: If True, overwrite any existing LD matrices in `temp_dir` and `output_dir`. :param delete_original: If True, deletes dense or intermediate LD matrices generated along the way. + :param compute_spectral_properties: If True, compute and store information about the eigenvalues of + the LD matrix. :param dtype: The data type for the entries of the LD matrix. :param compressor_name: The name of the compressor to use for the LD matrix. :param compression_level: The compression level to use for the LD matrix (1-9). - :return: An instance of `LDMatrix` containing the computed LD matrix. + :return: An instance of `LDMatrix` encapsulating the computed LD matrix, its attributes, and metadata. 
""" ld_mat = super().compute(output_dir, temp_dir, overwrite=overwrite, delete_original=delete_original, + compute_spectral_properties=compute_spectral_properties, dtype=dtype, compressor_name=compressor_name, compression_level=compression_level) @@ -556,4 +625,14 @@ def compute(self, 'LD blocks': self.ld_blocks.tolist() }) + if compute_spectral_properties: + + blocks = np.concatenate([[0], np.unique(self.ld_boundaries[1, :])]) + eigs, block_bounds = ld_mat.estimate_min_eigenvalue(blocks=blocks, return_block_boundaries=True) + + ld_mat.set_store_attr('Min eigenvalue per block', { + 'Min eigenvalue': list(eigs), + 'Block boundaries': list(block_bounds) + }) + return ld_mat diff --git a/magenpy/stats/ld/ld_utils.hpp b/magenpy/stats/ld/ld_utils.hpp new file mode 100644 index 0000000..41a035b --- /dev/null +++ b/magenpy/stats/ld/ld_utils.hpp @@ -0,0 +1,244 @@ +#ifndef LD_UTLS_H +#define LD_UTLS_H + +#include +#include +#include +#include +#include +#include + +// Check for and include `cblas`: +#ifdef HAVE_CBLAS + #include +#endif + +// Check for and include `omp`: +#ifdef _OPENMP + #include +#endif + + +/* ----------------------------- */ +// Helper system-related functions to check for BLAS and OpenMP support + +bool omp_supported() { + /* Check if OpenMP is supported by examining compiler flags. */ + #ifdef _OPENMP + return true; + #else + return false; + #endif +} + +bool blas_supported() { + /* Check if BLAS is supported by examining compiler flags. */ + #ifdef HAVE_CBLAS + return true; + #else + return false; + #endif +} + +/* ------------------------------ */ +// Dot product functions + +// Define a function pointer for the dot product functions `dot` and `blas_dot`: +template +using dot_func_pt = typename std::enable_if::value && std::is_arithmetic::value, T>::type (*)(T*, U*, int); + +/* * * * * */ + +template +typename std::enable_if::value && std::is_arithmetic::value, T>::type +dot(T* x, U* y, int size) { + /* Perform dot product between two vectors x and y, each of length `size` + + :param x: Pointer to the first element of the first vector + :param y: Pointer to the first element of the second vector + :param size: Length of the vectors + + */ + + T s = 0.; + + #ifdef _OPENMP + #ifndef _WIN32 + #pragma omp simd + #endif + #endif + for (int i = 0; i < size; ++i) { + s += x[i]*static_cast(y[i]); + } + return s; +} + +/* * * * * */ + +template +typename std::enable_if::value && std::is_arithmetic::value, T>::type +blas_dot(T* x, U* y, int size) { + /* + Use BLAS (if available) to perform dot product + between two vectors x and y, each of length `size`. 
+
+        :param x: Pointer to the first element of the first vector
+        :param y: Pointer to the first element of the second vector
+        :param size: Length of the vectors
+    */
+
+    #ifdef HAVE_CBLAS
+        int incx = 1;
+        int incy = 1;
+
+        if constexpr (std::is_same<T, float>::value) {
+            if constexpr (std::is_same<U, float>::value) {
+                return cblas_sdot(size, x, incx, y, incy);
+            } else {
+                // Handles the case where y is any data type that is not a float:
+                std::vector<float> y_float(size);
+                std::transform(y, y + size, y_float.begin(), [](U val) { return static_cast<float>(val);});
+                return cblas_sdot(size, x, incx, y_float.data(), incy);
+            }
+        }
+        else if constexpr (std::is_same<T, double>::value) {
+            if constexpr (std::is_same<U, double>::value) {
+                return cblas_ddot(size, x, incx, y, incy);
+            } else {
+                // Handles the case where y is any data type that is not a double:
+                std::vector<double> y_double(size);
+                std::transform(y, y + size, y_double.begin(), [](U val) { return static_cast<double>(val);});
+                return cblas_ddot(size, x, incx, y_double.data(), incy);
+            }
+        }
+    #else
+        return dot(x, y, size);
+    #endif
+}
+
+/* * * * * */
+
+// Define a function pointer for the axpy functions `axpy` and `blas_axpy`:
+template <typename T, typename U>
+using axpy_func_pt = typename std::enable_if<std::is_arithmetic<T>::value && std::is_arithmetic<U>::value, void>::type (*)(T*, U*, T, int);
+
+/* * * * * */
+
+template <typename T, typename U>
+typename std::enable_if<std::is_arithmetic<T>::value && std::is_arithmetic<U>::value, void>::type
+axpy(T* x, U* y, T alpha, int size) {
+    /*
+        Perform axpy operation on two vectors x and y, each of length `size`.
+        axpy is a standard linear algebra operation that performs
+        element-wise addition and multiplication:
+        x := x + a*y.
+    */
+
+    #ifdef _OPENMP
+        #ifndef _WIN32
+            #pragma omp simd
+        #endif
+    #endif
+    for (int i = 0; i < size; ++i) {
+        x[i] += static_cast<T>(y[i]) * alpha;
+    }
+}
+
+/* * * * * */
+
+template <typename T, typename U>
+typename std::enable_if<std::is_arithmetic<T>::value && std::is_arithmetic<U>::value, void>::type
+blas_axpy(T* y, U* x, T alpha, int size) {
+    /*
+        Use BLAS (if available) to perform axpy operation on two vectors x and y,
+        each of length `size`.
+        axpy is a standard linear algebra operation that performs
+        element-wise addition and multiplication:
+        y := y + a*x.
+    */
+
+    #ifdef HAVE_CBLAS
+        int incx = 1;
+        int incy = 1;
+
+        if constexpr (std::is_same<T, float>::value) {
+            if constexpr (std::is_same<U, float>::value) {
+                cblas_saxpy(size, alpha, x, incx, y, incy);
+            } else {
+                // Handles the case where x is any data type that is not a float:
+                std::vector<float> x_float(size);
+                std::transform(x, x + size, x_float.begin(), [](U val) { return static_cast<float>(val);});
+                cblas_saxpy(size, alpha, x_float.data(), incx, y, incy);
+            }
+        }
+        else if constexpr (std::is_same<T, double>::value) {
+            if constexpr (std::is_same<U, double>::value) {
+                cblas_daxpy(size, alpha, x, incx, y, incy);
+            } else {
+                // Handles the case where x is any data type that is not a double:
+                std::vector<double> x_double(size);
+                std::transform(x, x + size, x_double.begin(), [](U val) { return static_cast<double>(val);});
+                cblas_daxpy(size, alpha, x_double.data(), incx, y, incy);
+            }
+        }
+    #else
+        axpy(y, x, alpha, size);
+    #endif
+}
+
+
+template <typename T, typename U, typename I>
+typename std::enable_if<std::is_arithmetic<T>::value && std::is_arithmetic<U>::value && std::is_integral<I>::value, void>::type
+ld_matrix_dot(int c_size,
+              I* ld_indptr,
+              U* ld_data,
+              T* vec,
+              T* out,
+              T dq_scale,
+              bool include_lower_triangle,
+              bool include_upper_triangle,
+              bool include_diag,
+              int threads) {
+    /*
+        Perform matrix-vector multiplication between an upper-triangular matrix where the entries
+        are stored in a compressed format (CSR) and a vector `vec`.
This function assumes that + the entries of the matrix are contiguous along the diagonal (not including the diagonal itself). + The result is stored in the output vector `out` of the same length and data type as `vec`. + + The function is parallelized using OpenMP if the compiler supports it. It also uses BLAS + if the library is available on the user's system. Finally, the function is templated to allow + the user to pass quantized matrices and vectors of different data types (e.g., float, double, etc.). + If the matrix is quantized, ensure to pass the correct scaling factor `dq_scale` to the function. + + The function gives flexibility to perform matrix-vector product assuming a full symmetric matrix, + with and without the diagonal, and with and without the lower and upper triangles. These options + can be specified by setting the corresponding boolean flags `include_lower_triangle`, + `include_upper_triangle`, and `include_diag`. + */ + + I ld_start, ld_end; + + #ifdef _OPENMP + #pragma omp parallel for private(ld_start, ld_end) schedule(static) num_threads(threads) + #endif + for (int j = 0; j < c_size; ++j) { + + ld_start = ld_indptr[j]; + ld_end = ld_indptr[j + 1]; + + if (include_upper_triangle){ + out[j] += dq_scale*blas_dot(vec + j + 1, ld_data + ld_start, ld_end - ld_start); + } + + if (include_lower_triangle){ + blas_axpy(out + j + 1, ld_data + ld_start, dq_scale*vec[j], ld_end - ld_start); + } + + if (include_diag) { + out[j] += vec[j]; + } + + } +} + +#endif // LD_UTLS_H diff --git a/magenpy/stats/ld/utils.py b/magenpy/stats/ld/utils.py index 603ad28..2feb412 100644 --- a/magenpy/stats/ld/utils.py +++ b/magenpy/stats/ld/utils.py @@ -29,7 +29,7 @@ def move_ld_store(z_arr, target_path, overwrite=True): def delete_ld_store(ld_mat): """ Delete the LD store from disk. - :param ld_mat: An LDMatrix object + :param ld_mat: An `LDMatrix` object """ try: @@ -140,6 +140,9 @@ def shrink_ld_matrix(ld_mat_obj, https://github.com/stephenslab/rss/blob/master/misc/get_corr.R + TODO: Re-write this function without using CSR data structures. + Also, consider saving shrinkage factor per pair of variants instead of shrunk LD entries? + :param ld_mat_obj: An `LDMatrix` object encapsulating the LD matrix whose entries we wish to shrink. :param cm_pos: The position of each variant in the LD matrix in centi Morgan. :param maf_var: A vector of the variance in minor allele frequency (MAF) for each SNP in the LD matrix. Should be @@ -200,7 +203,15 @@ def harmonic_series_sum(n): end_row = min((chunk_idx+1)*chunk_size, len(ld_mat_obj)) # Load the subset of the LD matrix specified by chunk_size. - csr_mat = ld_mat_obj.load_rows(start_row=start_row, end_row=end_row, dtype=np.float32) + csr_mat = ld_mat_obj.load_data( + start_row=start_row, + end_row=end_row, + return_symmetric=False, + return_square=False, + keep_original_shape=True, + return_as_csr=True, + dtype=np.float32 + ) # Get the relevant portion of indices and pointers from the CSR matrix: indptr = global_indptr[start_row:end_row+1] @@ -242,6 +253,8 @@ def estimate_rows_per_chunk(rows, cols, dtype='int8', mem_size=128): :param cols: Total number of columns. If sparse matrix with uneven columns, provide average column size. :param dtype: The data type for the matrix entries. :param mem_size: Size of the chunk in memory (MB) + + :return: The estimated number of rows per chunk. 
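+
+    Illustrative arithmetic: for a 100,000 x 100,000 `int8` matrix, `matrix_size`
+    comes to ~9,537 MB, so the default `mem_size` of 128 MB implies ~75 chunks,
+    or roughly 1,300 rows per chunk.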
""" matrix_size = rows * cols * np.dtype(dtype).itemsize / 1024 ** 2 @@ -273,6 +286,8 @@ def compute_ld_plink1p9(genotype_matrix, and integer quantized data types int8 and int16). :param compressor_name: The name of the compressor to use for the Zarr arrays. :param compression_level: The compression level to use for the Zarr arrays (1-9). + + :return: An LDMatrix object """ from ...utils.executors import plink1Executor @@ -389,6 +404,8 @@ def compute_ld_xarray(genotype_matrix, and integer quantized data types int8 and int16). :param compressor_name: The name of the compressor to use for the Zarr arrays. :param compression_level: The compression level to use for the Zarr arrays (1-9). + + :return: An LDMatrix object """ from ...GenotypeMatrix import xarrayGenotypeMatrix diff --git a/magenpy/utils/compute_utils.py b/magenpy/utils/compute_utils.py index 2fa6884..a5ee951 100644 --- a/magenpy/utils/compute_utils.py +++ b/magenpy/utils/compute_utils.py @@ -9,6 +9,7 @@ def generate_slice_dictionary(vec): delineating the start and end positions of each element in the vector. :param vec: A numpy array + :return: A dictionary of slices """ vals, idx = np.unique(vec, return_index=True) @@ -37,6 +38,8 @@ def intersect_arrays(arr1, arr2, return_index=False): :param arr1: The first array :param arr2: The second array :param return_index: Return the index of shared elements in the first array + + :return: A numpy array of shared elements or their indices """ # NOTE: For best and consistent results, we cast all data types to `str` @@ -51,9 +54,22 @@ def intersect_arrays(arr1, arr2, return_index=False): return common_elements['ID'].values +def is_numeric(obj): + """ + Check if a python object is numeric. This function handles + numpy arrays and scalars. + :param obj: A python object + :return: True if the object is numeric, False otherwise. + """ + if isinstance(obj, np.ndarray): + return np.issubdtype(obj.dtype, np.number) + else: + return np.issubdtype(type(obj), np.number) + + def iterable(arg): """ - Check if an object is iterable (but not a string). + Check if an object is iterable, but not a string. :param arg: A python object. :return: True if the object is iterable, False otherwise. """ diff --git a/magenpy/utils/data_utils.py b/magenpy/utils/data_utils.py index feaa97e..d04cdc9 100644 --- a/magenpy/utils/data_utils.py +++ b/magenpy/utils/data_utils.py @@ -28,7 +28,8 @@ def lrld_path(): Which is based on the work of - > Anderson, Carl A., et al. "Data quality control in genetic case-control association studies." Nature protocols 5.9 (2010): 1564-1573. + > Anderson, Carl A., et al. "Data quality control in genetic case-control association studies." + Nature protocols 5.9 (2010): 1564-1573. :return: The path of the attached BED file containing long-range linkage disequilibrium (LD) regions in the human genome. The coordinates are in hg19/GRCh37. diff --git a/magenpy/utils/executors.py b/magenpy/utils/executors.py index eea748c..cdf0489 100644 --- a/magenpy/utils/executors.py +++ b/magenpy/utils/executors.py @@ -1,4 +1,4 @@ -from .system_utils import is_cmd_tool, run_shell_script, available_cpu +from .system_utils import is_cmd_tool, run_shell_script from magenpy import get_option @@ -7,20 +7,17 @@ class plink2Executor(object): A wrapper class for interfacing with the `plink2` command line tool. """ - def __init__(self, threads='auto', verbose=True): + def __init__(self, threads=None, verbose=True): """ Initialize the plink2 executor - :param threads: The number of threads to use for computations. 
If set to 'auto', the number of - available CPUs will be used. + :param threads: The number of threads to use for computations. If None, the number of + threads will be set to 1. :type threads: int or str :param verbose: Whether to print the output of the command :type verbose: bool """ - if threads == 'auto': - self.threads = available_cpu() - else: - self.threads = threads + self.threads = threads or 1 self.plink2_path = get_option('plink2_path') @@ -56,25 +53,22 @@ class plink1Executor(object): A wrapper class for interfacing with the `plink1.9` command line tool. """ - def __init__(self, threads='auto', verbose=True): + def __init__(self, threads=None, verbose=True): """ Initialize the plink1.9 executor - :param threads: The number of threads to use for computations. If set to 'auto', the number of - available CPUs will be used. + :param threads: The number of threads to use for computations. If None, the number of + threads will be set to 1. :type threads: int or str :param verbose: Whether to print the output of the command :type verbose: bool """ - if threads == 'auto': - self.threads = available_cpu() - else: - self.threads = threads + self.threads = threads or 1 self.plink1_path = get_option('plink1.9_path') if not is_cmd_tool(self.plink1_path): - raise Exception(f"Did not find the executable for plink at: {self.plink1_path}") + raise Exception(f"Did not find the executable for plink1.9 at: {self.plink1_path}") self.verbose = verbose diff --git a/magenpy/utils/system_utils.py b/magenpy/utils/system_utils.py index 0a24eb4..412e051 100644 --- a/magenpy/utils/system_utils.py +++ b/magenpy/utils/system_utils.py @@ -117,12 +117,28 @@ def makedir(dirs): raise +def glob_s3_path(path): + """ + Get the list of files/folders in the provided AWS S3 path. This works with wildcards. + + :param path: A string with the S3 path to list files/folders from. + :return: A list of strings with the full paths of the files/folders. + """ + + import s3fs + s3 = s3fs.S3FileSystem(anon=True) + + return s3.glob(path) + + def get_filenames(path, extension=None): """ Obtain valid and full path names given the provided `path` or prefix and extensions. :param path: A string with the path prefix or full path. :param extension: The extension for the class of files to search for. + + :return: A list of strings with the full paths of the files/folders. 
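+
+    For example (illustrative), `get_filenames('ld/chr_22', extension='.zarr')` would
+    return paths matching that prefix with the given extension, if any exist.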
""" if osp.isdir(path): diff --git a/pyproject.toml b/pyproject.toml index a15f1e0..1f70e20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,8 +3,10 @@ requires = [ "setuptools", "wheel", - "Cython", - "oldest-supported-numpy" + "cython", + "extension-helpers", + "oldest-supported-numpy", + "pkgconfig" ] build-backend = "setuptools.build_meta" diff --git a/requirements-optional.txt b/requirements-optional.txt index b62db07..77f5b55 100644 --- a/requirements-optional.txt +++ b/requirements-optional.txt @@ -1,3 +1,4 @@ matplotlib seaborn statsmodels +s3fs diff --git a/requirements.txt b/requirements.txt index 9365d39..ecbe65c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ dask<=2024.1.0 # Seen installation issues with newer versions scipy -numpy +numpy<2 pandas pandas-plink psutil tqdm -zarr +zarr<3 diff --git a/setup.py b/setup.py index 4743c81..8bc92ba 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,9 @@ from setuptools import setup, Extension, find_packages +from extension_helpers import add_openmp_flags_if_available +from extension_helpers._openmp_helpers import check_openmp_support +import pkgconfig import numpy as np +import warnings import os try: @@ -7,9 +11,138 @@ except ImportError: cythonize = None +# ------------------------------------------------------ +# Find and set BLAS-related flags and paths: + + +def find_blas_libraries(): + """ + Find BLAS libraries on the system using pkg-config. + This function will return the include directories (compiler flags) + and the linker flags to enable building the C/C++/Cython extensions + that require BLAS (or whose performance would be enhanced with BLAS). + + We use pkg-config (as encapsulated in the `pkgconfig` Python package) + to perform this search. Note that we augment the pkg-config + search path with the conda library path (if available) to + enable linking against BLAS libraries installed via Conda. + + :return: A dictionary with the following keys: + * 'found': A boolean indicating whether BLAS libraries were found. + * 'include_dirs': A list of include directories (compiler flags). + * 'extra_link_args': A list of linker flags. + * 'define_macros': A list of macros to define. + * 'libraries': A list of libraries to link against. + """ + + # STEP 0: Get the current pkg-config search path: + current_pkg_config_path = os.getenv("PKG_CONFIG_PATH", "") + + # STEP 1: Augment the pkg-config search path with + # the path of the current Conda environment (if exists). + # This can leverage BLAS libraries installed via Conda. + + conda_path = os.getenv("CONDA_PREFIX") + + if conda_path is not None: + conda_pkgconfig_path = os.path.join(conda_path, 'lib/pkgconfig') + if os.path.isdir(conda_pkgconfig_path): + current_pkg_config_path += ":" + conda_pkgconfig_path + + # STEP 2: Add the updated path to the environment variable: + os.environ["PKG_CONFIG_PATH"] = current_pkg_config_path + + # STEP 3: Get all pkg-config packages and filter to + # those that have "blas" in the name. 
+    blas_packages = [pkg for pkg in pkgconfig.list_all()
+                     if "blas" in pkg]
+
+    # First check: Make sure that compiler flags are defined and a
+    # valid cblas.h header file exists in the include directory:
+    if len(blas_packages) >= 1:
+
+        blas_packages = [pkg for pkg in blas_packages
+                         if pkgconfig.cflags(pkg) and
+                         os.path.isfile(os.path.join(pkgconfig.variables(pkg)['includedir'], 'cblas.h'))]

+    # If there remains more than one library after the previous
+    # search and filtering steps, then apply some heuristics
+    # to select the most relevant one:
+    if len(blas_packages) > 1:
+        # Check if the information about the most relevant library
+        # can be inferred from numpy. Note that this interface from
+        # numpy changes quite often between versions, so it's not
+        # a reliable check. But in case it works on some systems,
+        # we use it to link to the same library as numpy:
+        try:
+            for pkg in blas_packages:
+                if pkg in np.__config__.get_info('blas_opt')['libraries']:
+                    blas_packages = [pkg]
+                    break
+        except (KeyError, AttributeError):
+            pass
+
+        # If there are still multiple libraries, then apply some
+        # additional heuristics (based on name matching) to select
+        # the most relevant one.
+        if len(blas_packages) > 1:
+            # Some libraries (e.g. flexiblas) are published with support for 64bit
+            # and they expose libraries for non-BLAS API (with the _api suffix).
+            # Ignore these here if that is the case?
+
+            idx_to_remove = set()
+
+            for pkg1 in blas_packages:
+                if pkg1 != 'blas':
+                    for i, pkg2 in enumerate(blas_packages):
+                        if pkg1 != pkg2 and pkg1 in pkg2:
+                            idx_to_remove.add(i)
+
+            blas_packages = [pkg for i, pkg in enumerate(blas_packages) if i not in idx_to_remove]
+
+    # After applying all the heuristics, out of all the remaining libraries,
+    # select the first one in the list. Not the greatest solution, maybe
+    # down the line we can use the same BLAS order as numpy.
+    if len(blas_packages) >= 1:
+        final_blas_pkg = blas_packages[0]
+    else:
+        final_blas_pkg = None
+
+    # STEP 4: If a relevant BLAS package was found, extract the flags
+    # needed for building the Cython/C/C++ extensions:
+
+    if final_blas_pkg is not None:
+        blas_info = pkgconfig.parse(final_blas_pkg)
+        blas_info['define_macros'] = [('HAVE_CBLAS', None)]
+    else:
+        blas_info = {
+            'include_dirs': [],
+            'library_dirs': [],
+            'libraries': [],
+            'define_macros': [],
+        }
+        warnings.warn("""
+        ********************* WARNING *********************
+        BLAS library header files not found on your system.
+        This may slow down some computations. If you are
+        using conda, we recommend installing BLAS libraries
+        beforehand.
+ ********************* WARNING ********************* + """, stacklevel=2) + + return blas_info + + +blas_flags = find_blas_libraries() + +# ------------------------------------------------------ + # ------------------------------------------------------ # Cython dependencies: + def no_cythonize(extensions, **_ignore): """ Copied from: @@ -33,8 +166,12 @@ def no_cythonize(extensions, **_ignore): extensions = [ Extension("magenpy.stats.ld.c_utils", sources=["magenpy/stats/ld/c_utils.pyx"], - include_dirs=[np.get_include()], - define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")], + language="c++", + libraries=blas_flags['libraries'], + include_dirs=[np.get_include()] + blas_flags['include_dirs'], + library_dirs=blas_flags['library_dirs'], + define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")] + blas_flags['define_macros'], + extra_compile_args=["-O3", "-std=c++17"] ), # Not ready yet: # Extension("magenpy.stats.score.score_cpp", diff --git a/tests/conda_manual_testing.sh b/tests/conda_manual_testing.sh index 04232c1..ec7655d 100644 --- a/tests/conda_manual_testing.sh +++ b/tests/conda_manual_testing.sh @@ -22,7 +22,7 @@ do conda activate "magenpy$version" # Add some of the required dependencies: - conda install -c conda-forge -c anaconda pip wheel compilers -y + conda install -c conda-forge -c anaconda pip wheel compilers openblas -y # Check python version: python --version diff --git a/tests/test_ld.py b/tests/test_ld.py index ea8da98..4e334ad 100644 --- a/tests/test_ld.py +++ b/tests/test_ld.py @@ -53,6 +53,7 @@ def test_windowed_ld_computation(gdl_object): gdl_object.compute_ld('windowed', gdl_object.output_dir, + compute_spectral_properties=True, window_size=500, kb_window_size=100, cm_window_size=3.) @@ -98,7 +99,8 @@ def test_block_ld_computation(gdl_object): Test the LD computation functionality according to the Block estimator """ - ld_block_url = "https://bitbucket.org/nygcresearch/ldetect-data/raw/ac125e47bf7ff3e90be31f278a7b6a61daaba0dc/EUR/fourier_ls-all.bed" + ld_block_url = ("https://bitbucket.org/nygcresearch/ldetect-data/raw/" + "ac125e47bf7ff3e90be31f278a7b6a61daaba0dc/EUR/fourier_ls-all.bed") gdl_object.compute_ld('block', gdl_object.output_dir, ld_blocks_file=ld_block_url) gdl_object.harmonize_data()