From 6c0844e9d5e1bca79272ab829b11577e11b02f8b Mon Sep 17 00:00:00 2001 From: zhuchcn Date: Wed, 7 Feb 2024 22:30:37 -0800 Subject: [PATCH 1/3] fix (callVariant): add --timeout-second and retry --- CHANGELOG.md | 2 + moPepGen/cli/call_variant_peptide.py | 65 ++++++++++++++++--- moPepGen/cli/common.py | 25 +++++++ .../integration/test_call_variant_peptides.py | 5 +- 4 files changed, 85 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27471856..56766ae6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Added `--graph-output-dir` to save graph data in json. +- Added `--timeout-seconds` to callVariant. + ## Fixed - Fixed `summarizeFasta` that SEC and W2F on fusion peptides are ignored. #789 diff --git a/moPepGen/cli/call_variant_peptide.py b/moPepGen/cli/call_variant_peptide.py index f4a5ba8c..4d2f8490 100644 --- a/moPepGen/cli/call_variant_peptide.py +++ b/moPepGen/cli/call_variant_peptide.py @@ -81,7 +81,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction): ' when there are local regions that are heavily mutated. When creating' ' the cleavage graph, nodes containing variants larger than this value' ' are skipped. Setting to -1 will avoid checking for this.', - default=7, + default=(7,), + nargs='+', metavar='' ) p.add_argument( @@ -90,7 +91,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction): help='Additional variants allowed for every miscleavage. This argument' ' is used together with --max-variants-per-node to handle hypermutated' ' regions. Setting to -1 will avoid checking for this.', - default=2, + default=(2,), + nargs='+', metavar='' ) p.add_argument( @@ -114,6 +116,12 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction): help='Process only noncanonical transcripts of fusion transcripts and' ' circRNA. Canonical transcripts are skipped.' ) + p.add_argument( + '--timeout-seconds', + type=int, + default=1800, + help='Time out in seconds for each transcript.' + ) p.add_argument( '--verbose-level', type=int, @@ -167,8 +175,8 @@ def __init__(self, args:argparse.Namespace): min_mw=float(args.min_mw), min_length=args.min_length, max_length=args.max_length, - max_variants_per_node = args.max_variants_per_node, - additional_variants_per_misc = args.additional_variants_per_misc, + max_variants_per_node = args.max_variants_per_node[0], + additional_variants_per_misc = args.additional_variants_per_misc[0], min_nodes_to_collapse = args.min_nodes_to_collapse, naa_to_collapse = args.naa_to_collapse ) @@ -272,6 +280,7 @@ def write_pgraphs(self, tx_id:str, pgraphs:TypePGraphs): Dict[str, svgraph.PeptideVariantGraph], Dict[str, svgraph.PeptideVariantGraph] ] +@common.timeout() def call_variant_peptides_wrapper(tx_id:str, variant_series:seqvar.TranscriptionalVariantSeries, tx_seqs:Dict[str, dna.DNASeqRecordWithCoordinates], @@ -283,7 +292,8 @@ def call_variant_peptides_wrapper(tx_id:str, max_adjacent_as_mnv:bool, truncate_sec:bool, w2f_reassignment:bool, - save_graph:bool + save_graph:bool, + timeout:int=None ) -> Tuple[Set[aa.AminoAcidSeqRecord], str, TypeDGraphs, TypePGraphs]: """ wrapper function to call variant peptides """ peptide_pool:List[Set[aa.AminoAcidSeqRecord]] = [] @@ -375,9 +385,32 @@ def call_variant_peptides_wrapper(tx_id:str, return peptide_pool, tx_id, dgraphs, pgraphs -def wrapper(dispatch): +def wrapper(dispatch, max_variants_per_node, additional_variants_per_misc): """ wrapper for ParallelPool """ - return call_variant_peptides_wrapper(**dispatch) + tx_id = dispatch['tx_id'] + while True: + try: + return call_variant_peptides_wrapper(**dispatch) + except TimeoutError: + new_dispatch = copy.copy(dispatch) + p = copy.copy(new_dispatch['cleavage_params']) + max_variants_per_node = max_variants_per_node[1:] + if len(max_variants_per_node) == 0: + max_variants_per_node = (p.max_variants_per_node - 1, ) + if max_variants_per_node[0] <= 0: + raise ValueError(f"Failed to finish transcript: {tx_id}") + additional_variants_per_misc = additional_variants_per_misc[1:] + if len(additional_variants_per_misc) == 0: + additional_variants_per_misc = (0,) + p.max_variants_per_node = max_variants_per_node[0] + p.additional_variants_per_misc = additional_variants_per_misc[0] + new_dispatch['cleavage_params'] = p + dispatch = new_dispatch + logger( + f"Transcript {tx_id} timed out. Retry with " + f"--max-variants-per-node {p.max_variants_per_node} " + f"--additional-variants-per-misc {p.additional_variants_per_misc}" + ) def call_variant_peptide(args:argparse.Namespace) -> None: """ Main entry point for calling variant peptide """ @@ -475,7 +508,8 @@ def call_variant_peptide(args:argparse.Namespace) -> None: 'max_adjacent_as_mnv': caller.max_adjacent_as_mnv, 'truncate_sec': caller.truncate_sec, 'w2f_reassignment': caller.w2f_reassignment, - 'save_graph': caller.graph_output_dir is not None + 'save_graph': caller.graph_output_dir is not None, + 'timeout': args.timeout_seconds } dispatches.append(dispatch) @@ -485,9 +519,20 @@ def call_variant_peptide(args:argparse.Namespace) -> None: if caller.verbose >= 2: logger([x['tx_id'] for x in dispatches]) if caller.threads > 1: - results = process_pool.map(wrapper, dispatches) + results = process_pool.map( + wrapper, + dispatches, + tuple(args.max_variants_per_node), + tuple(args.additional_variants_per_misc) + ) else: - results = [wrapper(dispatches[0])] + results = [ + wrapper( + dispatches[0], + tuple(args.max_variants_per_node), + tuple(args.additional_variants_per_misc) + ) + ] # pylint: disable=W0621 for peptide_series, tx_id, dgraphs, pgraphs in results: diff --git a/moPepGen/cli/common.py b/moPepGen/cli/common.py index ccf3bf5d..d5596491 100644 --- a/moPepGen/cli/common.py +++ b/moPepGen/cli/common.py @@ -5,6 +5,9 @@ import sys from typing import Tuple, Set, List from pathlib import Path +import errno +import signal +import functools import pickle import pkg_resources from moPepGen import aa, dna, gtf, logger, seqvar, err @@ -371,3 +374,25 @@ def validate_file_format(file:Path, types:List[str]=None, check_readable:bool=Fa else: if not os.access(file.parent, os.W_OK): raise PermissionError(f"Permission denied: '{file}'") + + +def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): + def decorator(func): + def _handle_timeout(signum, frame): + raise TimeoutError(error_message) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + if 'timeout' in kwargs and kwargs['timeout']: + seconds = kwargs['timeout'] + signal.signal(signal.SIGALRM, _handle_timeout) + signal.alarm(seconds) + try: + result = func(*args, **kwargs) + finally: + signal.alarm(0) + return result + + return wrapper + + return decorator diff --git a/test/integration/test_call_variant_peptides.py b/test/integration/test_call_variant_peptides.py index a33593d1..9eb1d6e5 100644 --- a/test/integration/test_call_variant_peptides.py +++ b/test/integration/test_call_variant_peptides.py @@ -25,8 +25,8 @@ def create_base_args() -> argparse.Namespace: args.max_adjacent_as_mnv = 0 args.selenocysteine_termination = False args.w2f_reassignment = False - args.max_variants_per_node = 7 - args.additional_variants_per_misc = 2 + args.max_variants_per_node = [7] + args.additional_variants_per_misc = [2] args.min_nodes_to_collapse = 30 args.naa_to_collapse = 5 args.inclusion_biotypes = None @@ -42,6 +42,7 @@ def create_base_args() -> argparse.Namespace: args.noncanonical_transcripts = False args.invalid_protein_as_noncoding = False args.threads = 1 + args.timeout_seconds = 1800 return args class TestCallVariantPeptides(TestCaseIntegration): From 0808ffab02f8431eb27520a2f0ec4fd8a3165d66 Mon Sep 17 00:00:00 2001 From: zhuchcn Date: Thu, 8 Feb 2024 08:47:26 -0800 Subject: [PATCH 2/3] fix (callVariant): fixed reducer to support multi threading --- moPepGen/cli/call_variant_peptide.py | 27 ++++++++++++--------------- moPepGen/cli/common.py | 1 + 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/moPepGen/cli/call_variant_peptide.py b/moPepGen/cli/call_variant_peptide.py index 4d2f8490..fbfe5936 100644 --- a/moPepGen/cli/call_variant_peptide.py +++ b/moPepGen/cli/call_variant_peptide.py @@ -280,6 +280,7 @@ def write_pgraphs(self, tx_id:str, pgraphs:TypePGraphs): Dict[str, svgraph.PeptideVariantGraph], Dict[str, svgraph.PeptideVariantGraph] ] +# pylint: disable=unused-argument @common.timeout() def call_variant_peptides_wrapper(tx_id:str, variant_series:seqvar.TranscriptionalVariantSeries, @@ -293,7 +294,7 @@ def call_variant_peptides_wrapper(tx_id:str, truncate_sec:bool, w2f_reassignment:bool, save_graph:bool, - timeout:int=None + **kwargs ) -> Tuple[Set[aa.AminoAcidSeqRecord], str, TypeDGraphs, TypePGraphs]: """ wrapper function to call variant peptides """ peptide_pool:List[Set[aa.AminoAcidSeqRecord]] = [] @@ -385,8 +386,10 @@ def call_variant_peptides_wrapper(tx_id:str, return peptide_pool, tx_id, dgraphs, pgraphs -def wrapper(dispatch, max_variants_per_node, additional_variants_per_misc): - """ wrapper for ParallelPool """ +def caller_reducer(dispatch): + """ wrapper for ParallelPool. Also reduces the complexity if the run is timed out. """ + max_variants_per_node = dispatch['max_variants_per_node'] + additional_variants_per_misc = dispatch['additional_variants_per_misc'] tx_id = dispatch['tx_id'] while True: try: @@ -509,7 +512,9 @@ def call_variant_peptide(args:argparse.Namespace) -> None: 'truncate_sec': caller.truncate_sec, 'w2f_reassignment': caller.w2f_reassignment, 'save_graph': caller.graph_output_dir is not None, - 'timeout': args.timeout_seconds + 'timeout': args.timeout_seconds, + 'max_variants_per_node': tuple(args.max_variants_per_node), + 'additional_variants_per_misc': tuple(args.additional_variants_per_misc) } dispatches.append(dispatch) @@ -520,19 +525,11 @@ def call_variant_peptide(args:argparse.Namespace) -> None: logger([x['tx_id'] for x in dispatches]) if caller.threads > 1: results = process_pool.map( - wrapper, - dispatches, - tuple(args.max_variants_per_node), - tuple(args.additional_variants_per_misc) + caller_reducer, + dispatches ) else: - results = [ - wrapper( - dispatches[0], - tuple(args.max_variants_per_node), - tuple(args.additional_variants_per_misc) - ) - ] + results = [ caller_reducer(dispatches[0]) ] # pylint: disable=W0621 for peptide_series, tx_id, dgraphs, pgraphs in results: diff --git a/moPepGen/cli/common.py b/moPepGen/cli/common.py index d5596491..b49e5a53 100644 --- a/moPepGen/cli/common.py +++ b/moPepGen/cli/common.py @@ -377,6 +377,7 @@ def validate_file_format(file:Path, types:List[str]=None, check_readable:bool=Fa def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): + """ Decorator to raise a TimeoutError if the process runs over time. """ def decorator(func): def _handle_timeout(signum, frame): raise TimeoutError(error_message) From 080c1e42628763806f65d3df0124bede77a3e10d Mon Sep 17 00:00:00 2001 From: zhuchcn Date: Thu, 8 Feb 2024 08:51:01 -0800 Subject: [PATCH 3/3] style: turn off some warning and raised error from its upstream error --- moPepGen/cli/call_variant_peptide.py | 4 ++-- moPepGen/cli/common.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/moPepGen/cli/call_variant_peptide.py b/moPepGen/cli/call_variant_peptide.py index fbfe5936..681611c4 100644 --- a/moPepGen/cli/call_variant_peptide.py +++ b/moPepGen/cli/call_variant_peptide.py @@ -394,14 +394,14 @@ def caller_reducer(dispatch): while True: try: return call_variant_peptides_wrapper(**dispatch) - except TimeoutError: + except TimeoutError as e: new_dispatch = copy.copy(dispatch) p = copy.copy(new_dispatch['cleavage_params']) max_variants_per_node = max_variants_per_node[1:] if len(max_variants_per_node) == 0: max_variants_per_node = (p.max_variants_per_node - 1, ) if max_variants_per_node[0] <= 0: - raise ValueError(f"Failed to finish transcript: {tx_id}") + raise ValueError(f"Failed to finish transcript: {tx_id}") from e additional_variants_per_misc = additional_variants_per_misc[1:] if len(additional_variants_per_misc) == 0: additional_variants_per_misc = (0,) diff --git a/moPepGen/cli/common.py b/moPepGen/cli/common.py index b49e5a53..9f6ec7cb 100644 --- a/moPepGen/cli/common.py +++ b/moPepGen/cli/common.py @@ -375,7 +375,7 @@ def validate_file_format(file:Path, types:List[str]=None, check_readable:bool=Fa if not os.access(file.parent, os.W_OK): raise PermissionError(f"Permission denied: '{file}'") - +# pylint: disable=unused-argument def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): """ Decorator to raise a TimeoutError if the process runs over time. """ def decorator(func):