From 58b0019d79c29eeaf4d59cde6b2801b1293b1c69 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Wed, 28 May 2025 20:15:29 +0100 Subject: [PATCH] Add tests for mixed/zero ploidy cases --- tests/test_vcf_writer.py | 110 +++++++++++++++++++++++++++++++++++++-- vcztools/vcf_writer.py | 2 +- 2 files changed, 108 insertions(+), 4 deletions(-) diff --git a/tests/test_vcf_writer.py b/tests/test_vcf_writer.py index 1f32705..d2b98e6 100644 --- a/tests/test_vcf_writer.py +++ b/tests/test_vcf_writer.py @@ -11,7 +11,7 @@ from vcztools import filter as filter_mod from vcztools.constants import INT_FILL, INT_MISSING -from vcztools.vcf_writer import _compute_info_fields, write_vcf +from vcztools.vcf_writer import _compute_info_fields, c_chunk_to_vcf, write_vcf from .utils import assert_vcfs_close, vcz_path_cache @@ -386,7 +386,6 @@ def test_compute_info_fields(): np.testing.assert_array_equal(expected_result[key], computed_info_fields[key]) - class TestApiErrors: @pytest.fixture() @@ -400,9 +399,114 @@ def test_samples_and_drop_genotypes(self, vcz): ): write_vcf(vcz, sys.stdout, samples=["NA00001"], drop_genotypes=True) - def test_no_output_filter_parse_error(self, vcz): output = StringIO() with pytest.raises(filter_mod.ParseError): write_vcf(vcz, output, include="Not a valid expression") assert output.getvalue() == "" + + +def minimal_vcf_chunk(num_variants, num_samples, ploidy=2): + return { + "variant_position": 1 + np.arange(num_variants, dtype=np.int32), + "variant_contig": np.zeros(num_variants, dtype=np.int32), + # "variant_id": np.array(["."] * num_variants, dtype="S1"), + "variant_id": np.array(["."] * num_variants, dtype="S").reshape( + (num_variants, 1) + ), + "variant_allele": np.array([("A", "T")] * num_variants), + "variant_quality": np.zeros(num_variants, dtype=np.float32), + "variant_filter": np.ones(num_variants, dtype=bool).reshape((num_variants, 1)), + "call_genotype": np.zeros((num_variants, num_samples, ploidy), dtype=np.int8), + } + + +def chunk_to_vcf(chunk): + filters = np.array([b"PASS"]) + contigs = np.array([b"chr1"]) + output = StringIO() + c_chunk_to_vcf( + chunk, + samples_selection=None, + contigs=contigs, + filters=filters, + output=output, + drop_genotypes=False, + no_update=False, + ) + return output.getvalue() + + +def chunk_to_vcf_file(chunk): + """ + Simple function just to get the data out to a minimal file for + testing and evaluation + """ + num_samples = chunk["call_genotype"].shape[1] + + output = StringIO() + print("##fileformat=VCFv4.3", file=output) + print("##contig=", file=output) + print( + '##FORMAT=', + file=output, + ) + print( + "#CHROM", + "POS", + "ID", + "REF", + "ALT", + "QUAL", + "FILTER", + "INFO", + sep="\t", + end="", + file=output, + ) + print(end="\t", file=output) + sample_ids = [f"x{j}" for j in range(num_samples)] + print("FORMAT", *sample_ids, sep="\t", file=output) + return output.getvalue() + chunk_to_vcf(chunk) + + +class TestEncoding: + + def test_basic_example(self): + chunk = minimal_vcf_chunk(1, 2) + out = chunk_to_vcf(chunk) + line = "\t".join( + ["chr1", "1", ".", "A", "T", "0", "PASS", ".", "GT", "0/0", "0/0"] + ) + assert out == line + "\n" + + def test_mixed_ploidy(self): + chunk = minimal_vcf_chunk(2, 2) + chunk["call_genotype"][0, 0, 1] = -2 + chunk["call_genotype"][1, 1, 1] = -2 + out = chunk_to_vcf(chunk) + lines = [ + ["chr1", "1", ".", "A", "T", "0", "PASS", ".", "GT", "0", "0/0"], + ["chr1", "2", ".", "A", "T", "0", "PASS", ".", "GT", "0/0", "0"], + ] + lines = "\n".join("\t".join(line) for line in lines) + assert out == lines + "\n" + + def test_zero_ploidy(self): + chunk = minimal_vcf_chunk(2, 2) + chunk["call_genotype"][0, 0] = -2 + chunk["call_genotype"][1, 1] = -2 + out = chunk_to_vcf(chunk) + lines = [ + ["chr1", "1", ".", "A", "T", "0", "PASS", ".", "GT", "", "0/0"], + ["chr1", "2", ".", "A", "T", "0", "PASS", ".", "GT", "0/0", ""], + ] + lines = "\n".join("\t".join(line) for line in lines) + assert out == lines + "\n" + + # NOTE bcftools/htslib doesn't like this + # [E::vcf_parse_format] Couldn't read GT data: + # value not a number or '.' at chr1:1 + + # with open("zero-ploidy.vcf", "w") as f: + # print(chunk_to_vcf_file(chunk), file=f, end="") diff --git a/vcztools/vcf_writer.py b/vcztools/vcf_writer.py index ba0d27e..621fdcc 100644 --- a/vcztools/vcf_writer.py +++ b/vcztools/vcf_writer.py @@ -230,7 +230,7 @@ def c_chunk_to_vcf( gt_phased = chunk_data["call_genotype_phased"] else: # Default to unphased if call_genotype_phased not present - gt_phased = np.zeros_like(gt, dtype=bool) + gt_phased = np.zeros(gt.shape[:2], dtype=bool) for name, array in chunk_data.items(): if (