Separate argument parsing from processing function to allow using scripts without the CLI #68
@xaviermouy please take a look at a related discussion, #43, from @cparcerisas on how Dask + pyhydrophone work together for speed-up and to ensure correct hydrophone settings. @ryjombari and I tested this approach just a few days ago and were able to parallelize efficiently. Here is what our script currently looks like; it has just a few modifications from the original:

```python
import xarray as xr
import dask
import pandas as pd
import time
import json
import pathlib
import yaml
from urllib.parse import urlparse
import os
import pyhydrophone as pyhy
from pbp.meta_gen.gen_soundtrap import SoundTrapMetadataGenerator
from pbp.logging_helper import create_logger_info, create_logger
from pbp.process_helper import ProcessHelper
from pbp.file_helper import FileHelper
from pathlib import Path
meta_dir = Path('/opt/pbp/PAM_Analysis/pbp_workspace/MB05/metadata')
config_file = meta_dir / 'deployments' / 'deployment_20240718.json'
with config_file.open('r') as f:
    deployment_config = json.load(f)
instrument_file = meta_dir / 'receivers' / 'SOUNDTRAP_ST600HF-6999.json'
with instrument_file.open('r') as f_i:
    instrument_config = json.load(f_i)
# Audio data input specifications
wav_uri = deployment_config['FOLDER_PATH']
# file storage location for the input audio data
wav_path = Path(urlparse(wav_uri).path)
os.listdir(wav_path.as_posix())
json_base_dir = meta_dir / 'metadata' / 'json' # location to store generated data in JSON format
xml_dir = wav_path # file storage location for the input audio data
serial_number = instrument_config['recorder']['serial_number']
wav_prefix = serial_number # prefix for the audio files
start_date = pd.to_datetime(deployment_config['AUDIO_START_DEPLOYMENT_DATETIME']).to_pydatetime()  # deployment start date
end_date = pd.to_datetime(deployment_config['AUDIO_END_DEPLOYMENT_DATETIME']).to_pydatetime()  # deployment end date
# A prefix for the name of generated files:
deployment_name = deployment_config['DEPLOYMENT_NAME']
sitename = deployment_config['DEPLOYMENT_ID']
# Location for generated files:
output_dir = config_file.parent.parent.parent.joinpath('HMD_pbp', deployment_name)
global_attrs_uri = './metadata/attribute/globalAttributes_MB05.yaml'
variable_attrs_uri = './metadata/attribute/variableAttributes_MB05.yaml'
# Populate deployment-specific yaml attributes
deployment_attrs_uri = output_dir.joinpath(f'globalAttributes_{deployment_name}.yml')
with open(global_attrs_uri) as f_g:
    yaml_config = yaml.safe_load(f_g)
yaml_config['creator_name'] = deployment_config['AUTHOR_NAME']
yaml_config['creator_email'] = deployment_config['AUTHOR_EMAIL']
lon = deployment_config['location']['DEP_LON_DEG']
lat = deployment_config['location']['DEP_LAT_DEG']
yaml_config['geospatial_bounds'] = f'POINT ({lat} {lon})'
yaml_config['comment'] = deployment_config['DEPLOYMENT_COMMENTS']
yaml_config['platform'] = deployment_config['location']['MOORING_TYPE']
yaml_config['instrument'] = deployment_config['RECORDER_ID']
if not output_dir.exists():
    print('Creating a new directory...', output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)
with open(deployment_attrs_uri, 'w') as file:
    yaml.dump(yaml_config, file)
voltage_multiplier = 1.0
# voltage_multiplier = st.vpp/2
subset_to = (10, 24000)
st = pyhy.SoundTrap(model=instrument_config['recorder']['model'],
                    serial_number=int(instrument_config['recorder']['serial_number']),
                    name=deployment_config['RECORDER_ID'],
                    gain_type='High')
print('SoundTrap settings:')
print('sensitivity: ', st.sensitivity)
print('Vpp: ', st.Vpp)
print('Voltage multiplier: ', voltage_multiplier)
print('preamp_gain: ', st.preamp_gain)
print('gain_type: ', 'High')
log = create_logger(
    log_filename_and_level=(deployment_config['DEPLOYMENT_ID'], "INFO"),
    console_level="INFO",
)
# Create the data generators
meta_gen = SoundTrapMetadataGenerator(
    log=log,
    uri=wav_uri,
    json_base_dir=json_base_dir.as_posix(),
    xml_dir=str(xml_dir),
    start=start_date,
    end=end_date,
    prefixes=[wav_prefix],
    seconds_per_file=20)
# Generate the data - this will generate JSON files in the json_base_dir
meta_gen.run()
def process_date(date: str, gen_netcdf: bool = True):
    """
    Main function to generate the HMB product for a given day.
    It makes use of supporting elements in PBP in terms of logging,
    file handling, and PyPAM based HMB generation.
    :param date: Date to process, in YYYYMMDD format.
    :param gen_netcdf: Allows caller to skip the `.nc` creation here
        and instead save the datasets after all days have been generated
        (see parallel execution below).
    :return: the generated xarray dataset.
    """
    log_filename = f"{output_dir}/{deployment_name}{date}.log"
    log = create_logger(
        log_filename_and_level=(log_filename, "INFO"),
        console_level=None,
    )
    file_helper = FileHelper(
        log=log,
        json_base_dir=json_base_dir,
        gs_client=None,
        download_dir=None,
    )
    process_helper = ProcessHelper(
        log=log,
        file_helper=file_helper,
        output_dir=str(output_dir),
        output_prefix=sitename,
        global_attrs_uri=str(deployment_attrs_uri),
        variable_attrs_uri=variable_attrs_uri,
        voltage_multiplier=voltage_multiplier,
        sensitivity_uri=None,
        sensitivity_flat_value=-st.sensitivity,
        subset_to=subset_to,
    )
    # now, get the HMB result:
    print(f"::: Started processing {date=}")
    result = process_helper.process_day(date)
    if gen_netcdf:
        nc_filename = f"{output_dir}/{sitename}{date}.nc"
        print(f"::: Ended processing {date=} => {nc_filename=}")
    else:
        print(f"::: Ended processing {date=} => (dataset generated in memory)")
    if result is not None:
        return result.dataset
    else:
        print(f"::: UNEXPECTED: no segments were processed for {date=}")
def process_multiple_dates(
    dates: list[str], gen_netcdf: bool = False
) -> list[xr.Dataset]:
    """
    Generates HMB for multiple days in parallel using Dask.
    Returns the resulting HMB datasets.
    :param dates: The dates to process, each in YYYYMMDD format.
    :param gen_netcdf: Allows caller to skip the `.nc` creation here
        and instead save the datasets after all days have been generated.
    :return: the list of generated datasets.
    """
    @dask.delayed
    def delayed_process_date(date: str):
        return process_date(date, gen_netcdf=gen_netcdf)

    # To display total elapsed time at the end of the processing:
    start_time = time.time()

    # This will be called by Dask when all dates have completed processing:
    def aggregate(*datasets):  # -> list[xr.Dataset]:
        elapsed_time = time.time() - start_time
        print(
            f"===> All {len(datasets)} dates completed. Elapsed time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} mins)"
        )
        return datasets

    # Prepare the processes:
    delayed_processes = [delayed_process_date(date) for date in dates]
    aggregation = dask.delayed(aggregate)(*delayed_processes)

    # And launch them:
    return aggregation.compute()
# Use pandas to help generate the list of dates to process:
date_range = pd.date_range(start=start_date, end=end_date, freq='1D')
dates = date_range.strftime("%Y%m%d").tolist()

# Now, launch the generation:
print(f"Launching HMB generation for {len(dates)} {dates=}")

# Get all HMB datasets:
generated_datasets = process_multiple_dates(dates, gen_netcdf=True)
print(f"Generated datasets: {len(generated_datasets)}\n")
```
Thanks @danellecline. Yes, @xaviermouy this parallelization with Dask is working great in a Python script. Danelle had originally demonstrated Dask use in one of the IOOS Python notebooks. I am about to run our most recent deployments of SoundTrap data with these updated aspects:
Let me know if you would like to see example configuration files for this approach.
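In the meantime, here is a rough sketch of the kind of deployment configuration the script above reads. The field names are taken from the keys the script accesses; every value is a placeholder, not our real deployment metadata:

```python
# Illustrative deployment config (placeholder values only).
example_deployment_config = {
    "DEPLOYMENT_NAME": "MB05_20240718",
    "DEPLOYMENT_ID": "MB05",
    "FOLDER_PATH": "file:///data/MB05/20240718/wav",
    "AUDIO_START_DEPLOYMENT_DATETIME": "2024-07-18T00:00:00",
    "AUDIO_END_DEPLOYMENT_DATETIME": "2024-08-18T00:00:00",
    "AUTHOR_NAME": "A. Author",
    "AUTHOR_EMAIL": "author@example.org",
    "DEPLOYMENT_COMMENTS": "placeholder comments",
    "RECORDER_ID": "ST600HF-6999",
    "location": {
        "DEP_LAT_DEG": 36.8,
        "DEP_LON_DEG": -121.9,
        "MOORING_TYPE": "mooring",
    },
}
```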
@danellecline @ryjombari this sounds really cool, thanks for sharing. I will definitely look into this further and reach out when I am at that point. At the moment I am trying to keep things simple, and once things are rolling I will work on improving progressively ;-)

Because we will be running a very large number of deployments, my goal is to use Prefect to make sure the processing is able to recover from/identify issues (e.g. short network outages), so we don't have to babysit it too much. Prefect comes with a nice web interface that can help us monitor the status of the processing. My plan (already partially implemented) is to create a flow for the processing of each deployment, with main_meta_generator, main_hmb_generator and main_plot as separate tasks of that flow (a minimal sketch is at the end of this comment). Prefect works well with Dask, and each task can be parallelized with Dask if needed.

At the moment I am trying to minimize the amount of new code and rely heavily on your CLI. As I indicated, I am using sys.argv to pass arguments inside Python. It works but is not a great solution. If you decouple the argument parsing from the rest of the processing as I suggested above, that would really help me achieve this cleanly while not affecting any of the functionality of your CLIs. If you are open to this suggestion, I am happy to make these changes and submit them to you for consideration. Let me know. Again, once I have this going, I would love to dig into the code you shared above.
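Here is that sketch (assuming Prefect 2.x; the task names and bodies are placeholders of my own, to be filled with the pbp processing functions once the parsing is decoupled):

```python
from prefect import flow, task


@task(retries=2, retry_delay_seconds=60)
def run_meta_generator(deployment_id: str):
    # placeholder: would call the (decoupled) main_meta_generator processing function
    ...


@task(retries=2, retry_delay_seconds=60)
def run_hmb_generator(deployment_id: str):
    # placeholder: would call the (decoupled) main_hmb_generator processing function;
    # this task could itself fan out over days with Dask, as in the script above
    ...


@task
def run_plot(deployment_id: str):
    # placeholder: would call the (decoupled) main_plot processing function
    ...


@flow(name="pbp-deployment")
def process_deployment(deployment_id: str):
    run_meta_generator(deployment_id)
    run_hmb_generator(deployment_id)
    run_plot(deployment_id)


if __name__ == "__main__":
    process_deployment("MB05_20240718")
```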
Hi @xaviermouy - Please see PR #69, which I just submitted. Besides that separation, in the case of HMB Gen in particular, you can also use the API elements directly, that is, set up the logger, the FileHelper, and the ProcessHelper yourself and then call process_day, as in the script shared earlier in this thread.
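Roughly along these lines (a condensed sketch with placeholder paths and values; see the script above for a complete, realistic setup):

```python
from pbp.logging_helper import create_logger
from pbp.file_helper import FileHelper
from pbp.process_helper import ProcessHelper

# Placeholder paths and values below; adjust for your deployment.
log = create_logger(log_filename_and_level=("hmb_20240718.log", "INFO"), console_level=None)

file_helper = FileHelper(
    log=log,
    json_base_dir="metadata/json",
    gs_client=None,
    download_dir=None,
)

process_helper = ProcessHelper(
    log=log,
    file_helper=file_helper,
    output_dir="output",
    output_prefix="MB05",
    global_attrs_uri="metadata/attribute/globalAttributes.yaml",
    variable_attrs_uri="metadata/attribute/variableAttributes.yaml",
    voltage_multiplier=1.0,
    sensitivity_uri=None,
    sensitivity_flat_value=176.0,
    subset_to=(10, 24000),
)

result = process_helper.process_day("20240718")
if result is not None:
    dataset = result.dataset  # the HMB product for that day, as an xarray dataset
```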
I would like to use main_meta_generator.py, main_hmb_generator.py, and main_plot.py from my Python script without having to use the CLI.
I am currently using sys.argv and argparse.Namespace to pass arguments to these functions from Python, but it is less than ideal and will cause issues when I start running things in parallel. A rough sketch of this workaround is shown below.
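This is roughly what the workaround looks like; the module path and flag names below are illustrative, not necessarily pbp's actual CLI options:

```python
import sys
import runpy

# Fake the command line so the script's own argparse call sees these arguments,
# then execute the script as if it had been launched from the shell.
# This mutates global state (sys.argv), which is fragile and not safe once
# several days/deployments are processed in parallel in the same interpreter.
sys.argv = [
    "main_hmb_generator.py",
    "--date", "20240718",
    "--json-base-dir", "metadata/json",
    "--output-dir", "output",
]
runpy.run_module("pbp.main_hmb_generator", run_name="__main__")
```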
Would it be possible to make a simple modification to main_meta_generator.py, main_hmb_generator.py, and main_plot.py so that the processing function is decoupled from the argument parsing? This would allow me to import the function and use it in Python directly, without having to start an external process.
For example, in the case of main_meta_generator.py this would mean splitting the script so that the argument parsing and the processing live in separate functions, along the lines of the sketch below. This would not change anything about the way you are doing things now, but it would allow the function run_main_meta_generator to be used without the CLI (which would be a huge benefit for many people). Doing the same for main_hmb_generator.py and main_plot.py would also be fantastic.
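Here is the sketch (the "before" is paraphrased, and parse_arguments is just a stand-in for however the script currently collects its options):

```python
# Before (paraphrased): parsing and processing are tied together, so the
# processing can only be reached by running the script from the command line.
def main():
    opts = parse_arguments()        # argparse happens here
    ...                             # all of the processing, using `opts` directly


# After: the processing lives in its own importable function, and the CLI
# entry point simply forwards the parsed options to it.
def run_main_meta_generator(opts):
    ...                             # all of the processing, unchanged


def main():
    opts = parse_arguments()
    run_main_meta_generator(opts)
```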
Thanks for your consideration and for your amazing package! So useful.
Xavier