Skip to content

Commit

Permalink
wrap it up
Browse files Browse the repository at this point in the history
  • Loading branch information
fenke committed Jul 16, 2024
1 parent 3eb032c commit f3a8fd3
Show file tree
Hide file tree
Showing 3 changed files with 340 additions and 86 deletions.
6 changes: 6 additions & 0 deletions corebridge/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,16 @@
'corebridge/core.py')},
'corebridge.rscript': { 'corebridge.rscript.calc_hash_from_flowobject': ( 'rscriptbridge.html#calc_hash_from_flowobject',
'corebridge/rscript.py'),
'corebridge.rscript.calc_hash_from_input_files': ( 'rscriptbridge.html#calc_hash_from_input_files',
'corebridge/rscript.py'),
'corebridge.rscript.check_script_inputs': ( 'rscriptbridge.html#check_script_inputs',
'corebridge/rscript.py'),
'corebridge.rscript.check_script_output': ( 'rscriptbridge.html#check_script_output',
'corebridge/rscript.py'),
'corebridge.rscript.generate_checksum_file': ( 'rscriptbridge.html#generate_checksum_file',
'corebridge/rscript.py'),
'corebridge.rscript.get_asset_path': ('rscriptbridge.html#get_asset_path', 'corebridge/rscript.py'),
'corebridge.rscript.get_save_path': ('rscriptbridge.html#get_save_path', 'corebridge/rscript.py'),
'corebridge.rscript.install_R_package': ( 'rscriptbridge.html#install_r_package',
'corebridge/rscript.py'),
'corebridge.rscript.run_script': ('rscriptbridge.html#run_script', 'corebridge/rscript.py')}}}
120 changes: 84 additions & 36 deletions corebridge/rscript.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/02_rscriptbridge.ipynb.

# %% auto 0
__all__ = ['install_R_package', 'calc_hash_from_flowobject', 'check_script_inputs', 'check_script_output', 'run_script']
__all__ = ['get_asset_path', 'get_save_path', 'install_R_package', 'calc_hash_from_flowobject', 'calc_hash_from_input_files',
'check_script_inputs', 'check_script_output', 'generate_checksum_file', 'run_script']

# %% ../nbs/02_rscriptbridge.ipynb 4
import json, os
import json, os, fcntl, time
import subprocess
import hashlib

from functools import reduce

# %% ../nbs/02_rscriptbridge.ipynb 30
# %% ../nbs/02_rscriptbridge.ipynb 7
def get_asset_path(script_name, assets_dir:str):
return os.path.join(assets_dir, script_name)
def get_save_path(datafile_name, save_dir):
return os.path.join(save_dir, datafile_name)


# %% ../nbs/02_rscriptbridge.ipynb 31
def install_R_package(pkg:str|list):
"""
Checks and if neccesary installs an R package
Expand All @@ -36,69 +44,109 @@ def install_R_package(pkg:str|list):



# %% ../nbs/02_rscriptbridge.ipynb 45
# %% ../nbs/02_rscriptbridge.ipynb 46
def calc_hash_from_flowobject(flow_object:dict)->str:
return hashlib.md5(repr(flow_object).encode('UTF-8')).hexdigest()

# %% ../nbs/02_rscriptbridge.ipynb 48
def check_script_inputs(flow_object:dict)->bool:
def calc_hash_from_input_files(flow_object:dict, save_dir:str)->str:
hashobj = hashlib.md5()

# iterate over files in input_files
for input_file in flow_object['in']:
with open(os.path.join(save_dir, input_file), 'rb') as f:
# loop till the end of the file
chunk = 0
while chunk != b'':
# read only 1024 bytes at a time
chunk = f.read(1024)
hashobj.update(chunk)

return hashobj.hexdigest()

# %% ../nbs/02_rscriptbridge.ipynb 50
def check_script_inputs(flow_object:dict, save_dir:str)->bool:
"""
Check if the input files for a script are up-to-date, returns True if up-to-date.
"""

checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}")
checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}", save_dir)
md5_check_result = subprocess.run(
['md5sum', '-c', checksum_file],
cwd=save_dir,
capture_output=True)

return int(md5_check_result.returncode) == 0

# %% ../nbs/02_rscriptbridge.ipynb 51
def check_script_output(flow_object:dict)->bool:
# %% ../nbs/02_rscriptbridge.ipynb 53
def check_script_output(flow_object:dict, save_dir:str)->bool:
"""
Check if the output files for a script exist, returns True if they all exist.
"""

return all([
os.path.isfile(get_save_path(F))
os.path.isfile(get_save_path(F, save_dir))
for F in flow_object['out']
])

# %% ../nbs/02_rscriptbridge.ipynb 54
def run_script(flow_object):
# %% ../nbs/02_rscriptbridge.ipynb 56
def generate_checksum_file(flow_object:dict, save_dir:str)->bool:
"""Generates the checksum file for a given flow object"""

input_files = flow_object['in']
md5_encode_result = subprocess.run(
['md5sum','-b']+
input_files,
cwd=save_dir,
capture_output=True)

checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}", save_dir)
with open(checksum_file, 'wt') as cf:
cf.write(md5_encode_result.stdout.decode('UTF-8'))

return md5_encode_result.returncode == 0 and check_script_inputs(flow_object, save_dir)

# %% ../nbs/02_rscriptbridge.ipynb 65
def run_script(flow_object, assets_dir:str, save_dir:str):
""" Run a script in R
args:
flow_object: dict of flow object
returns:
bool: False if nothing has changed, or an update failed,
and True if a follow-up script might need to be run
bool: True if a follow-up script might need to be run, False if not
"""

print(f"Running script {flow_object['name']}")
# Check if output exists and inputs have not changed and return False if
# output exists and inputs have not changed
if check_script_output(flow_object) and check_script_inputs(flow_object):
return False

# Run script
run_script_result = subprocess.run(
['Rscript', '--vanilla', get_asset_path(flow_object['name'])],
cwd=save_dir,
capture_output=True
)
if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
return True

# check the return code
if run_script_result.returncode:
print(f"Run returned code {run_script_result.returncode}")
print('STDOUT------------\n', run_script_result.stdout.decode('UTF-8'))
print('STDERR------------\n', run_script_result.stderr.decode('UTF-8'))
return False

# check the output
if not check_script_output(flow_object):
print(f"Output not found for {flow_object['name']}")
return False
# Create the lock file
lock_file = get_save_path(f"lock-{calc_hash_from_input_files(flow_object, save_dir)}", save_dir)
with open(lock_file, 'wt') as cf:
try:
print(f"Locking {lock_file}")
# Get exclusive lock on the file
fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

# run the script
run_script_result = subprocess.run(
['Rscript', '--vanilla', get_asset_path(flow_object['name'], assets_dir)],
cwd=save_dir,
capture_output=True
)

# check the return code
if run_script_result.returncode:
cf.write(f"Run returned code {run_script_result.returncode}\n")
cf.write('STDOUT------------\n', run_script_result.stdout.decode('UTF-8'),'\n')
cf.write('STDERR------------\n', run_script_result.stderr.decode('UTF-8'),'\n')
return False

except BlockingIOError as locked_error:
print(locked_error)
return False


return check_script_output(flow_object)
# check the output and generate the checksum file
return check_script_output(flow_object, save_dir) and generate_checksum_file(flow_object, save_dir)

Loading

0 comments on commit f3a8fd3

Please sign in to comment.