diff --git a/corebridge/_modidx.py b/corebridge/_modidx.py index 0b34e93..1f2160a 100644 --- a/corebridge/_modidx.py +++ b/corebridge/_modidx.py @@ -32,5 +32,12 @@ 'corebridge/core.py'), 'corebridge.core.timeseries_dataframe_to_datadict': ( 'core.html#timeseries_dataframe_to_datadict', 'corebridge/core.py')}, - 'corebridge.rscript': { 'corebridge.rscript.check_script_inputs': ( 'rscriptbridge.html#check_script_inputs', - 'corebridge/rscript.py')}}} + 'corebridge.rscript': { 'corebridge.rscript.calc_hash_from_flowobject': ( 'rscriptbridge.html#calc_hash_from_flowobject', + 'corebridge/rscript.py'), + 'corebridge.rscript.check_script_inputs': ( 'rscriptbridge.html#check_script_inputs', + 'corebridge/rscript.py'), + 'corebridge.rscript.check_script_output': ( 'rscriptbridge.html#check_script_output', + 'corebridge/rscript.py'), + 'corebridge.rscript.install_R_package': ( 'rscriptbridge.html#install_r_package', + 'corebridge/rscript.py'), + 'corebridge.rscript.run_script': ('rscriptbridge.html#run_script', 'corebridge/rscript.py')}}} diff --git a/corebridge/aicorebridge.py b/corebridge/aicorebridge.py index 9c427fe..4388193 100644 --- a/corebridge/aicorebridge.py +++ b/corebridge/aicorebridge.py @@ -29,8 +29,23 @@ except: pass -# %% ../nbs/01_aicorebridge.ipynb 7 -def build_historic_args(data, history): +# %% ../nbs/01_aicorebridge.ipynb 8 +def build_historic_args(data:pd.DataFrame, history:dict|list) -> dict: + """Create a timeseries DataFrame from historic data defined in `history`. + + Parameters + ---------- + data : pd.DataFrame + The input time-series DataFrame. + history : dict or list of dicts + Historic data definition, each item in the list is a dictionary with a startDate key to set the start of a section of historic data in the result and a column-value pair for each of the columns in the + + Returns + ------- + historic_data : dict + Historic data in dictionary format where keys are column names and values are the historic values as numpy array. + """ + if not history: return {} @@ -62,10 +77,10 @@ def build_historic_args(data, history): #return pd.DataFrame(column_data, index=data.index) -# %% ../nbs/01_aicorebridge.ipynb 10 +# %% ../nbs/01_aicorebridge.ipynb 12 class AICoreModule(): pass -# %% ../nbs/01_aicorebridge.ipynb 11 +# %% ../nbs/01_aicorebridge.ipynb 13 @patch def __init__(self:AICoreModule, processor:typing.Callable, # data processing function @@ -85,7 +100,7 @@ def __init__(self:AICoreModule, -# %% ../nbs/01_aicorebridge.ipynb 12 +# %% ../nbs/01_aicorebridge.ipynb 14 @patch def _init_processor( self:AICoreModule, @@ -99,14 +114,14 @@ def _init_processor( self.data_param, *self.call_params = list(self.processor_params.keys()) -# %% ../nbs/01_aicorebridge.ipynb 13 +# %% ../nbs/01_aicorebridge.ipynb 15 # can be overloaded @patch def call_processor(self:AICoreModule, calldata, **callargs): return self.processor(calldata, **callargs) -# %% ../nbs/01_aicorebridge.ipynb 15 +# %% ../nbs/01_aicorebridge.ipynb 17 @patch def infer(self:AICoreModule, data:dict, *_, **kwargs): try: @@ -175,7 +190,7 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs): } -# %% ../nbs/01_aicorebridge.ipynb 17 +# %% ../nbs/01_aicorebridge.ipynb 19 # Specialized types for initializing annotated parameters # Add types by adding a tuple with the type name and a builder function annotated_arg_builders = { @@ -184,7 +199,7 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs): ] } -# %% ../nbs/01_aicorebridge.ipynb 18 +# %% ../nbs/01_aicorebridge.ipynb 20 @patch def init_annotated_param(self:AICoreModule, param_name, value): """ @@ -211,7 +226,7 @@ def init_annotated_param(self:AICoreModule, param_name, value): -# %% ../nbs/01_aicorebridge.ipynb 19 +# %% ../nbs/01_aicorebridge.ipynb 21 @patch def get_callargs(self:AICoreModule, kwargs, history): "Get arguments for the processor call" @@ -237,7 +252,7 @@ def get_callargs(self:AICoreModule, kwargs, history): } -# %% ../nbs/01_aicorebridge.ipynb 23 +# %% ../nbs/01_aicorebridge.ipynb 25 @patch def get_call_data( self:AICoreModule, diff --git a/corebridge/rscript.py b/corebridge/rscript.py index e72274e..24d56af 100644 --- a/corebridge/rscript.py +++ b/corebridge/rscript.py @@ -1,13 +1,104 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/02_rscriptbridge.ipynb. # %% auto 0 -__all__ = [] +__all__ = ['install_R_package', 'calc_hash_from_flowobject', 'check_script_inputs', 'check_script_output', 'run_script'] -# %% ../nbs/02_rscriptbridge.ipynb 3 +# %% ../nbs/02_rscriptbridge.ipynb 4 import json, os import subprocess -import warnings +import hashlib from functools import reduce +# %% ../nbs/02_rscriptbridge.ipynb 30 +def install_R_package(pkg:str|list): + """ + Checks and if neccesary installs an R package + Parameters + ---------- + pkg : str|list + name(s) of the package(s) + """ + + if isinstance(pkg, str): + pkg = [pkg] + + for pkg_i in pkg: + run_script_result = subprocess.run(['Rscript','-e', f"library({pkg_i})"], capture_output=True) + if run_script_result.returncode != 0: + print(f"Installing {pkg_i}") + run_script_result = subprocess.run(['Rscript','-e', f"install.packages({pkg_i}, repos='https://cloud.r-project.org')"], capture_output=True) + else: + print(f"Library {pkg_i} already installed") + + print(run_script_result.stderr.decode('UTF-8')) + + + +# %% ../nbs/02_rscriptbridge.ipynb 45 +def calc_hash_from_flowobject(flow_object:dict)->str: + return hashlib.md5(repr(flow_object).encode('UTF-8')).hexdigest() + +# %% ../nbs/02_rscriptbridge.ipynb 48 +def check_script_inputs(flow_object:dict)->bool: + """ + Check if the input files for a script are up-to-date, returns True if up-to-date. + """ + + checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}") + md5_check_result = subprocess.run( + ['md5sum', '-c', checksum_file], + cwd=save_dir, + capture_output=True) + + return int(md5_check_result.returncode) == 0 + +# %% ../nbs/02_rscriptbridge.ipynb 51 +def check_script_output(flow_object:dict)->bool: + """ + Check if the output files for a script exist, returns True if they all exist. + """ + + return all([ + os.path.isfile(get_save_path(F)) + for F in flow_object['out'] + ]) + +# %% ../nbs/02_rscriptbridge.ipynb 54 +def run_script(flow_object): + """ Run a script in R + args: + flow_object: dict of flow object + returns: + bool: False if nothing has changed, or an update failed, + and True if a follow-up script might need to be run + + """ + + # Check if output exists and inputs have not changed and return False if + # output exists and inputs have not changed + if check_script_output(flow_object) and check_script_inputs(flow_object): + return False + + # Run script + run_script_result = subprocess.run( + ['Rscript', '--vanilla', get_asset_path(flow_object['name'])], + cwd=save_dir, + capture_output=True + ) + + # check the return code + if run_script_result.returncode: + print(f"Run returned code {run_script_result.returncode}") + print('STDOUT------------\n', run_script_result.stdout.decode('UTF-8')) + print('STDERR------------\n', run_script_result.stderr.decode('UTF-8')) + return False + + # check the output + if not check_script_output(flow_object): + print(f"Output not found for {flow_object['name']}") + return False + + return check_script_output(flow_object) +