Skip to content

Genie.py ruff & pyright #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/genie_python/block_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class BlockNamesManager:

def __init__(
self,
block_names: "PVValue",
block_names: "BlockNames",
delay_before_retry_add_monitor: float = DELAY_BEFORE_RETRYING_BLOCK_NAMES_PV_ON_FAIL,
) -> None:
"""
Expand Down
125 changes: 69 additions & 56 deletions src/genie_python/genie.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
from io import open
from typing import Any, Callable, TypedDict

import numpy as np
import numpy.typing as npt
from typing_extensions import Unpack

from genie_python.genie_api_setup import __api as _genie_api

Expand Down Expand Up @@ -43,7 +45,7 @@
from genie_python.version import VERSION # noqa E402

PVBaseValue = bool | int | float | str
PVValue = PVBaseValue | list[PVBaseValue] | npt.NDArray | None
PVValue = PVBaseValue | list[PVBaseValue] | npt.NDArray | None # pyright: ignore

print("\ngenie_python version " + VERSION)

Expand Down Expand Up @@ -99,7 +101,7 @@ def get_blocks() -> list[str]:
@usercommand
@helparglist("block")
@log_command_and_handle_exception
def get_block_units(block_name: str) -> str | dict | None:
def get_block_units(block_name: str) -> str | None:
"""
Get the physical measurement units associated with a block name.

Expand Down Expand Up @@ -177,7 +179,7 @@ def cset(
raise Exception(
"Too many arguments, please type: help(g.cset) for more information on the syntax"
)
blocks = [args[0]]
blocks = [str(args[0])]
values = [args[1]] if len(args) == 2 else [None]
elif len(kwargs) > 0:
# Check for specifying blocks via the cset(block=value) syntax
Expand Down Expand Up @@ -268,7 +270,7 @@ def _log_alarmed_block(block_name: str, alarm_state: str) -> None:
print("Block {} is in alarm: {}".format(block_name, alarm_state), file=sys.stdout)


def _warn_if_block_alarm(block: object) -> None:
def _warn_if_block_alarm(block: str) -> None:
"""
Checks whether a block is in an alarmed state and warn user (inc log)

Expand All @@ -282,7 +284,7 @@ def _warn_if_block_alarm(block: object) -> None:
_log_alarmed_block(alarm[0], alarm_type)


def _print_from_cget(block_details: dict[str, str]) -> None:
def _print_from_cget(block_details: _CgetReturn) -> None:
"""
Prints the values obtained through cget into a
human readable format, used for cshow.
Expand Down Expand Up @@ -414,11 +416,11 @@ def waitfor(
if block is not None:
raise Exception("Can set waitfor for only one block at a time")
block = k
value = pars[k]
value = pars[k] # pyright: ignore
else:
raise ValueError("Block named '{}' did not exist.".format(k))
# Check that wait_for object exists
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise Exception("Cannot execute waitfor - try calling set_instrument first")
# Warn if highlimit and lowlimit are round correct way
check_lowlimit_against_highlimit(lowlimit, highlimit)
Expand Down Expand Up @@ -448,7 +450,7 @@ def waitfor(
@log_command_and_handle_exception
def waitfor_block(
block: str,
value: bool | int | float | str | None = None,
value: PVValue | None = None,
lowlimit: float | None = None,
highlimit: float | None = None,
maxwait: float | None = None,
Expand Down Expand Up @@ -477,7 +479,7 @@ def waitfor_block(
... "myblock", value=123, early_exit=lambda: cget("myblock_limit_reached")["value"] != 0
... )
"""
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise Exception("Cannot execute waitfor_block - try calling set_instrument first")
# Warn if highlimit and lowlimit are round correct way
check_lowlimit_against_highlimit(lowlimit, highlimit)
Expand Down Expand Up @@ -527,7 +529,7 @@ def waitfor_time(
)
if any(t is not None and t < 0 for t in (seconds, minutes, hours)):
raise ValueError("Cannot execute waitfor_time - Time parameters cannot be negative")
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise TypeError("Cannot execute waitfor_time - try calling set_instrument first")
_genie_api.waitfor.start_waiting(
seconds=seconds, minutes=minutes, hours=hours, time=time, quiet=quiet
Expand All @@ -537,7 +539,7 @@ def waitfor_time(
@usercommand
@helparglist("frames")
@log_command_and_handle_exception
def waitfor_frames(frames: int, quiet: bool = False) -> None:
def waitfor_frames(frames: int | None = None, quiet: bool = False) -> None:
"""
Interrupts execution to wait for number of total good frames to reach parameter value

Expand All @@ -555,15 +557,15 @@ def waitfor_frames(frames: int, quiet: bool = False) -> None:
)
if frames < 0:
raise ValueError("Cannot execute waitfor_frames - frames parameter cannot be negative")
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise Exception("Cannot execute waitfor_frames - try calling set_instrument first")
_genie_api.waitfor.start_waiting(frames=frames, quiet=quiet)


@usercommand
@helparglist("raw_frames")
@log_command_and_handle_exception
def waitfor_raw_frames(raw_frames: int, quiet: bool = False) -> None:
def waitfor_raw_frames(raw_frames: int | None = None, quiet: bool = False) -> None:
"""
Interrupts execution to wait for number of total raw frames to reach parameter value

Expand All @@ -584,7 +586,7 @@ def waitfor_raw_frames(raw_frames: int, quiet: bool = False) -> None:
raise ValueError(
"Cannot execute waitfor_raw_frames - raw_frames parameter cannot be negative"
)
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise Exception("Cannot execute waitfor_raw_frames - try calling set_instrument first")
_genie_api.waitfor.start_waiting(raw_frames=raw_frames, quiet=quiet)

Expand All @@ -604,15 +606,15 @@ def waitfor_uamps(uamps: float, quiet: bool = False) -> None:

>>> waitfor_uamps(115.5)
"""
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise Exception("Cannot execute waitfor_uamps - try calling set_instrument first")
_genie_api.waitfor.start_waiting(uamps=uamps, quiet=quiet)


@usercommand
@helparglist("mevents")
@log_command_and_handle_exception
def waitfor_mevents(mevents: float, quiet: bool = False) -> None:
def waitfor_mevents(mevents: float | None = None, quiet: bool = False) -> None:
"""
Interrupts execution to wait for number of millions of events to reach parameter value

Expand All @@ -631,7 +633,7 @@ def waitfor_mevents(mevents: float, quiet: bool = False) -> None:
)
if mevents < 0:
raise ValueError("Cannot execute waitfor_mevents - mevents parameter cannot be negative")
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise Exception("Cannot execute waitfor_mevents - try calling set_instrument first")
_genie_api.waitfor.start_waiting(mevents=mevents, quiet=quiet)

Expand Down Expand Up @@ -661,7 +663,7 @@ def waitfor_runstate(
>>> waitfor_runstate("paused", onexit=True)
"""
# Check that wait_for object exists
if _genie_api.waitfor is None:
if _genie_api.waitfor is None: # pyright: ignore
raise Exception("Cannot execute waitfor_runstate - try calling set_instrument first")
_genie_api.waitfor.wait_for_runstate(state, maxwaitsecs, onexit, quiet)

Expand All @@ -678,7 +680,7 @@ def waitfor_move(*blocks: str | None, **kwargs: int | None) -> None:
to stop.

Args:
blocks (string, multiple, optional): the names of specific blocks to wait for
blocks (string, multiple): the names of specific blocks to wait for
start_timeout (int, optional): the number of seconds to wait for the
movement to begin (default = 2 seconds)
move_timeout (int, optional): the maximum number of seconds to wait for motion to stop
Expand Down Expand Up @@ -711,12 +713,12 @@ def waitfor_move(*blocks: str | None, **kwargs: int | None) -> None:
if _genie_api.wait_for_move is None:
raise Exception("Cannot execute waitfor_move - try calling set_instrument first")

if len(blocks) > 0:
if blocks != ():
# Specified blocks waitfor_move
move_blocks = list()
# Check blocks exist
for b in blocks:
if _genie_api.block_exists(b):
if _genie_api.block_exists(b): # pyright: ignore b cannot be None given line 716
move_blocks.append(b)
else:
print("Block %s does not exist, so ignoring it" % b)
Expand Down Expand Up @@ -1175,7 +1177,7 @@ def get_number_spectra() -> int:
@usercommand
@helparglist("")
@log_command_and_handle_exception
def get_spectrum_integrals(with_spec_zero: bool = True) -> npt.NDArray:
def get_spectrum_integrals(with_spec_zero: bool = True) -> npt.NDArray[np.float32]:
"""
Get the event mode spectrum integrals as numpy ND array.

Expand All @@ -1200,7 +1202,7 @@ def get_spectrum_integrals(with_spec_zero: bool = True) -> npt.NDArray:
@usercommand
@helparglist("")
@log_command_and_handle_exception
def get_spectrum_data(with_spec_zero: bool = True) -> npt.NDArray:
def get_spectrum_data(with_spec_zero: bool = True) -> npt.NDArray[np.float32]:
"""
Get the event mode spectrum data as numpy ND array.

Expand Down Expand Up @@ -1289,7 +1291,7 @@ class _GetdashboardReturn(TypedDict):
rb_number: str
user: str
title: str
display_title: str
display_title: bool
run_time: int
good_frames_total: int
good_frames_period: int
Expand All @@ -1316,33 +1318,34 @@ def get_dashboard() -> _GetdashboardReturn:
Returns:
dict: the experiment values
"""
data = dict()
data["status"] = _genie_api.dae.get_run_state()
data["run_number"] = _genie_api.dae.get_run_number()
data["rb_number"] = _genie_api.dae.get_rb_number()
data["user"] = _genie_api.dae.get_users()
data["title"] = _genie_api.dae.get_title()
data["display_title"] = _genie_api.dae.get_display_title()
data["run_time"] = _genie_api.dae.get_run_duration()
data["good_frames_total"] = _genie_api.dae.get_good_frames()
data["good_frames_period"] = _genie_api.dae.get_good_frames(True)
data["raw_frames_total"] = _genie_api.dae.get_raw_frames()
data["raw_frames_period"] = _genie_api.dae.get_raw_frames(True)
data["beam_current"] = _genie_api.dae.get_beam_current()
data["total_current"] = _genie_api.dae.get_total_uamps()
data["spectra"] = _genie_api.dae.get_num_spectra()
# data["dae_memory_used"] = genie_api.dae.get_memory_used()
# Not implemented in EPICS system
data["periods"] = _genie_api.dae.get_num_periods()
data["time_channels"] = _genie_api.dae.get_num_timechannels()
data["monitor_spectrum"] = _genie_api.dae.get_monitor_spectrum()
data["monitor_from"] = _genie_api.dae.get_monitor_from()
data["monitor_to"] = _genie_api.dae.get_monitor_to()
data["monitor_counts"] = _genie_api.dae.get_monitor_counts()
data = _GetdashboardReturn(
status=_genie_api.dae.get_run_state(),
run_number=_genie_api.dae.get_run_number(),
rb_number=_genie_api.dae.get_rb_number(),
user=_genie_api.dae.get_users(),
title=_genie_api.dae.get_title(),
display_title=_genie_api.dae.get_display_title(),
run_time=_genie_api.dae.get_run_duration(),
good_frames_total=_genie_api.dae.get_good_frames(),
good_frames_period=_genie_api.dae.get_good_frames(True),
raw_frames_total=_genie_api.dae.get_raw_frames(),
raw_frames_period=_genie_api.dae.get_raw_frames(True),
beam_current=_genie_api.dae.get_beam_current(),
total_current=_genie_api.dae.get_total_uamps(),
spectra=_genie_api.dae.get_num_spectra(),
# data["dae_memory_used"] = genie_api.dae.get_memory_used()
# Not implemented in EPICS system
periods=_genie_api.dae.get_num_periods(),
time_channels=_genie_api.dae.get_num_timechannels(),
monitor_spectrum=_genie_api.dae.get_monitor_spectrum(),
monitor_from=_genie_api.dae.get_monitor_from(),
monitor_to=_genie_api.dae.get_monitor_to(),
monitor_counts=_genie_api.dae.get_monitor_counts(),
)
return data


def _get_correct_globals() -> dict[types.FrameType, int]:
def _get_correct_globals() -> dict[str, int]:
"""
This is a hack to find the frame in which to add the script function(s).

Expand Down Expand Up @@ -1466,6 +1469,10 @@ def __load_module(name: str, directory: str) -> types.ModuleType:
if spec is None:
raise ValueError(f"Cannot find spec for module {name} in {directory}")
module = importlib.util.module_from_spec(spec)

if module.__file__ is None:
raise ValueError(f"Module {name} has no __file__ attribute")

if os.path.normpath(os.path.dirname(module.__file__)) != os.path.normpath(directory):
raise ValueError(
f"Cannot load script '{name}' as its name clashes with a standard python module "
Expand Down Expand Up @@ -1707,7 +1714,7 @@ def get_tcb_settings(trange: int, regime: int = 1) -> dict[str, int]:
@usercommand
@helparglist("[...]")
@log_command_and_handle_exception
def change_vetos(**params: bool | None) -> None:
def change_vetos(**params: bool) -> None:
"""
Change the DAE veto settings.

Expand Down Expand Up @@ -1892,8 +1899,16 @@ def define_hard_period(
configure_internal_periods(None, None, period, daq, dwell, unused, frames, output, label)


class ChangeParams(TypedDict):
title: str
period: int
nperiods: int
user: str
rb: int


@log_command_and_handle_exception
def change(**params: str | int) -> None:
def change(**params: Unpack[ChangeParams]) -> None:
"""
Change experiment parameters.

Expand Down Expand Up @@ -2038,13 +2053,11 @@ def change_rb(rb: int | str) -> None:
if isinstance(rb, int):
# If it is an int then that is fine, just cast to str as the PV is a string
rb = str(rb)
elif isinstance(rb, str):
else:
# Let's be kind in case they enter a string.
# Check string contains only digits though
if not rb.isdigit():
raise TypeError("RB number must be a number.")
else:
raise TypeError("RB number must be a number.")
_genie_api.dae.set_rb_number(rb)


Expand Down Expand Up @@ -2167,7 +2180,7 @@ def get_sample_pars() -> _GetSampleParsReturn:
@usercommand
@helparglist("name, value")
@log_command_and_handle_exception
def change_sample_par(name: str, value: bool | int | float | str | None) -> None:
def change_sample_par(name: str, value: PVValue) -> None:
"""
Set a new value for a sample parameter.

Expand Down Expand Up @@ -2223,7 +2236,7 @@ def get_beamline_pars() -> _GetbeamlineparsReturn:
@usercommand
@helparglist("name, value")
@log_command_and_handle_exception
def change_beamline_par(name: str, value: bool | int | float | str | None) -> None:
def change_beamline_par(name: str, value: PVValue) -> None:
"""
Set a new value for a beamline parameter

Expand Down Expand Up @@ -2409,7 +2422,7 @@ def set_dae_simulation_mode(mode: bool, skip_required_runstates: bool = False) -
skip_required_runstates: Ignore all checks, use with caution
"""
# skip_required_runstates must be passed as a keyword argument for wrapper to catch it.
_genie_api.dae.set_simulation_mode(mode, skip_required_runstates=skip_required_runstates)
_genie_api.dae.set_simulation_mode(mode, skip_required_runstates=skip_required_runstates) # pyright: ignore


@usercommand
Expand Down
Loading
Loading