12
12
13
13
from vcztools import filter as filter_mod
14
14
from vcztools .constants import INT_FILL , INT_MISSING
15
- from vcztools .vcf_writer import _compute_info_fields , write_vcf
15
+ from vcztools .vcf_writer import _compute_info_fields , c_chunk_to_vcf , write_vcf
16
16
17
17
from .utils import assert_vcfs_close , vcz_path_cache
18
18
@@ -387,7 +387,6 @@ def test_compute_info_fields():
387
387
np .testing .assert_array_equal (expected_result [key ], computed_info_fields [key ])
388
388
389
389
390
-
391
390
class TestApiErrors :
392
391
393
392
@pytest .fixture ()
@@ -401,9 +400,114 @@ def test_samples_and_drop_genotypes(self, vcz):
401
400
):
402
401
write_vcf (vcz , sys .stdout , samples = ["NA00001" ], drop_genotypes = True )
403
402
404
-
405
403
def test_no_output_filter_parse_error (self , vcz ):
406
404
output = StringIO ()
407
405
with pytest .raises (filter_mod .ParseError ):
408
406
write_vcf (vcz , output , include = "Not a valid expression" )
409
407
assert output .getvalue () == ""
408
+
409
+
410
+ def minimal_vcf_chunk (num_variants , num_samples , ploidy = 2 ):
411
+ return {
412
+ "variant_position" : 1 + np .arange (num_variants , dtype = np .int32 ),
413
+ "variant_contig" : np .zeros (num_variants , dtype = np .int32 ),
414
+ # "variant_id": np.array(["."] * num_variants, dtype="S1"),
415
+ "variant_id" : np .array (["." ] * num_variants , dtype = "S" ).reshape (
416
+ (num_variants , 1 )
417
+ ),
418
+ "variant_allele" : np .array ([("A" , "T" )] * num_variants ),
419
+ "variant_quality" : np .zeros (num_variants , dtype = np .float32 ),
420
+ "variant_filter" : np .ones (num_variants , dtype = bool ).reshape ((num_variants , 1 )),
421
+ "call_genotype" : np .zeros ((num_variants , num_samples , ploidy ), dtype = np .int8 ),
422
+ }
423
+
424
+
425
+ def chunk_to_vcf (chunk ):
426
+ filters = np .array ([b"PASS" ])
427
+ contigs = np .array ([b"chr1" ])
428
+ output = StringIO ()
429
+ c_chunk_to_vcf (
430
+ chunk ,
431
+ samples_selection = None ,
432
+ contigs = contigs ,
433
+ filters = filters ,
434
+ output = output ,
435
+ drop_genotypes = False ,
436
+ no_update = False ,
437
+ )
438
+ return output .getvalue ()
439
+
440
+
441
+ def chunk_to_vcf_file (chunk ):
442
+ """
443
+ Simple function just to get the data out to a minimal file for
444
+ testing and evaluation
445
+ """
446
+ num_samples = chunk ["call_genotype" ].shape [1 ]
447
+
448
+ output = StringIO ()
449
+ print ("##fileformat=VCFv4.3" , file = output )
450
+ print ("##contig=<ID=chr1>" , file = output )
451
+ print (
452
+ '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">' ,
453
+ file = output ,
454
+ )
455
+ print (
456
+ "#CHROM" ,
457
+ "POS" ,
458
+ "ID" ,
459
+ "REF" ,
460
+ "ALT" ,
461
+ "QUAL" ,
462
+ "FILTER" ,
463
+ "INFO" ,
464
+ sep = "\t " ,
465
+ end = "" ,
466
+ file = output ,
467
+ )
468
+ print (end = "\t " , file = output )
469
+ sample_ids = [f"x{ j } " for j in range (num_samples )]
470
+ print ("FORMAT" , * sample_ids , sep = "\t " , file = output )
471
+ return output .getvalue () + chunk_to_vcf (chunk )
472
+
473
+
474
+ class TestEncoding :
475
+
476
+ def test_basic_example (self ):
477
+ chunk = minimal_vcf_chunk (1 , 2 )
478
+ out = chunk_to_vcf (chunk )
479
+ line = "\t " .join (
480
+ ["chr1" , "1" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0/0" , "0/0" ]
481
+ )
482
+ assert out == line + "\n "
483
+
484
+ def test_mixed_ploidy (self ):
485
+ chunk = minimal_vcf_chunk (2 , 2 )
486
+ chunk ["call_genotype" ][0 , 0 , 1 ] = - 2
487
+ chunk ["call_genotype" ][1 , 1 , 1 ] = - 2
488
+ out = chunk_to_vcf (chunk )
489
+ lines = [
490
+ ["chr1" , "1" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0" , "0/0" ],
491
+ ["chr1" , "2" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0/0" , "0" ],
492
+ ]
493
+ lines = "\n " .join ("\t " .join (line ) for line in lines )
494
+ assert out == lines + "\n "
495
+
496
+ def test_zero_ploidy (self ):
497
+ chunk = minimal_vcf_chunk (2 , 2 )
498
+ chunk ["call_genotype" ][0 , 0 ] = - 2
499
+ chunk ["call_genotype" ][1 , 1 ] = - 2
500
+ out = chunk_to_vcf (chunk )
501
+ lines = [
502
+ ["chr1" , "1" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "" , "0/0" ],
503
+ ["chr1" , "2" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0/0" , "" ],
504
+ ]
505
+ lines = "\n " .join ("\t " .join (line ) for line in lines )
506
+ assert out == lines + "\n "
507
+
508
+ # NOTE bcftools/htslib doesn't like this
509
+ # [E::vcf_parse_format] Couldn't read GT data:
510
+ # value not a number or '.' at chr1:1
511
+
512
+ # with open("zero-ploidy.vcf", "w") as f:
513
+ # print(chunk_to_vcf_file(chunk), file=f, end="")
0 commit comments