Skip to content

Add tests for mixed/zero ploidy cases #211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 107 additions & 3 deletions tests/test_vcf_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from vcztools import filter as filter_mod
from vcztools.constants import INT_FILL, INT_MISSING
from vcztools.vcf_writer import _compute_info_fields, write_vcf
from vcztools.vcf_writer import _compute_info_fields, c_chunk_to_vcf, write_vcf

from .utils import assert_vcfs_close, vcz_path_cache

Expand Down Expand Up @@ -386,7 +386,6 @@ def test_compute_info_fields():
np.testing.assert_array_equal(expected_result[key], computed_info_fields[key])



class TestApiErrors:

@pytest.fixture()
Expand All @@ -400,9 +399,114 @@ def test_samples_and_drop_genotypes(self, vcz):
):
write_vcf(vcz, sys.stdout, samples=["NA00001"], drop_genotypes=True)


def test_no_output_filter_parse_error(self, vcz):
output = StringIO()
with pytest.raises(filter_mod.ParseError):
write_vcf(vcz, output, include="Not a valid expression")
assert output.getvalue() == ""


def minimal_vcf_chunk(num_variants, num_samples, ploidy=2):
return {
"variant_position": 1 + np.arange(num_variants, dtype=np.int32),
"variant_contig": np.zeros(num_variants, dtype=np.int32),
# "variant_id": np.array(["."] * num_variants, dtype="S1"),
"variant_id": np.array(["."] * num_variants, dtype="S").reshape(
(num_variants, 1)
),
"variant_allele": np.array([("A", "T")] * num_variants),
"variant_quality": np.zeros(num_variants, dtype=np.float32),
"variant_filter": np.ones(num_variants, dtype=bool).reshape((num_variants, 1)),
"call_genotype": np.zeros((num_variants, num_samples, ploidy), dtype=np.int8),
}


def chunk_to_vcf(chunk):
filters = np.array([b"PASS"])
contigs = np.array([b"chr1"])
output = StringIO()
c_chunk_to_vcf(
chunk,
samples_selection=None,
contigs=contigs,
filters=filters,
output=output,
drop_genotypes=False,
no_update=False,
)
return output.getvalue()


def chunk_to_vcf_file(chunk):
"""
Simple function just to get the data out to a minimal file for
testing and evaluation
"""
num_samples = chunk["call_genotype"].shape[1]

output = StringIO()
print("##fileformat=VCFv4.3", file=output)
print("##contig=<ID=chr1>", file=output)
print(
'##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
file=output,
)
print(
"#CHROM",
"POS",
"ID",
"REF",
"ALT",
"QUAL",
"FILTER",
"INFO",
sep="\t",
end="",
file=output,
)
print(end="\t", file=output)
sample_ids = [f"x{j}" for j in range(num_samples)]
print("FORMAT", *sample_ids, sep="\t", file=output)
return output.getvalue() + chunk_to_vcf(chunk)


class TestEncoding:

def test_basic_example(self):
chunk = minimal_vcf_chunk(1, 2)
out = chunk_to_vcf(chunk)
line = "\t".join(
["chr1", "1", ".", "A", "T", "0", "PASS", ".", "GT", "0/0", "0/0"]
)
assert out == line + "\n"

def test_mixed_ploidy(self):
chunk = minimal_vcf_chunk(2, 2)
chunk["call_genotype"][0, 0, 1] = -2
chunk["call_genotype"][1, 1, 1] = -2
out = chunk_to_vcf(chunk)
lines = [
["chr1", "1", ".", "A", "T", "0", "PASS", ".", "GT", "0", "0/0"],
["chr1", "2", ".", "A", "T", "0", "PASS", ".", "GT", "0/0", "0"],
]
lines = "\n".join("\t".join(line) for line in lines)
assert out == lines + "\n"

def test_zero_ploidy(self):
chunk = minimal_vcf_chunk(2, 2)
chunk["call_genotype"][0, 0] = -2
chunk["call_genotype"][1, 1] = -2
out = chunk_to_vcf(chunk)
lines = [
["chr1", "1", ".", "A", "T", "0", "PASS", ".", "GT", "", "0/0"],
["chr1", "2", ".", "A", "T", "0", "PASS", ".", "GT", "0/0", ""],
]
lines = "\n".join("\t".join(line) for line in lines)
assert out == lines + "\n"

# NOTE bcftools/htslib doesn't like this
# [E::vcf_parse_format] Couldn't read GT data:
# value not a number or '.' at chr1:1

# with open("zero-ploidy.vcf", "w") as f:
# print(chunk_to_vcf_file(chunk), file=f, end="")
2 changes: 1 addition & 1 deletion vcztools/vcf_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def c_chunk_to_vcf(
gt_phased = chunk_data["call_genotype_phased"]
else:
# Default to unphased if call_genotype_phased not present
gt_phased = np.zeros_like(gt, dtype=bool)
gt_phased = np.zeros(gt.shape[:2], dtype=bool)

for name, array in chunk_data.items():
if (
Expand Down
Loading