Skip to content

Commit

Permalink
skip rbridge exec & move folders
Browse files Browse the repository at this point in the history
  • Loading branch information
fenke committed Jan 20, 2025
1 parent 856c4b5 commit 6dfcd27
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
36 changes: 18 additions & 18 deletions corebridge/rscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
'run_rscript_wait', 'run_rscript_nowait', 'release_script_lock', 'AICoreRScriptModule',
'snake_case_to_camel_case', 'recursive_flatten_nested_data']

# %% ../nbs/02_rscriptbridge.ipynb 4
# %% ../nbs/02_rscriptbridge.ipynb 5
import os, logging, json, hashlib
import typing,fcntl, subprocess
import traceback
Expand All @@ -23,10 +23,10 @@
from .core import *


# %% ../nbs/02_rscriptbridge.ipynb 6
# %% ../nbs/02_rscriptbridge.ipynb 7
syslog = logging.getLogger(__name__)

# %% ../nbs/02_rscriptbridge.ipynb 9
# %% ../nbs/02_rscriptbridge.ipynb 10
def get_asset_path(script_name, assets_dir:str):
return os.path.join(assets_dir, script_name)
def get_rscript_libpath(save_dir:str):
Expand All @@ -35,14 +35,14 @@ def get_save_path(datafile_name:str, save_dir:str):
return os.path.join(save_dir, datafile_name)


# %% ../nbs/02_rscriptbridge.ipynb 42
# %% ../nbs/02_rscriptbridge.ipynb 43
def get_rscript_env(libfolder:str):
if os.environ.get('R_LIBS_USER'):
return dict(**os.environ)
else:
return dict(**os.environ, R_LIBS_USER=str(libfolder))

# %% ../nbs/02_rscriptbridge.ipynb 49
# %% ../nbs/02_rscriptbridge.ipynb 50
def check_rscript_libs(libs:list, libfolder:str):
"""Quick check if for all the R packages in libs a folder exists in libfolder"""
return all([os.path.exists(os.path.join(libfolder, L)) for L in libs])
Expand All @@ -69,7 +69,7 @@ def check_rscript_lib(lib:str, libfolder:str) -> bool:
print('STDOUT\n', run_script_result.stdout.decode('UTF-8'))
return run_script_result.returncode == 0

# %% ../nbs/02_rscriptbridge.ipynb 55
# %% ../nbs/02_rscriptbridge.ipynb 56
def install_R_package_wait(pkg:str|list, workdir:str, repo='https://cloud.r-project.org'):
"""
Checks and if neccesary installs an R package
Expand Down Expand Up @@ -119,7 +119,7 @@ def install_R_package_wait(pkg:str|list, workdir:str, repo='https://cloud.r-proj



# %% ../nbs/02_rscriptbridge.ipynb 61
# %% ../nbs/02_rscriptbridge.ipynb 62
def unpack_assets(assets_dir:str, save_dir:str):
"""
Unpack the assets folder to the save_dir
Expand All @@ -131,7 +131,7 @@ def unpack_assets(assets_dir:str, save_dir:str):
)
return unpack_result

# %% ../nbs/02_rscriptbridge.ipynb 78
# %% ../nbs/02_rscriptbridge.ipynb 79
read_chunk_size = 1024 * 32
def calc_hash_from_flowobject(flow_object:dict)->str:
'''Calculate a unique hash for a given flow object'''
Expand Down Expand Up @@ -168,7 +168,7 @@ def calc_hash_from_data_files(flow_object:dict, save_dir:str)->str:
return calc_hash_from_files(list(flow_object['in'].values()) + list(flow_object['out'].values()), save_dir)


# %% ../nbs/02_rscriptbridge.ipynb 83
# %% ../nbs/02_rscriptbridge.ipynb 84
def check_script_inputs(flow_object:dict, workdir:str)->bool:
"""
Check if the input files for a script are up-to-date, returns True if up-to-date.
Expand All @@ -182,7 +182,7 @@ def check_script_inputs(flow_object:dict, workdir:str)->bool:

return int(md5_check_result.returncode) == 0

# %% ../nbs/02_rscriptbridge.ipynb 86
# %% ../nbs/02_rscriptbridge.ipynb 87
def check_script_output(flow_object:dict, workdir:str)->bool:
"""
Check if the output files for a script exist, returns True if they all exist.
Expand All @@ -194,7 +194,7 @@ def check_script_output(flow_object:dict, workdir:str)->bool:
syslog.debug(f"Output files for Flow object: {flow_object['name']}: {list(zip(flow_object['out'], files_exist))}")
return all(files_exist)

# %% ../nbs/02_rscriptbridge.ipynb 89
# %% ../nbs/02_rscriptbridge.ipynb 90
def generate_checksum_file(flow_object:dict, workdir:str)->bool:
"""Generates the checksum file for a given flow object"""

Expand All @@ -212,7 +212,7 @@ def generate_checksum_file(flow_object:dict, workdir:str)->bool:

return md5_encode_result.returncode == 0 and check_script_inputs(flow_object, workdir)

# %% ../nbs/02_rscriptbridge.ipynb 98
# %% ../nbs/02_rscriptbridge.ipynb 99
def run_rscript_wait(flow_object, assets_dir:str, save_dir:str):
""" Run a script in R
args:
Expand Down Expand Up @@ -258,7 +258,7 @@ def run_rscript_wait(flow_object, assets_dir:str, save_dir:str):
return check_script_output(flow_object, save_dir) and generate_checksum_file(flow_object, save_dir)


# %% ../nbs/02_rscriptbridge.ipynb 105
# %% ../nbs/02_rscriptbridge.ipynb 106
RScriptProcess = namedtuple('RScriptProcess', ['flow_object', 'lock_file', 'stdout','stderr', 'popen_args', 'popen'])

#### Asynchronous RScript processing ------------------------------------------------
Expand Down Expand Up @@ -396,14 +396,14 @@ def get_temp_path(lname):

run_rscript_nowait.lock_objects = {}

# %% ../nbs/02_rscriptbridge.ipynb 106
# %% ../nbs/02_rscriptbridge.ipynb 107
def release_script_lock(flow_object, save_dir):
process = run_rscript_nowait.lock_objects.get(flow_object['name'])
if process.popen and process.popen.poll() is not None:
syslog.debug(f"Closing lockfile {process.lock_file.name}")
process.lock_file.close()

# %% ../nbs/02_rscriptbridge.ipynb 112
# %% ../nbs/02_rscriptbridge.ipynb 113
class AICoreRScriptModule(AICoreModuleBase):
def __init__(self,
flow_mapping:dict, # scripts flow map
Expand Down Expand Up @@ -467,7 +467,7 @@ def get_flow_status(self):



# %% ../nbs/02_rscriptbridge.ipynb 114
# %% ../nbs/02_rscriptbridge.ipynb 115
@patch
def update_flow(self:AICoreRScriptModule):
workdir = self.get_rscript_workdir()
Expand Down Expand Up @@ -502,7 +502,7 @@ def update_flow(self:AICoreRScriptModule):
return self.get_flow_status()


# %% ../nbs/02_rscriptbridge.ipynb 125
# %% ../nbs/02_rscriptbridge.ipynb 126
def snake_case_to_camel_case(snake_case:str) -> str:
splittext = snake_case.split('_')
return ''.join([x.capitalize() if n > 0 else x for x,n in zip(splittext, range(len(splittext)))])
Expand Down Expand Up @@ -557,7 +557,7 @@ def recursive_flatten_nested_data(
)


# %% ../nbs/02_rscriptbridge.ipynb 129
# %% ../nbs/02_rscriptbridge.ipynb 130
@patch
def write_uploaded_data(
self:AICoreRScriptModule,
Expand Down
14 changes: 12 additions & 2 deletions nbs/02_rscriptbridge.ipynb
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
{
"cells": [
{
"cell_type": "raw",
"metadata": {},
"source": [
"---\n",
"skip_exec: true\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down Expand Up @@ -97,8 +106,9 @@
"outputs": [],
"source": [
"\n",
"assets_dir = os.path.join(os.path.abspath(os.getcwd()), 'assets', 'rscript')\n",
"save_dir = os.path.join(os.path.abspath(os.getcwd()), 'saves', 'rscript')\n"
"assets_dir = os.path.join(os.path.abspath(os.getcwd()), '..', 'corebridge', 'assets', 'rscript')\n",
"save_dir = os.path.join(os.path.abspath(os.getcwd()), '..', 'corebridge', 'saves', 'rscript')\n",
"''"
]
},
{
Expand Down

0 comments on commit 6dfcd27

Please sign in to comment.