Skip to content

Commit

Permalink
Merge pull request #844 from uclahs-cds/czhu-fix-call-variant
Browse files Browse the repository at this point in the history
Add --timeout-seconds and retry to callVariant
  • Loading branch information
zhuchcn authored Feb 8, 2024
2 parents 0ba2f84 + 080c1e4 commit 01dea81
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Added `--graph-output-dir` to save graph data in json.

- Added `--timeout-seconds` to callVariant.

## Fixed

- Fixed `summarizeFasta` so that SEC and W2F on fusion peptides are no longer ignored. #789
Expand Down
64 changes: 53 additions & 11 deletions moPepGen/cli/call_variant_peptide.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction):
' when there are local regions that are heavily mutated. When creating'
' the cleavage graph, nodes containing variants larger than this value'
' are skipped. Setting to -1 will avoid checking for this.',
default=7,
default=(7,),
nargs='+',
metavar='<number>'
)
p.add_argument(
Expand All @@ -90,7 +91,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction):
help='Additional variants allowed for every miscleavage. This argument'
' is used together with --max-variants-per-node to handle hypermutated'
' regions. Setting to -1 will avoid checking for this.',
default=2,
default=(2,),
nargs='+',
metavar='<number>'
)
p.add_argument(
Expand All @@ -114,6 +116,12 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction):
help='Process only noncanonical transcripts of fusion transcripts and'
' circRNA. Canonical transcripts are skipped.'
)
p.add_argument(
'--timeout-seconds',
type=int,
default=1800,
help='Time out in seconds for each transcript.'
)
p.add_argument(
'--verbose-level',
type=int,
Expand Down Expand Up @@ -167,8 +175,8 @@ def __init__(self, args:argparse.Namespace):
min_mw=float(args.min_mw),
min_length=args.min_length,
max_length=args.max_length,
max_variants_per_node = args.max_variants_per_node,
additional_variants_per_misc = args.additional_variants_per_misc,
max_variants_per_node = args.max_variants_per_node[0],
additional_variants_per_misc = args.additional_variants_per_misc[0],
min_nodes_to_collapse = args.min_nodes_to_collapse,
naa_to_collapse = args.naa_to_collapse
)
Expand Down Expand Up @@ -272,6 +280,8 @@ def write_pgraphs(self, tx_id:str, pgraphs:TypePGraphs):
Dict[str, svgraph.PeptideVariantGraph],
Dict[str, svgraph.PeptideVariantGraph]
]
# pylint: disable=unused-argument
@common.timeout()
def call_variant_peptides_wrapper(tx_id:str,
variant_series:seqvar.TranscriptionalVariantSeries,
tx_seqs:Dict[str, dna.DNASeqRecordWithCoordinates],
Expand All @@ -283,7 +293,8 @@ def call_variant_peptides_wrapper(tx_id:str,
max_adjacent_as_mnv:bool,
truncate_sec:bool,
w2f_reassignment:bool,
save_graph:bool
save_graph:bool,
**kwargs
) -> Tuple[Set[aa.AminoAcidSeqRecord], str, TypeDGraphs, TypePGraphs]:
""" wrapper function to call variant peptides """
peptide_pool:List[Set[aa.AminoAcidSeqRecord]] = []
Expand Down Expand Up @@ -375,9 +386,34 @@ def call_variant_peptides_wrapper(tx_id:str,

return peptide_pool, tx_id, dgraphs, pgraphs

def wrapper(dispatch):
    """ Adapter for ParallelPool: expands the dispatch mapping into keyword
    arguments for call_variant_peptides_wrapper and returns its result. """
    result = call_variant_peptides_wrapper(**dispatch)
    return result
def caller_reducer(dispatch):
    """ Entry point handed to ParallelPool for one transcript.

    Calls call_variant_peptides_wrapper with the dispatch. Each time that call
    raises TimeoutError, the run is retried with a copy of the dispatch whose
    cleavage parameters are made less complex:

    - The next value of the dispatch's ``max_variants_per_node`` sequence is
      used; once the sequence is exhausted, the current value of the cleavage
      parameters is decreased by one per retry.
    - The next value of the dispatch's ``additional_variants_per_misc``
      sequence is used; once the sequence is exhausted, 0 is used.

    Raises:
        ValueError: If the next ``max_variants_per_node`` to try is 0 or
            below. The TimeoutError is attached as the cause.
    """
    tx_id = dispatch['tx_id']
    node_limits = dispatch['max_variants_per_node']
    misc_limits = dispatch['additional_variants_per_misc']

    while True:
        try:
            return call_variant_peptides_wrapper(**dispatch)
        except TimeoutError as timed_out:
            # Work on a copy so the parameters of the timed-out attempt are
            # left untouched.
            p = copy.copy(dispatch['cleavage_params'])

            # Drop the value that just timed out and move on to the next one.
            node_limits = node_limits[1:]
            if not node_limits:
                node_limits = (p.max_variants_per_node - 1, )
            if node_limits[0] <= 0:
                raise ValueError(f"Failed to finish transcript: {tx_id}") from timed_out

            misc_limits = misc_limits[1:]
            if not misc_limits:
                misc_limits = (0,)

            p.max_variants_per_node = node_limits[0]
            p.additional_variants_per_misc = misc_limits[0]
            dispatch = {**dispatch, 'cleavage_params': p}
            logger(
                f"Transcript {tx_id} timed out. Retry with "
                f"--max-variants-per-node {p.max_variants_per_node} "
                f"--additional-variants-per-misc {p.additional_variants_per_misc}"
            )

def call_variant_peptide(args:argparse.Namespace) -> None:
""" Main entry point for calling variant peptide """
Expand Down Expand Up @@ -475,7 +511,10 @@ def call_variant_peptide(args:argparse.Namespace) -> None:
'max_adjacent_as_mnv': caller.max_adjacent_as_mnv,
'truncate_sec': caller.truncate_sec,
'w2f_reassignment': caller.w2f_reassignment,
'save_graph': caller.graph_output_dir is not None
'save_graph': caller.graph_output_dir is not None,
'timeout': args.timeout_seconds,
'max_variants_per_node': tuple(args.max_variants_per_node),
'additional_variants_per_misc': tuple(args.additional_variants_per_misc)
}
dispatches.append(dispatch)

Expand All @@ -485,9 +524,12 @@ def call_variant_peptide(args:argparse.Namespace) -> None:
if caller.verbose >= 2:
logger([x['tx_id'] for x in dispatches])
if caller.threads > 1:
results = process_pool.map(wrapper, dispatches)
results = process_pool.map(
caller_reducer,
dispatches
)
else:
results = [wrapper(dispatches[0])]
results = [ caller_reducer(dispatches[0]) ]

# pylint: disable=W0621
for peptide_series, tx_id, dgraphs, pgraphs in results:
Expand Down
26 changes: 26 additions & 0 deletions moPepGen/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import sys
from typing import Tuple, Set, List
from pathlib import Path
import errno
import signal
import functools
import pickle
import pkg_resources
from moPepGen import aa, dna, gtf, logger, seqvar, err
Expand Down Expand Up @@ -371,3 +374,26 @@ def validate_file_format(file:Path, types:List[str]=None, check_readable:bool=Fa
else:
if not os.access(file.parent, os.W_OK):
raise PermissionError(f"Permission denied: '{file}'")

# pylint: disable=unused-argument
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """ Decorator to raise a TimeoutError if the decorated call runs over time.

    The limit is enforced with ``signal.alarm`` and a ``SIGALRM`` handler, so
    the decorated function must be called where ``signal.signal`` is allowed.

    Args:
        seconds (int): Default time limit in seconds. It is used whenever the
            decorated function is called without a truthy ``timeout`` keyword
            argument.
        error_message (str): Message carried by the raised TimeoutError.

    The decorated function receives all arguments unchanged, including the
    ``timeout`` keyword argument when one is given.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Bind the effective limit to its own name. Assigning to `seconds`
            # here would make it local to this function and raise
            # UnboundLocalError whenever no `timeout` keyword is passed.
            limit = kwargs.get('timeout') or seconds
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(limit)
            try:
                return func(*args, **kwargs)
            finally:
                # Always cancel the pending alarm, then put back whatever
                # handler was installed before this call.
                signal.alarm(0)
                # None means the previous handler was not installed from
                # Python and cannot be passed back to signal.signal.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
5 changes: 3 additions & 2 deletions test/integration/test_call_variant_peptides.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def create_base_args() -> argparse.Namespace:
args.max_adjacent_as_mnv = 0
args.selenocysteine_termination = False
args.w2f_reassignment = False
args.max_variants_per_node = 7
args.additional_variants_per_misc = 2
args.max_variants_per_node = [7]
args.additional_variants_per_misc = [2]
args.min_nodes_to_collapse = 30
args.naa_to_collapse = 5
args.inclusion_biotypes = None
Expand All @@ -42,6 +42,7 @@ def create_base_args() -> argparse.Namespace:
args.noncanonical_transcripts = False
args.invalid_protein_as_noncoding = False
args.threads = 1
args.timeout_seconds = 1800
return args

class TestCallVariantPeptides(TestCaseIntegration):
Expand Down

0 comments on commit 01dea81

Please sign in to comment.