11
11
12
12
from vcztools import filter as filter_mod
13
13
from vcztools .constants import INT_FILL , INT_MISSING
14
- from vcztools .vcf_writer import _compute_info_fields , write_vcf
14
+ from vcztools .vcf_writer import _compute_info_fields , c_chunk_to_vcf , write_vcf
15
15
16
16
from .utils import assert_vcfs_close , vcz_path_cache
17
17
@@ -386,7 +386,6 @@ def test_compute_info_fields():
386
386
np .testing .assert_array_equal (expected_result [key ], computed_info_fields [key ])
387
387
388
388
389
-
390
389
class TestApiErrors :
391
390
392
391
@pytest .fixture ()
@@ -400,9 +399,114 @@ def test_samples_and_drop_genotypes(self, vcz):
400
399
):
401
400
write_vcf (vcz , sys .stdout , samples = ["NA00001" ], drop_genotypes = True )
402
401
403
-
404
402
def test_no_output_filter_parse_error (self , vcz ):
405
403
output = StringIO ()
406
404
with pytest .raises (filter_mod .ParseError ):
407
405
write_vcf (vcz , output , include = "Not a valid expression" )
408
406
assert output .getvalue () == ""
407
+
408
+
409
+ def minimal_vcf_chunk (num_variants , num_samples , ploidy = 2 ):
410
+ return {
411
+ "variant_position" : 1 + np .arange (num_variants , dtype = np .int32 ),
412
+ "variant_contig" : np .zeros (num_variants , dtype = np .int32 ),
413
+ # "variant_id": np.array(["."] * num_variants, dtype="S1"),
414
+ "variant_id" : np .array (["." ] * num_variants , dtype = "S" ).reshape (
415
+ (num_variants , 1 )
416
+ ),
417
+ "variant_allele" : np .array ([("A" , "T" )] * num_variants ),
418
+ "variant_quality" : np .zeros (num_variants , dtype = np .float32 ),
419
+ "variant_filter" : np .ones (num_variants , dtype = bool ).reshape ((num_variants , 1 )),
420
+ "call_genotype" : np .zeros ((num_variants , num_samples , ploidy ), dtype = np .int8 ),
421
+ }
422
+
423
+
424
+ def chunk_to_vcf (chunk ):
425
+ filters = np .array ([b"PASS" ])
426
+ contigs = np .array ([b"chr1" ])
427
+ output = StringIO ()
428
+ c_chunk_to_vcf (
429
+ chunk ,
430
+ samples_selection = None ,
431
+ contigs = contigs ,
432
+ filters = filters ,
433
+ output = output ,
434
+ drop_genotypes = False ,
435
+ no_update = False ,
436
+ )
437
+ return output .getvalue ()
438
+
439
+
440
+ def chunk_to_vcf_file (chunk ):
441
+ """
442
+ Simple function just to get the data out to a minimal file for
443
+ testing and evaluation
444
+ """
445
+ num_samples = chunk ["call_genotype" ].shape [1 ]
446
+
447
+ output = StringIO ()
448
+ print ("##fileformat=VCFv4.3" , file = output )
449
+ print ("##contig=<ID=chr1>" , file = output )
450
+ print (
451
+ '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">' ,
452
+ file = output ,
453
+ )
454
+ print (
455
+ "#CHROM" ,
456
+ "POS" ,
457
+ "ID" ,
458
+ "REF" ,
459
+ "ALT" ,
460
+ "QUAL" ,
461
+ "FILTER" ,
462
+ "INFO" ,
463
+ sep = "\t " ,
464
+ end = "" ,
465
+ file = output ,
466
+ )
467
+ print (end = "\t " , file = output )
468
+ sample_ids = [f"x{ j } " for j in range (num_samples )]
469
+ print ("FORMAT" , * sample_ids , sep = "\t " , file = output )
470
+ return output .getvalue () + chunk_to_vcf (chunk )
471
+
472
+
473
+ class TestEncoding :
474
+
475
+ def test_basic_example (self ):
476
+ chunk = minimal_vcf_chunk (1 , 2 )
477
+ out = chunk_to_vcf (chunk )
478
+ line = "\t " .join (
479
+ ["chr1" , "1" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0/0" , "0/0" ]
480
+ )
481
+ assert out == line + "\n "
482
+
483
+ def test_mixed_ploidy (self ):
484
+ chunk = minimal_vcf_chunk (2 , 2 )
485
+ chunk ["call_genotype" ][0 , 0 , 1 ] = - 2
486
+ chunk ["call_genotype" ][1 , 1 , 1 ] = - 2
487
+ out = chunk_to_vcf (chunk )
488
+ lines = [
489
+ ["chr1" , "1" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0" , "0/0" ],
490
+ ["chr1" , "2" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0/0" , "0" ],
491
+ ]
492
+ lines = "\n " .join ("\t " .join (line ) for line in lines )
493
+ assert out == lines + "\n "
494
+
495
+ def test_zero_ploidy (self ):
496
+ chunk = minimal_vcf_chunk (2 , 2 )
497
+ chunk ["call_genotype" ][0 , 0 ] = - 2
498
+ chunk ["call_genotype" ][1 , 1 ] = - 2
499
+ out = chunk_to_vcf (chunk )
500
+ lines = [
501
+ ["chr1" , "1" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "" , "0/0" ],
502
+ ["chr1" , "2" , "." , "A" , "T" , "0" , "PASS" , "." , "GT" , "0/0" , "" ],
503
+ ]
504
+ lines = "\n " .join ("\t " .join (line ) for line in lines )
505
+ assert out == lines + "\n "
506
+
507
+ # NOTE bcftools/htslib doesn't like this
508
+ # [E::vcf_parse_format] Couldn't read GT data:
509
+ # value not a number or '.' at chr1:1
510
+
511
+ # with open("zero-ploidy.vcf", "w") as f:
512
+ # print(chunk_to_vcf_file(chunk), file=f, end="")
0 commit comments