diff --git a/CHANGELOG.md b/CHANGELOG.md index 27471856..56766ae6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Added `--graph-output-dir` to save graph data in json. +- Added `--timeout-seconds` to callVariant. + ## Fixed - Fixed `summarizeFasta` that SEC and W2F on fusion peptides are ignored. #789 diff --git a/moPepGen/cli/call_variant_peptide.py b/moPepGen/cli/call_variant_peptide.py index f4a5ba8c..681611c4 100644 --- a/moPepGen/cli/call_variant_peptide.py +++ b/moPepGen/cli/call_variant_peptide.py @@ -81,7 +81,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction): ' when there are local regions that are heavily mutated. When creating' ' the cleavage graph, nodes containing variants larger than this value' ' are skipped. Setting to -1 will avoid checking for this.', - default=7, + default=(7,), + nargs='+', metavar='' ) p.add_argument( @@ -90,7 +91,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction): help='Additional variants allowed for every miscleavage. This argument' ' is used together with --max-variants-per-node to handle hypermutated' ' regions. Setting to -1 will avoid checking for this.', - default=2, + default=(2,), + nargs='+', metavar='' ) p.add_argument( @@ -114,6 +116,12 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction): help='Process only noncanonical transcripts of fusion transcripts and' ' circRNA. Canonical transcripts are skipped.' ) + p.add_argument( + '--timeout-seconds', + type=int, + default=1800, + help='Time out in seconds for each transcript.' 
def caller_reducer(dispatch):
    """ Entry point handed to ParallelPool for a single transcript.

    Calls `call_variant_peptides_wrapper` with the dispatch. Each time the
    call raises a TimeoutError, the cleavage parameters are relaxed and the
    call is repeated:

    - The next value of the dispatch's `max_variants_per_node` sequence is
      used; once the sequence is exhausted, the value used last is lowered
      by one.
    - The next value of the dispatch's `additional_variants_per_misc`
      sequence is used; once the sequence is exhausted, 0 is used.

    The dispatch passed in is never mutated; every retry works on shallow
    copies of the dispatch and of its `cleavage_params`.

    Args:
        dispatch (dict): Keyword arguments for
            `call_variant_peptides_wrapper`. The keys read here are `tx_id`,
            `cleavage_params`, `max_variants_per_node` and
            `additional_variants_per_misc`.

    Returns:
        Whatever `call_variant_peptides_wrapper` returns.

    Raises:
        ValueError: If the next `max_variants_per_node` to try is <= 0,
            chained from the TimeoutError that triggered the retry.
    """
    tx_id = dispatch['tx_id']
    node_limits = dispatch['max_variants_per_node']
    misc_limits = dispatch['additional_variants_per_misc']

    while True:
        try:
            return call_variant_peptides_wrapper(**dispatch)
        except TimeoutError as e:
            retry_dispatch = copy.copy(dispatch)
            params = copy.copy(retry_dispatch['cleavage_params'])

            # Drop the value that just timed out; when nothing is left,
            # step down from the value used in the failed attempt.
            node_limits = node_limits[1:]
            if not node_limits:
                node_limits = (params.max_variants_per_node - 1, )
            next_node_limit = node_limits[0]
            if next_node_limit <= 0:
                raise ValueError(f"Failed to finish transcript: {tx_id}") from e

            misc_limits = misc_limits[1:]
            if not misc_limits:
                misc_limits = (0,)

            params.max_variants_per_node = next_node_limit
            params.additional_variants_per_misc = misc_limits[0]
            retry_dispatch['cleavage_params'] = params
            dispatch = retry_dispatch

            logger(
                f"Transcript {tx_id} timed out. Retry with "
                f"--max-variants-per-node {params.max_variants_per_node} "
                f"--additional-variants-per-misc {params.additional_variants_per_misc}"
            )
# pylint: disable=unused-argument
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """ Decorator to raise a TimeoutError if the process runs over time.

    The limit is enforced with `signal.alarm`, so the alarm is armed in
    whole seconds and SIGALRM must be available on the platform.

    Args:
        seconds (int): Default time limit in seconds. Used whenever the
            decorated function is called without a truthy `timeout`
            keyword argument.
        error_message (str): Message carried by the TimeoutError.

    Returns:
        A decorator. The decorated function accepts the same arguments as
        the original. A truthy `timeout` keyword argument overrides
        `seconds` for that call and is still forwarded to the wrapped
        function, which therefore must accept it.

    Raises:
        TimeoutError: Raised inside the wrapped call when the alarm fires.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Use a separate local name: assigning to `seconds` here would
            # make it local to `wrapper` and raise UnboundLocalError on
            # every call that does not pass `timeout`.
            limit = kwargs.get('timeout') or seconds
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(limit)
            try:
                return func(*args, **kwargs)
            finally:
                # Always disarm the alarm, then put back whatever handler
                # was installed before this call.
                signal.alarm(0)
                # signal.signal() returns None when the previous handler
                # was not installed from Python; it cannot be reinstalled.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
30 args.naa_to_collapse = 5 args.inclusion_biotypes = None @@ -42,6 +42,7 @@ def create_base_args() -> argparse.Namespace: args.noncanonical_transcripts = False args.invalid_protein_as_noncoding = False args.threads = 1 + args.timeout_seconds = 1800 return args class TestCallVariantPeptides(TestCaseIntegration):