diff --git a/src/genie_python/genie.py b/src/genie_python/genie.py index bfb852e..e2c1cb5 100644 --- a/src/genie_python/genie.py +++ b/src/genie_python/genie.py @@ -11,7 +11,9 @@ from io import open from typing import Any, Callable, TypedDict +import numpy as np import numpy.typing as npt +from typing_extensions import Unpack from genie_python.genie_api_setup import __api as _genie_api @@ -43,7 +45,8 @@ from genie_python.version import VERSION # noqa E402 PVBaseValue = bool | int | float | str -PVValue = PVBaseValue | list[PVBaseValue] | npt.NDArray | None +PVValue = PVBaseValue | list[PVBaseValue] | npt.NDArray | None # pyright: ignore +# because we don't want to make PVValue generic print("\ngenie_python version " + VERSION) @@ -99,7 +102,7 @@ def get_blocks() -> list[str]: @usercommand @helparglist("block") @log_command_and_handle_exception -def get_block_units(block_name: str) -> str | dict | None: +def get_block_units(block_name: str) -> str | None: """ Get the physical measurement units associated with a block name. @@ -177,7 +180,7 @@ def cset( raise Exception( "Too many arguments, please type: help(g.cset) for more information on the syntax" ) - blocks = [args[0]] + blocks = [str(args[0])] values = [args[1]] if len(args) == 2 else [None] elif len(kwargs) > 0: # Check for specifying blocks via the cset(block=value) syntax @@ -268,7 +271,7 @@ def _log_alarmed_block(block_name: str, alarm_state: str) -> None: print("Block {} is in alarm: {}".format(block_name, alarm_state), file=sys.stdout) -def _warn_if_block_alarm(block: object) -> None: +def _warn_if_block_alarm(block: str) -> None: """ Checks whether a block is in an alarmed state and warn user (inc log) @@ -282,7 +285,7 @@ def _warn_if_block_alarm(block: object) -> None: _log_alarmed_block(alarm[0], alarm_type) -def _print_from_cget(block_details: dict[str, str]) -> None: +def _print_from_cget(block_details: _CgetReturn) -> None: """ Prints the values obtained through cget into a human readable format, used for cshow. @@ -326,7 +329,7 @@ def cshow(block: str | None = None) -> None: @log_command_and_handle_exception def waitfor( block: str | None = None, - value: float | None = None, + value: PVValue = None, lowlimit: float | None = None, highlimit: float | None = None, maxwait: float | None = None, @@ -341,7 +344,7 @@ def waitfor( mevents: float | None = None, early_exit: Callable[[], bool] = lambda: False, quiet: bool = False, - **pars: bool | int | float | str | Callable[[None], bool] | None, + **pars: PVValue, ) -> None: """ Interrupts execution until certain conditions are met. @@ -418,7 +421,8 @@ def waitfor( else: raise ValueError("Block named '{}' did not exist.".format(k)) # Check that wait_for object exists - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise Exception("Cannot execute waitfor - try calling set_instrument first") # Warn if highlimit and lowlimit are round correct way check_lowlimit_against_highlimit(lowlimit, highlimit) @@ -448,7 +452,7 @@ def waitfor( @log_command_and_handle_exception def waitfor_block( block: str, - value: bool | int | float | str | None = None, + value: PVValue = None, lowlimit: float | None = None, highlimit: float | None = None, maxwait: float | None = None, @@ -477,7 +481,8 @@ def waitfor_block( ... "myblock", value=123, early_exit=lambda: cget("myblock_limit_reached")["value"] != 0 ... ) """ - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise Exception("Cannot execute waitfor_block - try calling set_instrument first") # Warn if highlimit and lowlimit are round correct way check_lowlimit_against_highlimit(lowlimit, highlimit) @@ -527,7 +532,8 @@ def waitfor_time( ) if any(t is not None and t < 0 for t in (seconds, minutes, hours)): raise ValueError("Cannot execute waitfor_time - Time parameters cannot be negative") - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise TypeError("Cannot execute waitfor_time - try calling set_instrument first") _genie_api.waitfor.start_waiting( seconds=seconds, minutes=minutes, hours=hours, time=time, quiet=quiet @@ -537,7 +543,7 @@ def waitfor_time( @usercommand @helparglist("frames") @log_command_and_handle_exception -def waitfor_frames(frames: int, quiet: bool = False) -> None: +def waitfor_frames(frames: int | None = None, quiet: bool = False) -> None: """ Interrupts execution to wait for number of total good frames to reach parameter value @@ -555,7 +561,8 @@ def waitfor_frames(frames: int, quiet: bool = False) -> None: ) if frames < 0: raise ValueError("Cannot execute waitfor_frames - frames parameter cannot be negative") - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise Exception("Cannot execute waitfor_frames - try calling set_instrument first") _genie_api.waitfor.start_waiting(frames=frames, quiet=quiet) @@ -563,7 +570,7 @@ def waitfor_frames(frames: int, quiet: bool = False) -> None: @usercommand @helparglist("raw_frames") @log_command_and_handle_exception -def waitfor_raw_frames(raw_frames: int, quiet: bool = False) -> None: +def waitfor_raw_frames(raw_frames: int | None = None, quiet: bool = False) -> None: """ Interrupts execution to wait for number of total raw frames to reach parameter value @@ -584,7 +591,8 @@ def waitfor_raw_frames(raw_frames: int, quiet: bool = False) -> None: raise ValueError( "Cannot execute waitfor_raw_frames - raw_frames parameter cannot be negative" ) - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise Exception("Cannot execute waitfor_raw_frames - try calling set_instrument first") _genie_api.waitfor.start_waiting(raw_frames=raw_frames, quiet=quiet) @@ -604,7 +612,8 @@ def waitfor_uamps(uamps: float, quiet: bool = False) -> None: >>> waitfor_uamps(115.5) """ - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise Exception("Cannot execute waitfor_uamps - try calling set_instrument first") _genie_api.waitfor.start_waiting(uamps=uamps, quiet=quiet) @@ -612,7 +621,7 @@ def waitfor_uamps(uamps: float, quiet: bool = False) -> None: @usercommand @helparglist("mevents") @log_command_and_handle_exception -def waitfor_mevents(mevents: float, quiet: bool = False) -> None: +def waitfor_mevents(mevents: float | None = None, quiet: bool = False) -> None: """ Interrupts execution to wait for number of millions of events to reach parameter value @@ -631,7 +640,8 @@ def waitfor_mevents(mevents: float, quiet: bool = False) -> None: ) if mevents < 0: raise ValueError("Cannot execute waitfor_mevents - mevents parameter cannot be negative") - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise Exception("Cannot execute waitfor_mevents - try calling set_instrument first") _genie_api.waitfor.start_waiting(mevents=mevents, quiet=quiet) @@ -661,7 +671,8 @@ def waitfor_runstate( >>> waitfor_runstate("paused", onexit=True) """ # Check that wait_for object exists - if _genie_api.waitfor is None: + if _genie_api.waitfor is None: # pyright: ignore + # pyright doesn't recognise that waitfor can be None raise Exception("Cannot execute waitfor_runstate - try calling set_instrument first") _genie_api.waitfor.wait_for_runstate(state, maxwaitsecs, onexit, quiet) @@ -669,7 +680,7 @@ def waitfor_runstate( @usercommand @helparglist("[block, ...][, start_timeout][, move_timeout]") @log_command_and_handle_exception -def waitfor_move(*blocks: str | None, **kwargs: int | None) -> None: +def waitfor_move(*blocks: str, **kwargs: int | None) -> None: """ Wait for all motion or specific motion to complete. @@ -678,7 +689,7 @@ def waitfor_move(*blocks: str | None, **kwargs: int | None) -> None: to stop. Args: - blocks (string, multiple, optional): the names of specific blocks to wait for + blocks (string, multiple): the names of specific blocks to wait for start_timeout (int, optional): the number of seconds to wait for the movement to begin (default = 2 seconds) move_timeout (int, optional): the maximum number of seconds to wait for motion to stop @@ -1175,7 +1186,7 @@ def get_number_spectra() -> int: @usercommand @helparglist("") @log_command_and_handle_exception -def get_spectrum_integrals(with_spec_zero: bool = True) -> npt.NDArray: +def get_spectrum_integrals(with_spec_zero: bool = True) -> npt.NDArray[np.float32]: """ Get the event mode spectrum integrals as numpy ND array. @@ -1200,7 +1211,7 @@ def get_spectrum_integrals(with_spec_zero: bool = True) -> npt.NDArray: @usercommand @helparglist("") @log_command_and_handle_exception -def get_spectrum_data(with_spec_zero: bool = True) -> npt.NDArray: +def get_spectrum_data(with_spec_zero: bool = True) -> npt.NDArray[np.float32]: """ Get the event mode spectrum data as numpy ND array. @@ -1289,7 +1300,7 @@ class _GetdashboardReturn(TypedDict): rb_number: str user: str title: str - display_title: str + display_title: bool run_time: int good_frames_total: int good_frames_period: int @@ -1316,33 +1327,34 @@ def get_dashboard() -> _GetdashboardReturn: Returns: dict: the experiment values """ - data = dict() - data["status"] = _genie_api.dae.get_run_state() - data["run_number"] = _genie_api.dae.get_run_number() - data["rb_number"] = _genie_api.dae.get_rb_number() - data["user"] = _genie_api.dae.get_users() - data["title"] = _genie_api.dae.get_title() - data["display_title"] = _genie_api.dae.get_display_title() - data["run_time"] = _genie_api.dae.get_run_duration() - data["good_frames_total"] = _genie_api.dae.get_good_frames() - data["good_frames_period"] = _genie_api.dae.get_good_frames(True) - data["raw_frames_total"] = _genie_api.dae.get_raw_frames() - data["raw_frames_period"] = _genie_api.dae.get_raw_frames(True) - data["beam_current"] = _genie_api.dae.get_beam_current() - data["total_current"] = _genie_api.dae.get_total_uamps() - data["spectra"] = _genie_api.dae.get_num_spectra() - # data["dae_memory_used"] = genie_api.dae.get_memory_used() - # Not implemented in EPICS system - data["periods"] = _genie_api.dae.get_num_periods() - data["time_channels"] = _genie_api.dae.get_num_timechannels() - data["monitor_spectrum"] = _genie_api.dae.get_monitor_spectrum() - data["monitor_from"] = _genie_api.dae.get_monitor_from() - data["monitor_to"] = _genie_api.dae.get_monitor_to() - data["monitor_counts"] = _genie_api.dae.get_monitor_counts() + data = _GetdashboardReturn( + status=_genie_api.dae.get_run_state(), + run_number=_genie_api.dae.get_run_number(), + rb_number=_genie_api.dae.get_rb_number(), + user=_genie_api.dae.get_users(), + title=_genie_api.dae.get_title(), + display_title=_genie_api.dae.get_display_title(), + run_time=_genie_api.dae.get_run_duration(), + good_frames_total=_genie_api.dae.get_good_frames(), + good_frames_period=_genie_api.dae.get_good_frames(True), + raw_frames_total=_genie_api.dae.get_raw_frames(), + raw_frames_period=_genie_api.dae.get_raw_frames(True), + beam_current=_genie_api.dae.get_beam_current(), + total_current=_genie_api.dae.get_total_uamps(), + spectra=_genie_api.dae.get_num_spectra(), + # data["dae_memory_used"] = genie_api.dae.get_memory_used() + # Not implemented in EPICS system + periods=_genie_api.dae.get_num_periods(), + time_channels=_genie_api.dae.get_num_timechannels(), + monitor_spectrum=_genie_api.dae.get_monitor_spectrum(), + monitor_from=_genie_api.dae.get_monitor_from(), + monitor_to=_genie_api.dae.get_monitor_to(), + monitor_counts=_genie_api.dae.get_monitor_counts(), + ) return data -def _get_correct_globals() -> dict[types.FrameType, int]: +def _get_correct_globals() -> dict[str, int]: """ This is a hack to find the frame in which to add the script function(s). @@ -1466,6 +1478,10 @@ def __load_module(name: str, directory: str) -> types.ModuleType: if spec is None: raise ValueError(f"Cannot find spec for module {name} in {directory}") module = importlib.util.module_from_spec(spec) + + if module.__file__ is None: + raise ValueError(f"Module {name} has no __file__ attribute") + if os.path.normpath(os.path.dirname(module.__file__)) != os.path.normpath(directory): raise ValueError( f"Cannot load script '{name}' as its name clashes with a standard python module " @@ -1707,7 +1723,7 @@ def get_tcb_settings(trange: int, regime: int = 1) -> dict[str, int]: @usercommand @helparglist("[...]") @log_command_and_handle_exception -def change_vetos(**params: bool | None) -> None: +def change_vetos(**params: bool) -> None: """ Change the DAE veto settings. @@ -1892,8 +1908,16 @@ def define_hard_period( configure_internal_periods(None, None, period, daq, dwell, unused, frames, output, label) +class ChangeParams(TypedDict): + title: str + period: int + nperiods: int + user: str + rb: int + + @log_command_and_handle_exception -def change(**params: str | int) -> None: +def change(**params: Unpack[ChangeParams]) -> None: """ Change experiment parameters. @@ -2038,13 +2062,11 @@ def change_rb(rb: int | str) -> None: if isinstance(rb, int): # If it is an int then that is fine, just cast to str as the PV is a string rb = str(rb) - elif isinstance(rb, str): + else: # Let's be kind in case they enter a string. # Check string contains only digits though if not rb.isdigit(): raise TypeError("RB number must be a number.") - else: - raise TypeError("RB number must be a number.") _genie_api.dae.set_rb_number(rb) @@ -2167,7 +2189,7 @@ def get_sample_pars() -> _GetSampleParsReturn: @usercommand @helparglist("name, value") @log_command_and_handle_exception -def change_sample_par(name: str, value: bool | int | float | str | None) -> None: +def change_sample_par(name: str, value: PVValue) -> None: """ Set a new value for a sample parameter. @@ -2223,7 +2245,7 @@ def get_beamline_pars() -> _GetbeamlineparsReturn: @usercommand @helparglist("name, value") @log_command_and_handle_exception -def change_beamline_par(name: str, value: bool | int | float | str | None) -> None: +def change_beamline_par(name: str, value: PVValue) -> None: """ Set a new value for a beamline parameter diff --git a/src/genie_python/genie_dae.py b/src/genie_python/genie_dae.py index 1c98c86..e8ef8af 100644 --- a/src/genie_python/genie_dae.py +++ b/src/genie_python/genie_dae.py @@ -36,7 +36,7 @@ ) if TYPE_CHECKING: - from genie_python.genie import PVValue + from genie_python.genie import PVValue, _GetspectrumReturn from genie_python.genie_epics_api import API ## for beginrun etc. there exists both the PV specified here and also a PV with @@ -1349,7 +1349,13 @@ def _create_tcb_return_string(self, low: float, high: float, step: float, log: b return out def change_tcb( - self, low: float, high: float, step: float, trange: int, log: bool = False, regime: int = 1 + self, + low: float | None, + high: float | None, + step: float | None, + trange: int, + log: bool = False, + regime: int = 1, ) -> None: """ Change the time channel boundaries. @@ -1869,7 +1875,7 @@ def _change_period_settings(self) -> None: def get_spectrum( self, spectrum: int, period: int = 1, dist: bool = True, use_numpy: bool | None = None - ) -> dict: + ) -> "_GetspectrumReturn": """ Gets a spectrum from the DAE via a PV. @@ -1924,7 +1930,7 @@ def in_transition(self) -> bool: else: return False - def get_wiring_tables(self) -> str: + def get_wiring_tables(self) -> list[str]: """ Gets a list of wiring table choices. diff --git a/src/genie_python/genie_epics_api.py b/src/genie_python/genie_epics_api.py index 99cfcda..96dc7c8 100644 --- a/src/genie_python/genie_epics_api.py +++ b/src/genie_python/genie_epics_api.py @@ -31,7 +31,12 @@ ) if TYPE_CHECKING: - from genie_python.genie import PVValue + from genie_python.genie import ( + PVValue, + _CgetReturn, + _GetbeamlineparsReturn, + _GetSampleParsReturn, + ) RC_ENABLE = ":RC:ENABLE" RC_LOW = ":RC:LOW" @@ -735,7 +740,9 @@ def get_runcontrol_settings(self, block_name: str) -> tuple["PVValue", "PVValue" except UnableToConnectToPVException: return "UNKNOWN", "UNKNOWN", "UNKNOWN" - def check_alarms(self, blocks: list[str]) -> tuple[list[str], list[str], list[str]]: + def check_alarms( + self, blocks: typing.Tuple[str, ...] + ) -> tuple[list[str], list[str], list[str]]: """ Checks whether the specified blocks are in alarm. @@ -745,22 +752,23 @@ def check_alarms(self, blocks: list[str]) -> tuple[list[str], list[str], list[st Returns: list, list, list: the blocks in minor, major and invalid alarm """ - alarm_states = self._get_fields_from_blocks(blocks, "SEVR", "alarm state") + alarm_states = self._get_fields_from_blocks(list(blocks), "SEVR", "alarm state") minor = [t[0] for t in alarm_states if t[1] == "MINOR"] major = [t[0] for t in alarm_states if t[1] == "MAJOR"] invalid = [t[0] for t in alarm_states if t[1] == "INVALID"] return minor, major, invalid - def check_limit_violations(self, blocks: list[str]) -> list[str]: + def check_limit_violations(self, blocks: typing.Iterable[str]) -> list[str]: """ Checks whether the specified blocks have soft limit violations. Args: - blocks (list): the blocks to check + blocks (iterable): the blocks to check Returns: list: the blocks which have soft limit violations """ + violation_states = self._get_fields_from_blocks(blocks, "LVIO", "limit violation") return [t[0] for t in violation_states if t[1]] diff --git a/src/genie_python/genie_simulate_impl.py b/src/genie_python/genie_simulate_impl.py index 4262f98..47a3d4b 100644 --- a/src/genie_python/genie_simulate_impl.py +++ b/src/genie_python/genie_simulate_impl.py @@ -6,6 +6,7 @@ import xml.etree.ElementTree as ET from builtins import object, str from collections import OrderedDict +from datetime import timedelta from typing import TYPE_CHECKING, Callable import numpy as np @@ -16,7 +17,13 @@ from genie_python.utilities import require_runstate if TYPE_CHECKING: - from genie_python.genie import PVValue, _GetspectrumReturn + from genie_python.genie import ( + PVValue, + _CgetReturn, + _GetbeamlineparsReturn, + _GetSampleParsReturn, + _GetspectrumReturn, + ) from genie_python.genie_waitfor import WAITFOR_VALUE @@ -433,6 +440,21 @@ def recover_run(self) -> None: else: raise Exception("Can only be called when SETUP") + def post_recover_check(self, verbose: bool = False) -> None: + pass + + def get_time_since_begin(self, get_timedelta: bool) -> float | timedelta: + return 0.0 + + def get_events(self) -> int: + return 0 + + def get_tcb_settings(self, trange: int, regime: int = 1) -> dict: + return {} + + def get_simulation_mode(self) -> bool: + return False + def get_period(self) -> int: """ returns the current period number @@ -532,7 +554,7 @@ def get_good_frames(self, period: bool = False) -> int: def get_users(self) -> str: return self.users - def get_run_duration(self) -> float: + def get_run_duration(self) -> int: return self.run_duration def get_raw_frames(self, period: bool = False) -> int: @@ -565,6 +587,10 @@ def get_monitor_counts(self) -> int: def set_users(self, users: str) -> None: self.users = users + @require_runstate(["SETUP", "PROCESSING"]) + def set_simulation_mode(self, mode: bool) -> None: + return False + def change_start(self) -> None: """Start a change operation. The operaton is finished when change_finish is called. @@ -906,7 +932,13 @@ def change_tcb_file(self, tcb_file: str | None = None, default: bool = False) -> self.change_finish() def change_tcb( - self, low: float, high: float, step: float, trange: int, log: bool = False, regime: int = 1 + self, + low: float | None, + high: float | None, + step: float | None, + trange: int, + log: bool = False, + regime: int = 1, ) -> None: """Change the time channel boundaries. @@ -981,7 +1013,7 @@ def change_vetos(self, **params: bool) -> None: if did_change: self.change_finish() - def set_fermi_veto(self, enable: bool = None, delay: float = 1.0, width: float = 1.0) -> None: + def set_fermi_veto(self, enable: bool | None, delay: float = 1.0, width: float = 1.0) -> None: """Configure the fermi chopper veto. Parameters: @@ -1100,7 +1132,7 @@ def prefix_pv_name(self, name: str) -> str: return name def set_pv_value( - self, name: str, value: str, wait: bool = False, is_local: bool = False + self, name: str, value: "PVValue", wait: bool = False, is_local: bool = False ) -> None: if is_local: name = self.prefix_pv_name(name) @@ -1110,18 +1142,26 @@ def set_pv_value( ) def get_pv_value( - self, name: str, to_string: bool = False, attempts: int = 3, is_local: bool = False + self, + name: str, + to_string: bool = False, + attempts: int = 3, + is_local: bool = False, + use_numpy: bool = False, ) -> None: if is_local: name = self.prefix_pv_name(name) print( - "get_pv_value called (name=%s value=%s attempts=%s is_local=%s)" - % (name, to_string, attempts, is_local) + "get_pv_value called (name=%s value=%s attempts=%s is_local=%s use_numpy=%s)" + % (name, to_string, attempts, is_local, use_numpy) ) def pv_exists(self, name: str) -> bool: return True + def connected_pvs_in_list(self, pv_list: list[str], is_local: bool = False) -> list[str]: + return [] + def reload_current_config(self) -> None: pass @@ -1141,7 +1181,7 @@ def set_block_value( runcontrol: bool | None = None, lowlimit: float | None = None, highlimit: float | None = None, - wait: bool = False, + wait: bool | None = False, ) -> None: """Sets a block's values. If the block already exists, update the block. Only update values @@ -1178,7 +1218,7 @@ def set_block_value( if wait: self.waitfor.start_waiting(block=name, value=value) - def get_block_data(self, block: str, fail_fast: bool = False) -> dict(): + def get_block_data(self, block: str, fail_fast: bool = False) -> "_CgetReturn": ans = OrderedDict() ans["connected"] = True @@ -1206,16 +1246,16 @@ def set_multiple_blocks(self, names: list, values: list) -> None: def run_pre_post_cmd(self, command: str, **pars: str) -> None: pass - def get_sample_pars(self) -> dict: + def get_sample_pars(self) -> "_GetSampleParsReturn": return self.sample_pars - def set_sample_par(self, name: str, value: str) -> None: + def set_sample_par(self, name: str, value: "PVValue") -> None: self.sample_pars[name] = value - def get_beamline_pars(self) -> dict: + def get_beamline_pars(self) -> "_GetbeamlineparsReturn": return self.beamline_pars - def set_beamline_par(self, name: str, value: str) -> None: + def set_beamline_par(self, name: str, value: "PVValue") -> None: self.beamline_pars[name] = value def get_runcontrol_settings(self, name: str) -> tuple(): @@ -1225,13 +1265,13 @@ def get_runcontrol_settings(self, name: str) -> tuple(): self.block_dict[name]["highlimit"], ) - def check_alarms(*blocks: str) -> tuple[list[str], list[str], list[str]]: + def check_alarms(self, blocks: tuple[str, ...]) -> tuple[list[str], list[str], list[str]]: minor = list() major = list() invalid = list() return (minor, major, invalid) - def check_limit_violations(self, blocks: str) -> list: + def check_limit_violations(self, blocks: tuple[str, ...]) -> list: return list() def get_current_block_values(self) -> dict: @@ -1256,7 +1296,7 @@ def send_sms(self, phone_num: str, message: str) -> None: def send_email(self, address: str, message: str) -> None: print(('Email "{}" sent to {}'.format(message, address))) - def send_alert(self, message: str, inst: str) -> str: + def send_alert(self, message: str, inst: str | None) -> None: print(('Slack message "{}" sent to {}'.format(message, inst))) def get_alarm_from_block(self, block: str) -> str: diff --git a/src/genie_python/genie_waitfor.py b/src/genie_python/genie_waitfor.py index a823f22..76c1bfd 100644 --- a/src/genie_python/genie_waitfor.py +++ b/src/genie_python/genie_waitfor.py @@ -8,17 +8,16 @@ from builtins import object, str from datetime import datetime, timedelta from time import sleep, strptime -from typing import TYPE_CHECKING, Callable, TypeAlias +from typing import TYPE_CHECKING, Callable from genie_python.utilities import check_break, get_time_delta if TYPE_CHECKING: + from genie_python.genie import PVValue from genie_python.genie_epics_api import API NUMERIC_TYPE = (float, int) -WAITFOR_VALUE: TypeAlias = bool | int | float | str | None - # The time in a loop to sleep for when waiting for an event, e.g. polling a pv/block value DELAY_IN_WAIT_FOR_SLEEP_LOOP = 0.1 @@ -257,7 +256,7 @@ def __init__(self, api: "API") -> None: def start_waiting( self, block: str | None = None, - value: WAITFOR_VALUE | None = None, + value: "PVValue | None" = None, lowlimit: float | int | None = None, highlimit: float | int | None = None, maxwait: float | None = None, @@ -508,7 +507,7 @@ def _waiting_for_time(self) -> bool: def _init_wait_block( self, block: str | None, - value: WAITFOR_VALUE, + value: "PVValue", lowlimit: float | int | None, highlimit: float | int | None, quiet: bool = False, @@ -533,8 +532,8 @@ def _init_wait_block( ) def _get_block_limits( - self, value: WAITFOR_VALUE, lowlimit: float | int | None, highlimit: float | int | None - ) -> tuple[WAITFOR_VALUE | None, WAITFOR_VALUE | None]: + self, value: "PVValue", lowlimit: float | int | None, highlimit: float | int | None + ) -> tuple["PVValue | None", "PVValue | None"]: low = None high = None if value is not None: diff --git a/src/genie_python/utilities.py b/src/genie_python/utilities.py index 1c35e42..b06d718 100644 --- a/src/genie_python/utilities.py +++ b/src/genie_python/utilities.py @@ -336,7 +336,7 @@ def check_lowlimit_against_highlimit(lowlimit: float | None, highlimit: float | def require_runstate( runstates: Iterable[str], -) -> Callable[[Callable[Concatenate["Dae", P], T]], Callable[Concatenate["Dae", P], T]]: +) -> Callable[[Callable[Concatenate["Dae", ...], T]], Callable[Concatenate["Dae", ...], T]]: """ Decorator that checks for needed runstates. If skip_required_runstates is passed in as a keyword argument to the @@ -345,8 +345,8 @@ def require_runstate( runstates_string = ", ".join(runstates) def _check_runstate( - func: Callable[Concatenate["Dae", P], T], - ) -> Callable[Concatenate["Dae", P], T]: + func: Callable[Concatenate["Dae", ...], T], + ) -> Callable[Concatenate["Dae", ...], T]: @wraps(func) def _wrapper(self: "Dae", *args: P.args, **kwargs: P.kwargs) -> T: if not kwargs.pop("skip_required_runstates", False):