Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --timeout-second and retry to callVariant #844

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Added `--graph-output-dir` to save graph data in json.

- Added `--timeout-seconds` to callVariant.

## Fixed

- Fixed `summarizeFasta` that SEC and W2F on fusion peptides are ignored. #789
Expand Down
64 changes: 53 additions & 11 deletions moPepGen/cli/call_variant_peptide.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction):
' when there are local regions that are heavily mutated. When creating'
' the cleavage graph, nodes containing variants larger than this value'
' are skipped. Setting to -1 will avoid checking for this.',
default=7,
default=(7,),
nargs='+',
metavar='<number>'
)
p.add_argument(
Expand All @@ -90,7 +91,8 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction):
help='Additional variants allowed for every miscleavage. This argument'
' is used together with --max-variants-per-node to handle hypermutated'
' regions. Setting to -1 will avoid checking for this.',
default=2,
default=(2,),
nargs='+',
metavar='<number>'
)
p.add_argument(
Expand All @@ -114,6 +116,12 @@ def add_subparser_call_variant(subparsers:argparse._SubParsersAction):
help='Process only noncanonical transcripts of fusion transcripts and'
' circRNA. Canonical transcripts are skipped.'
)
p.add_argument(
'--timeout-seconds',
type=int,
default=1800,
help='Time out in seconds for each transcript.'
)
p.add_argument(
'--verbose-level',
type=int,
Expand Down Expand Up @@ -167,8 +175,8 @@ def __init__(self, args:argparse.Namespace):
min_mw=float(args.min_mw),
min_length=args.min_length,
max_length=args.max_length,
max_variants_per_node = args.max_variants_per_node,
additional_variants_per_misc = args.additional_variants_per_misc,
max_variants_per_node = args.max_variants_per_node[0],
additional_variants_per_misc = args.additional_variants_per_misc[0],
min_nodes_to_collapse = args.min_nodes_to_collapse,
naa_to_collapse = args.naa_to_collapse
)
Expand Down Expand Up @@ -272,6 +280,8 @@ def write_pgraphs(self, tx_id:str, pgraphs:TypePGraphs):
Dict[str, svgraph.PeptideVariantGraph],
Dict[str, svgraph.PeptideVariantGraph]
]
# pylint: disable=unused-argument
@common.timeout()
def call_variant_peptides_wrapper(tx_id:str,
variant_series:seqvar.TranscriptionalVariantSeries,
tx_seqs:Dict[str, dna.DNASeqRecordWithCoordinates],
Expand All @@ -283,7 +293,8 @@ def call_variant_peptides_wrapper(tx_id:str,
max_adjacent_as_mnv:bool,
truncate_sec:bool,
w2f_reassignment:bool,
save_graph:bool
save_graph:bool,
**kwargs
) -> Tuple[Set[aa.AminoAcidSeqRecord], str, TypeDGraphs, TypePGraphs]:
""" wrapper function to call variant peptides """
peptide_pool:List[Set[aa.AminoAcidSeqRecord]] = []
Expand Down Expand Up @@ -375,9 +386,34 @@ def call_variant_peptides_wrapper(tx_id:str,

return peptide_pool, tx_id, dgraphs, pgraphs

def wrapper(dispatch):
""" wrapper for ParallelPool """
return call_variant_peptides_wrapper(**dispatch)
def caller_reducer(dispatch):
""" wrapper for ParallelPool. Also reduces the complexity if the run is timed out. """
max_variants_per_node = dispatch['max_variants_per_node']
additional_variants_per_misc = dispatch['additional_variants_per_misc']
tx_id = dispatch['tx_id']
while True:
try:
return call_variant_peptides_wrapper(**dispatch)
except TimeoutError as e:
new_dispatch = copy.copy(dispatch)
p = copy.copy(new_dispatch['cleavage_params'])
max_variants_per_node = max_variants_per_node[1:]
if len(max_variants_per_node) == 0:
max_variants_per_node = (p.max_variants_per_node - 1, )
if max_variants_per_node[0] <= 0:
raise ValueError(f"Failed to finish transcript: {tx_id}") from e
additional_variants_per_misc = additional_variants_per_misc[1:]
if len(additional_variants_per_misc) == 0:
additional_variants_per_misc = (0,)
p.max_variants_per_node = max_variants_per_node[0]
p.additional_variants_per_misc = additional_variants_per_misc[0]
new_dispatch['cleavage_params'] = p
dispatch = new_dispatch
logger(
f"Transcript {tx_id} timed out. Retry with "
f"--max-variants-per-node {p.max_variants_per_node} "
f"--additional-variants-per-misc {p.additional_variants_per_misc}"
)

def call_variant_peptide(args:argparse.Namespace) -> None:
""" Main entry point for calling variant peptide """
Expand Down Expand Up @@ -475,7 +511,10 @@ def call_variant_peptide(args:argparse.Namespace) -> None:
'max_adjacent_as_mnv': caller.max_adjacent_as_mnv,
'truncate_sec': caller.truncate_sec,
'w2f_reassignment': caller.w2f_reassignment,
'save_graph': caller.graph_output_dir is not None
'save_graph': caller.graph_output_dir is not None,
'timeout': args.timeout_seconds,
'max_variants_per_node': tuple(args.max_variants_per_node),
'additional_variants_per_misc': tuple(args.additional_variants_per_misc)
}
dispatches.append(dispatch)

Expand All @@ -485,9 +524,12 @@ def call_variant_peptide(args:argparse.Namespace) -> None:
if caller.verbose >= 2:
logger([x['tx_id'] for x in dispatches])
if caller.threads > 1:
results = process_pool.map(wrapper, dispatches)
results = process_pool.map(
caller_reducer,
dispatches
)
else:
results = [wrapper(dispatches[0])]
results = [ caller_reducer(dispatches[0]) ]

# pylint: disable=W0621
for peptide_series, tx_id, dgraphs, pgraphs in results:
Expand Down
26 changes: 26 additions & 0 deletions moPepGen/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import sys
from typing import Tuple, Set, List
from pathlib import Path
import errno
import signal
import functools
import pickle
import pkg_resources
from moPepGen import aa, dna, gtf, logger, seqvar, err
Expand Down Expand Up @@ -371,3 +374,26 @@ def validate_file_format(file:Path, types:List[str]=None, check_readable:bool=Fa
else:
if not os.access(file.parent, os.W_OK):
raise PermissionError(f"Permission denied: '{file}'")

# pylint: disable=unused-argument
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
""" Decorator to raise a TimeoutError if the process runs over time. """
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)

@functools.wraps(func)
def wrapper(*args, **kwargs):
if 'timeout' in kwargs and kwargs['timeout']:
seconds = kwargs['timeout']
signal.signal(signal.SIGALRM, _handle_timeout)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0)
return result

return wrapper

return decorator
5 changes: 3 additions & 2 deletions test/integration/test_call_variant_peptides.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def create_base_args() -> argparse.Namespace:
args.max_adjacent_as_mnv = 0
args.selenocysteine_termination = False
args.w2f_reassignment = False
args.max_variants_per_node = 7
args.additional_variants_per_misc = 2
args.max_variants_per_node = [7]
args.additional_variants_per_misc = [2]
args.min_nodes_to_collapse = 30
args.naa_to_collapse = 5
args.inclusion_biotypes = None
Expand All @@ -42,6 +42,7 @@ def create_base_args() -> argparse.Namespace:
args.noncanonical_transcripts = False
args.invalid_protein_as_noncoding = False
args.threads = 1
args.timeout_seconds = 1800
return args

class TestCallVariantPeptides(TestCaseIntegration):
Expand Down
Loading