From 210d75d97888846923484600cecc1133da44ce49 Mon Sep 17 00:00:00 2001 From: hannahker Date: Mon, 23 Dec 2024 11:28:12 -0800 Subject: [PATCH 01/13] enforce only backfilling for after 2024 --- docs/usage.md | 2 +- src/pipelines/pipeline.py | 23 ++++++++++++++++++++--- src/scripts/run_floodscan_pipeline.py | 8 +++++++- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/docs/usage.md b/docs/usage.md index 8c98a21..86d1e6a 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -17,7 +17,7 @@ These options are available for both pipelines: - `--mode {local,dev,prod}`: Specify the mode to run the pipeline in (default: local) - `--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}`: Set the logging level (default: INFO) - `--use-cache`: Use cached raw data if available -- `--backfill`: Check for missing dates and backfill if necessary +- `--backfill`: Check for missing dates and backfill if necessary (only for 2024 dates onwards) ## ERA5 Options diff --git a/src/pipelines/pipeline.py b/src/pipelines/pipeline.py index 80f5711..9714f80 100644 --- a/src/pipelines/pipeline.py +++ b/src/pipelines/pipeline.py @@ -206,7 +206,24 @@ def check_coverage( expected_dates = expected_dates[:-1] existing_dates = self._get_existing_dates() - missing_dates = [date for date in expected_dates if date not in existing_dates] + # NOTE: Here we're dropping dates before 2024. Will notify if there are missing dates pre 2024, + # but won't automatically backfill since for some datasets this is a different process, and we assume + # that this isn't likely to be where we see missing data. + missing_dates = [ + date + for date in expected_dates + if date not in existing_dates and date.year >= 2024 + ] + pre_2024_dates = [ + date + for date in expected_dates + if date not in existing_dates and date.year < 2024 + ] + if pre_2024_dates: + self.logger.warning( + f"There are {len(pre_2024_dates)} missing from before 2024. These won't be backfilled automatically." 
+ ) + coverage_pct = (len(existing_dates) / len(expected_dates)) * 100 return missing_dates, coverage_pct @@ -226,10 +243,10 @@ def print_coverage_report( self.logger.info("=" * 50) self.logger.info(f"Mode: {self.mode}") self.logger.info(f"Storage Path: {self.processed_path}") - self.logger.info(f"Coverage: {coverage_pct:.1f}%") + self.logger.info(f"Overall coverage: {coverage_pct:.1f}%") if missing_dates: - self.logger.info("Missing Dates:") + self.logger.info("Missing Dates after 2024:") for date in missing_dates: self.logger.info(f" - {date.strftime('%Y-%m-%d')}") else: diff --git a/src/scripts/run_floodscan_pipeline.py b/src/scripts/run_floodscan_pipeline.py index eb63e79..cf91c46 100644 --- a/src/scripts/run_floodscan_pipeline.py +++ b/src/scripts/run_floodscan_pipeline.py @@ -36,6 +36,11 @@ def parse_arguments(base_parser): choices=[5], default=5, ) + parser.add_argument( + "--backfill", + action="store_true", + help="Whether to check and backfill for any missing dates (only 2024 onwards)", + ) parser.add_argument("--update", action="store_true", help="Run in update mode") return parser.parse_args() @@ -56,4 +61,5 @@ def main(base_parser): ) pipeline = FloodScanPipeline(**settings) - pipeline.run_pipeline() + # pipeline.run_pipeline() + pipeline.print_coverage_report() From b0f6167e6809c21db36b911b8b0e620c23affe54 Mon Sep 17 00:00:00 2001 From: hannahker Date: Mon, 23 Dec 2024 15:05:23 -0800 Subject: [PATCH 02/13] wip to process floodscan missing dates --- src/pipelines/floodscan_pipeline.py | 65 +++++++++++++++++---------- src/scripts/run_floodscan_pipeline.py | 5 ++- 2 files changed, 45 insertions(+), 25 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index 9c23279..9f0b080 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -38,6 +38,7 @@ def __init__(self, **kwargs): use_cache=kwargs["use_cache"], ) + self.backfill = kwargs["backfill"] self.start_date = datetime.strptime(kwargs["start_date"], DATE_FORMAT) self.end_date = datetime.strptime(kwargs["end_date"], DATE_FORMAT) self.is_update = kwargs["is_update"] @@ -134,6 +135,8 @@ def _get_90_days_filenames_for_dates(self, dates): def get_historical_90days_zipped_files(self, dates): filename_list = self._get_90_days_filenames_for_dates(dates=dates) + print("*******") + print(filename_list) zipped_files_path = [] for zipped_filename in filename_list: @@ -373,41 +376,57 @@ def combine_bands(self, sfed, mfed, date): ) def run_pipeline(self): - yesterday = datetime.today() - pd.DateOffset(days=1) - dates = create_date_range( - self.start_date, - self.end_date, - min_accepted=datetime.strptime("1998-01-12", DATE_FORMAT), - max_accepted=yesterday, - ) - self.logger.info(f"Running FloodScan pipeline in {self.mode} mode...") + if self.backfill: + self.logger.info("Checking for missing data and backfilling if needed...") + missing_dates, coverage_pct = self.check_coverage() + self.print_coverage_report() + if missing_dates: + print("---") + print(missing_dates) + filenames = self.get_historical_90days_zipped_files(dates=missing_dates) + print(filenames) + filenames.reverse() + print("-----") + print(filenames) + self.process_historical_zipped_data(filenames, missing_dates) + # Run for the latest available date if self.is_update: self.logger.info("Retrieving FloodScan data from yesterday...") + yesterday = datetime.today() - pd.DateOffset(days=1) sfed, mfed, latest_date = self.get_raw_data(date=yesterday) sfed_da = self.process_data(sfed, 
band_type=SFED) mdfed_da = self.process_data(mfed, band_type=MFED) self.combine_bands(sfed_da, mdfed_da, latest_date) return True - elif any(date.year < 2024 for date in dates): - self.logger.info( - f"Retrieving historical FloodScan data from {min(dates).date()} until {max(dates).date()}..." + # Historical run + else: + dates = create_date_range( + self.start_date, + self.end_date, + min_accepted=datetime.strptime("1998-01-12", DATE_FORMAT), + max_accepted=yesterday, ) - # Dates fall under netcdf archive - sfed_path, mfed_path = self.get_historical_nc_files() + if any(date.year < 2024 for date in dates): + self.logger.info( + f"Retrieving historical FloodScan data from {min(dates).date()} until {max(dates).date()}..." + ) + + # Dates fall under netcdf archive + sfed_path, mfed_path = self.get_historical_nc_files() - for date in dates: - if date.year < 2024: - sfed_da = self.process_historical_data(sfed_path, date, SFED) - mfed_da = self.process_historical_data(mfed_path, date, MFED) - self.combine_bands(sfed_da, mfed_da, date=date) + for date in dates: + if date.year < 2024: + sfed_da = self.process_historical_data(sfed_path, date, SFED) + mfed_da = self.process_historical_data(mfed_path, date, MFED) + self.combine_bands(sfed_da, mfed_da, date=date) - # If any of the dates are above 2023: - if any(date.year >= 2024 for date in dates): - filenames = self.get_historical_90days_zipped_files(dates=dates) - filenames.reverse() - self.process_historical_zipped_data(filenames, dates) + # If any of the dates are above 2023: + if any(date.year >= 2024 for date in dates): + filenames = self.get_historical_90days_zipped_files(dates=dates) + filenames.reverse() + self.process_historical_zipped_data(filenames, dates) diff --git a/src/scripts/run_floodscan_pipeline.py b/src/scripts/run_floodscan_pipeline.py index cf91c46..9f70157 100644 --- a/src/scripts/run_floodscan_pipeline.py +++ b/src/scripts/run_floodscan_pipeline.py @@ -52,6 +52,7 @@ def main(base_parser): { "mode": args.mode, "is_update": args.update, + "backfill": args.backfill, "start_date": args.start_date, "end_date": args.end_date, "version": args.version, @@ -61,5 +62,5 @@ def main(base_parser): ) pipeline = FloodScanPipeline(**settings) - # pipeline.run_pipeline() - pipeline.print_coverage_report() + pipeline.run_pipeline() + # pipeline.print_coverage_report() From 8cd81b13261dc09378ca524703f766a0061290a0 Mon Sep 17 00:00:00 2001 From: hannahker Date: Mon, 6 Jan 2025 14:15:28 -0800 Subject: [PATCH 03/13] refactor historical processing --- src/pipelines/floodscan_pipeline.py | 87 ++++++++++++++++------------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index 9f0b080..0633a30 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -135,8 +135,6 @@ def _get_90_days_filenames_for_dates(self, dates): def get_historical_90days_zipped_files(self, dates): filename_list = self._get_90_days_filenames_for_dates(dates=dates) - print("*******") - print(filename_list) zipped_files_path = [] for zipped_filename in filename_list: @@ -375,58 +373,69 @@ def combine_bands(self, sfed, mfed, date): f"Failed when combining sfed and mfed geotiffs. 
{err}" ) + def process_latest_update(self): + """Retrieve yesterday's data""" + self.logger.info("Retrieving Floodscan data from yesterday...") + yesterday = datetime.today() - pd.DateOffset(days=1) + sfed, mfed, latest_date = self.get_raw_data(date=yesterday) + sfed_da = self.process_data(sfed, band_type=SFED) + mdfed_da = self.process_data(mfed, band_type=MFED) + self.combine_bands(sfed_da, mdfed_da, latest_date) + return True + + def process_historical_dates(self, dates): + """ + Process data for a historical date range. + Dates before 2024 will be handled differently + """ + post_2024_dates = [ + date for date in dates if datetime.strptime(date, "%Y-%m-%d").year >= 2024 + ] + pre_2024_dates = [ + date for date in dates if datetime.strptime(date, "%Y-%m-%d").year < 2024 + ] + + if post_2024_dates: + self._process_post_2024(post_2024_dates) + if pre_2024_dates: + self._process_pre_2024(pre_2024_dates) + + def _process_pre_2024(self, dates): + """Process any dates before 2024. These will pull from the large netcdf files.""" + sfed_path, mfed_path = self.get_historical_nc_files() + for date in dates: + sfed_da = self.process_historical_data(sfed_path, date, SFED) + mfed_da = self.process_historical_data(mfed_path, date, MFED) + self.combine_bands(sfed_da, mfed_da, date=date) + + def _process_post_2024(self, dates): + """Process any dates 2024 onwards. This will pull from the 90 days zipped files.""" + filenames = self.get_historical_90days_zipped_files(dates=dates) + filenames.reverse() + self.process_historical_zipped_data(filenames, dates) + def run_pipeline(self): self.logger.info(f"Running FloodScan pipeline in {self.mode} mode...") if self.backfill: self.logger.info("Checking for missing data and backfilling if needed...") - missing_dates, coverage_pct = self.check_coverage() + missing_dates, _ = self.check_coverage() self.print_coverage_report() if missing_dates: - print("---") - print(missing_dates) - filenames = self.get_historical_90days_zipped_files(dates=missing_dates) - print(filenames) - filenames.reverse() - print("-----") - print(filenames) - self.process_historical_zipped_data(filenames, missing_dates) + self._process_historical_dates(missing_dates) # Run for the latest available date if self.is_update: - self.logger.info("Retrieving FloodScan data from yesterday...") - yesterday = datetime.today() - pd.DateOffset(days=1) - sfed, mfed, latest_date = self.get_raw_data(date=yesterday) - sfed_da = self.process_data(sfed, band_type=SFED) - mdfed_da = self.process_data(mfed, band_type=MFED) - self.combine_bands(sfed_da, mdfed_da, latest_date) - return True + self._process_latest_update() # Historical run else: + yesterday = datetime.today() - pd.DateOffset(days=1) + min_date = datetime.strptime(self.coverage["start_date"], DATE_FORMAT) dates = create_date_range( self.start_date, self.end_date, - min_accepted=datetime.strptime("1998-01-12", DATE_FORMAT), + min_accepted=min_date, max_accepted=yesterday, ) - - if any(date.year < 2024 for date in dates): - self.logger.info( - f"Retrieving historical FloodScan data from {min(dates).date()} until {max(dates).date()}..." 
- ) - - # Dates fall under netcdf archive - sfed_path, mfed_path = self.get_historical_nc_files() - - for date in dates: - if date.year < 2024: - sfed_da = self.process_historical_data(sfed_path, date, SFED) - mfed_da = self.process_historical_data(mfed_path, date, MFED) - self.combine_bands(sfed_da, mfed_da, date=date) - - # If any of the dates are above 2023: - if any(date.year >= 2024 for date in dates): - filenames = self.get_historical_90days_zipped_files(dates=dates) - filenames.reverse() - self.process_historical_zipped_data(filenames, dates) + self._process_historical_dates(dates) From 11b69714e43273b1b3acbb04be8c48419d2be42b Mon Sep 17 00:00:00 2001 From: hannahker Date: Mon, 6 Jan 2025 14:49:06 -0800 Subject: [PATCH 04/13] add more logging --- src/pipelines/floodscan_pipeline.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index 0633a30..8c8b224 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -388,12 +388,8 @@ def process_historical_dates(self, dates): Process data for a historical date range. Dates before 2024 will be handled differently """ - post_2024_dates = [ - date for date in dates if datetime.strptime(date, "%Y-%m-%d").year >= 2024 - ] - pre_2024_dates = [ - date for date in dates if datetime.strptime(date, "%Y-%m-%d").year < 2024 - ] + post_2024_dates = [date for date in dates if date.year >= 2024] + pre_2024_dates = [date for date in dates if date.year < 2024] if post_2024_dates: self._process_post_2024(post_2024_dates) @@ -402,6 +398,7 @@ def process_historical_dates(self, dates): def _process_pre_2024(self, dates): """Process any dates before 2024. These will pull from the large netcdf files.""" + self.logger.debug(f"Processing {len(dates)} dates from before 2024...") sfed_path, mfed_path = self.get_historical_nc_files() for date in dates: sfed_da = self.process_historical_data(sfed_path, date, SFED) @@ -410,6 +407,7 @@ def _process_pre_2024(self, dates): def _process_post_2024(self, dates): """Process any dates 2024 onwards. This will pull from the 90 days zipped files.""" + self.logger.debug(f"Processing {len(dates)} dates from 2024 onwards...") filenames = self.get_historical_90days_zipped_files(dates=dates) filenames.reverse() self.process_historical_zipped_data(filenames, dates) @@ -422,7 +420,7 @@ def run_pipeline(self): missing_dates, _ = self.check_coverage() self.print_coverage_report() if missing_dates: - self._process_historical_dates(missing_dates) + self.process_historical_dates(missing_dates) # Run for the latest available date if self.is_update: @@ -431,11 +429,14 @@ def run_pipeline(self): # Historical run else: yesterday = datetime.today() - pd.DateOffset(days=1) - min_date = datetime.strptime(self.coverage["start_date"], DATE_FORMAT) + min_date = datetime.strptime(str(self.coverage["start_date"]), DATE_FORMAT) + self.logger.info( + f"Processing historical data from {self.start_date} to {self.end_date}..." 
+ ) dates = create_date_range( self.start_date, self.end_date, min_accepted=min_date, max_accepted=yesterday, ) - self._process_historical_dates(dates) + self.process_historical_dates(dates) From c948b2dccccfd3f0830f5ab2ef3aaf4308fb9835 Mon Sep 17 00:00:00 2001 From: hannahker Date: Mon, 6 Jan 2025 15:23:10 -0800 Subject: [PATCH 05/13] slight code reorg --- src/pipelines/floodscan_pipeline.py | 106 +++++++++++++++------------- 1 file changed, 55 insertions(+), 51 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index 8c8b224..5f20223 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -135,6 +135,7 @@ def _get_90_days_filenames_for_dates(self, dates): def get_historical_90days_zipped_files(self, dates): filename_list = self._get_90_days_filenames_for_dates(dates=dates) + print(filename_list) zipped_files_path = [] for zipped_filename in filename_list: @@ -206,31 +207,6 @@ def _unzip_90days_file(self, file_to_unzip, dates): except Exception as e: self.logger.error(f"Failed to extract: {e}") - def process_historical_data(self, filepath, date, band_type): - self.logger.info(f"Processing historical {band_type} data from {date}") - - with xr.open_dataset(filepath) as ds: - ds = ds.transpose("time", "lat", "lon") - if not ds["time"].dtype == "= 2024] - pre_2024_dates = [date for date in dates if date.year < 2024] - - if post_2024_dates: - self._process_post_2024(post_2024_dates) - if pre_2024_dates: - self._process_pre_2024(pre_2024_dates) + # ------------------ Processing runner functions def _process_pre_2024(self, dates): """Process any dates before 2024. These will pull from the large netcdf files.""" @@ -403,7 +383,7 @@ def _process_pre_2024(self, dates): for date in dates: sfed_da = self.process_historical_data(sfed_path, date, SFED) mfed_da = self.process_historical_data(mfed_path, date, MFED) - self.combine_bands(sfed_da, mfed_da, date=date) + self._combine_bands(sfed_da, mfed_da, date=date) def _process_post_2024(self, dates): """Process any dates 2024 onwards. This will pull from the 90 days zipped files.""" @@ -412,9 +392,33 @@ def _process_post_2024(self, dates): filenames.reverse() self.process_historical_zipped_data(filenames, dates) + def process_historical_dates(self, dates): + """ + Process data for a historical date range. 
+ Dates before 2024 will be handled differently + """ + post_2024_dates = [date for date in dates if date.year >= 2024] + pre_2024_dates = [date for date in dates if date.year < 2024] + + if post_2024_dates: + self._process_post_2024(post_2024_dates) + if pre_2024_dates: + self._process_pre_2024(pre_2024_dates) + + def process_latest_update(self): + """Retrieve yesterday's data""" + self.logger.info("Retrieving Floodscan data from yesterday...") + yesterday = datetime.today() - pd.DateOffset(days=1) + sfed, mfed, latest_date = self.get_raw_data(date=yesterday) + sfed_da = self.process_data(sfed, band_type=SFED) + mdfed_da = self.process_data(mfed, band_type=MFED) + self._combine_bands(sfed_da, mdfed_da, latest_date) + return True + def run_pipeline(self): self.logger.info(f"Running FloodScan pipeline in {self.mode} mode...") + # Backfilling dates if self.backfill: self.logger.info("Checking for missing data and backfilling if needed...") missing_dates, _ = self.check_coverage() @@ -424,7 +428,7 @@ def run_pipeline(self): # Run for the latest available date if self.is_update: - self._process_latest_update() + self.process_latest_update() # Historical run else: From da7a9f05d581ad4943b74d33477ac1929eaadce9 Mon Sep 17 00:00:00 2001 From: hannahker Date: Tue, 7 Jan 2025 12:22:34 -0800 Subject: [PATCH 06/13] fix local cleanup function --- src/pipelines/floodscan_pipeline.py | 44 +++++++++++++---------------- src/pipelines/pipeline.py | 2 +- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index 5f20223..b031c43 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -109,7 +109,11 @@ def _get_90_days_filenames_for_dates(self, dates): ] else: - existing_files = os.listdir(self.local_raw_dir) + existing_files = [ + f + for f in os.listdir(self.local_raw_dir) + if f.startswith("aer_floodscan") + ] for existing_filename in existing_files: date_from_file = get_datetime_from_filename(existing_filename) @@ -135,7 +139,6 @@ def _get_90_days_filenames_for_dates(self, dates): def get_historical_90days_zipped_files(self, dates): filename_list = self._get_90_days_filenames_for_dates(dates=dates) - print(filename_list) zipped_files_path = [] for zipped_filename in filename_list: @@ -212,7 +215,7 @@ def process_historical_zipped_data(self, zipped_filepaths, dates): unzipped_mfed = [] for filepath in zipped_filepaths: - self.logger.info( + self.logger.debug( f"Unzipping data from from {filepath[SFED]} and {filepath[MFED]} to {self.local_raw_dir}" ) @@ -238,28 +241,21 @@ def process_historical_zipped_data(self, zipped_filepaths, dates): mfed_da = self.process_data(file[1], band_type=MFED) self._combine_bands(sfed_da, mfed_da, date=date) - self._cleanup_local(unzipped_files) + self._cleanup_local() - def _cleanup_local(self, unzipped_files): - # Cleaning up after local run + def _cleanup_local(self): + """Cleans up everything in the local directory that isn't a 90-day zip or a historical .nc file""" if self.mode == "local": - sfed_dir = ( - self.local_raw_dir - / "aer_floodscan_sfed_area_flooded_fraction_africa_90days" - ) - mfed_dir = ( - self.local_raw_dir - / "aer_floodscan_mfed_area_flooded_fraction_africa_90days" - ) - for file in unzipped_files: - if self.mode == "local": - sfed_file = self.local_raw_dir / file[0] - mfed_file = self.local_raw_dir / file[1] - shutil.move(sfed_file, sfed_dir) - shutil.move(mfed_file, mfed_dir) - - shutil.rmtree(sfed_dir) - shutil.rmtree(mfed_dir) + 
for file in os.listdir(self.local_raw_dir): + file_path = self.local_raw_dir / file + if file_path.is_file() and not ( + file.endswith(".zip") or file.endswith(".nc") + ): + os.remove(file_path) + for item in os.listdir(self.local_raw_dir): + item_path = self.local_raw_dir / item + if item_path.is_dir(): + shutil.rmtree(item_path) def _update_name_if_necessary(self, raw_filename, band_type, latest_date): filename_date = get_datetime_from_filename(str(raw_filename)) @@ -368,7 +364,7 @@ def _combine_bands(self, sfed, mfed, date): try: da = xr.merge([sfed, mfed]) self.save_processed_data(da, self._generate_processed_filename(date)) - self.logger.info(f"Successfully combined SFED and MFED for: {date}") + self.logger.debug(f"Successfully combined SFED and MFED for: {date}") except Exception as err: self.logger.error( f"Failed when combining sfed and mfed geotiffs. {err}" diff --git a/src/pipelines/pipeline.py b/src/pipelines/pipeline.py index 9714f80..02f7356 100644 --- a/src/pipelines/pipeline.py +++ b/src/pipelines/pipeline.py @@ -221,7 +221,7 @@ def check_coverage( ] if pre_2024_dates: self.logger.warning( - f"There are {len(pre_2024_dates)} missing from before 2024. These won't be backfilled automatically." + f"There are {len(pre_2024_dates)} files missing from before 2024. These won't be backfilled automatically." # noqa ) coverage_pct = (len(existing_dates) / len(expected_dates)) * 100 From 60d8255086bf0e9b202632b7e1c7d4344aec9b00 Mon Sep 17 00:00:00 2001 From: hannahker Date: Fri, 17 Jan 2025 16:52:35 -0500 Subject: [PATCH 07/13] rewrite functions to get filenames list --- src/pipelines/floodscan_pipeline.py | 137 ++++++++++++++++------------ 1 file changed, 79 insertions(+), 58 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index b031c43..f77fe0e 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -1,7 +1,7 @@ import os import re import shutil -from datetime import datetime +from datetime import datetime, timedelta from zipfile import ZipFile import pandas as pd @@ -95,9 +95,16 @@ def get_historical_nc_files(self): return None - def _get_90_days_filenames_for_dates(self, dates): - filenames = [] + def _get_dates_with_pairs(self, filenames): + pairs = {} + for fname in filenames: + date = get_datetime_from_filename(fname) + file_type = "sfed" if "sfed" in fname else "mfed" + pairs.setdefault(date, set()).add(file_type) + return sorted([date for date, types in pairs.items() if len(types) == 2]) + def _get_90_days_filenames_for_dates(self, dates, max_days=5): + # --- Check existing files if self.mode != "local": existing_files = [ x.name @@ -107,37 +114,55 @@ def _get_90_days_filenames_for_dates(self, dates): name_starts_with=self.raw_path.as_posix() + "/aer_floodscan" ) ] - else: existing_files = [ f for f in os.listdir(self.local_raw_dir) if f.startswith("aer_floodscan") ] + available_dates = self._get_dates_with_pairs(existing_files) + print(f"The following dates have zipped data: {available_dates}") - for existing_filename in existing_files: - date_from_file = get_datetime_from_filename(existing_filename) - if date_from_file in dates: + filenames = [] + for target in dates: + # First check if exact date exists + if target in available_dates: filenames.append( { - SFED: self._generate_raw_filename(date_from_file, SFED), - MFED: self._generate_raw_filename(date_from_file, MFED), + SFED: self._generate_raw_filename(target, SFED), + MFED: self._generate_raw_filename(target, MFED), } ) + continue 
+ + # Check next 5 days + found = False + for days in range(1, max_days + 1): + future_date = target + timedelta(days=days) + if future_date in available_dates: + filenames.append( + { + SFED: self._generate_raw_filename(future_date, SFED), + MFED: self._generate_raw_filename(future_date, MFED), + } + ) + found = True + break - if not filenames: - existing_filename = existing_files[0] - date_from_file = get_datetime_from_filename(existing_filename) - filenames.append( - { - SFED: self._generate_raw_filename(date_from_file, SFED), - MFED: self._generate_raw_filename(date_from_file, MFED), - } - ) + if not found: + self.logger.warning( + f"No available data within {max_days} days of {target}" + ) + # Now drop for any duplicates + filenames = list({tuple(sorted(d.items())): d for d in filenames}.values()) return filenames def get_historical_90days_zipped_files(self, dates): + """ + Retrieves the appropriate zipped files, returning the full path to where they've been downloaded + (Or where they're located if working locally) + """ filename_list = self._get_90_days_filenames_for_dates(dates=dates) zipped_files_path = [] @@ -176,39 +201,34 @@ def get_historical_90days_zipped_files(self, dates): def _unzip_90days_file(self, file_to_unzip, dates): unzipped_files = [] - try: - with ZipFile(file_to_unzip, "r") as zipobj: - for filename in zipobj.namelist(): - if os.path.basename(filename): - date = get_datetime_from_filename(filename) - if date in dates: - date_str = re.search("([0-9]{4}[0-9]{2}[0-9]{2})", filename) - new_filename = os.path.basename( - filename.replace( - date_str[0], date.strftime(DATE_FORMAT) - ) + with ZipFile(file_to_unzip, "r") as zipobj: + for filename in zipobj.namelist(): + if os.path.basename(filename): + date = get_datetime_from_filename(filename) + if date in dates: + date_str = re.search("([0-9]{4}[0-9]{2}[0-9]{2})", filename) + new_filename = os.path.basename( + filename.replace(date_str[0], date.strftime(DATE_FORMAT)) + ) + try: + full_path = zipobj.extract(filename, self.local_raw_dir) + new_full_path = os.path.join( + os.path.dirname(full_path), new_filename ) - try: - full_path = zipobj.extract(filename, self.local_raw_dir) - new_full_path = os.path.join( - os.path.dirname(full_path), new_filename - ) - os.rename(full_path, new_full_path) - unzipped_files.append( - os.path.basename( - shutil.move(new_full_path, self.local_raw_dir) - ) - ) - except FileExistsError: - self.logger.warning( - f"File already exists: {new_filename}" + os.rename(full_path, new_full_path) + unzipped_files.append( + os.path.basename( + shutil.move(new_full_path, self.local_raw_dir) ) - except Exception as e: - self.logger.error(f"Failed to extract: {e}") + ) + except FileExistsError: + self.logger.warning(f"File already exists: {new_filename}") + # except Exception as e: + # self.logger.error(f" ** Failed to extract: {e}") - return unzipped_files - except Exception as e: - self.logger.error(f"Failed to extract: {e}") + return unzipped_files + # except Exception as e: + # self.logger.error(f"-- Failed to extract: {e}") def process_historical_zipped_data(self, zipped_filepaths, dates): unzipped_sfed = [] @@ -219,16 +239,17 @@ def process_historical_zipped_data(self, zipped_filepaths, dates): f"Unzipping data from from {filepath[SFED]} and {filepath[MFED]} to {self.local_raw_dir}" ) - try: - unzipped_sfed += self._unzip_90days_file(filepath[SFED], dates) - unzipped_mfed += self._unzip_90days_file(filepath[MFED], dates) - - if len(dates) == len(unzipped_sfed): - break - except Exception as err: 
- self.logger.error( - f"Failed to extract {filepath[SFED]} or {filepath[MFED]}: {err}" - ) + # try: + unzipped_sfed += self._unzip_90days_file(filepath[SFED], dates) + unzipped_mfed += self._unzip_90days_file(filepath[MFED], dates) + + if len(dates) == len(unzipped_sfed): + break + # except Exception as err: + # print(err) + # self.logger.error( + # f"Failed to extract {filepath[SFED]} or {filepath[MFED]}: {err}" + # ) unzipped_files = list(zip(unzipped_sfed, unzipped_mfed)) From d95b2b50c2a58dedca66aa6b9b9f54afd4aaf48a Mon Sep 17 00:00:00 2001 From: hannahker Date: Sun, 19 Jan 2025 19:07:42 -0500 Subject: [PATCH 08/13] add docstrings and adjust error handling --- src/pipelines/floodscan_pipeline.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index f77fe0e..58ae46e 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -104,7 +104,12 @@ def _get_dates_with_pairs(self, filenames): return sorted([date for date, types in pairs.items() if len(types) == 2]) def _get_90_days_filenames_for_dates(self, dates, max_days=5): - # --- Check existing files + """ + Given an input list of dates, this function returns a list of the + raw filenames associated with SFED and MFED data for each date. If no + raw file exists for a specifc date, then will search for a file up to + `max_days` ahead of that date. + """ if self.mode != "local": existing_files = [ x.name @@ -210,25 +215,21 @@ def _unzip_90days_file(self, file_to_unzip, dates): new_filename = os.path.basename( filename.replace(date_str[0], date.strftime(DATE_FORMAT)) ) + full_path = zipobj.extract(filename, self.local_raw_dir) + new_full_path = os.path.join( + os.path.dirname(full_path), new_filename + ) + os.rename(full_path, new_full_path) try: - full_path = zipobj.extract(filename, self.local_raw_dir) - new_full_path = os.path.join( - os.path.dirname(full_path), new_filename - ) - os.rename(full_path, new_full_path) unzipped_files.append( os.path.basename( shutil.move(new_full_path, self.local_raw_dir) ) ) - except FileExistsError: - self.logger.warning(f"File already exists: {new_filename}") - # except Exception as e: - # self.logger.error(f" ** Failed to extract: {e}") + except Exception as e: + self.logger.error(f" ** Failed to extract: {e}") return unzipped_files - # except Exception as e: - # self.logger.error(f"-- Failed to extract: {e}") def process_historical_zipped_data(self, zipped_filepaths, dates): unzipped_sfed = [] From 21b190da22403b950e69637833f39e7461a0858f Mon Sep 17 00:00:00 2001 From: hannahker Date: Sun, 19 Jan 2025 19:30:04 -0500 Subject: [PATCH 09/13] remove some error catching --- src/pipelines/floodscan_pipeline.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index 58ae46e..b33b6e6 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -240,17 +240,11 @@ def process_historical_zipped_data(self, zipped_filepaths, dates): f"Unzipping data from from {filepath[SFED]} and {filepath[MFED]} to {self.local_raw_dir}" ) - # try: unzipped_sfed += self._unzip_90days_file(filepath[SFED], dates) unzipped_mfed += self._unzip_90days_file(filepath[MFED], dates) if len(dates) == len(unzipped_sfed): break - # except Exception as err: - # print(err) - # self.logger.error( - # f"Failed to extract {filepath[SFED]} or {filepath[MFED]}: {err}" - # ) unzipped_files 
= list(zip(unzipped_sfed, unzipped_mfed)) From f132ade24e563c7d25e3e67a5c3790dbaf52da46 Mon Sep 17 00:00:00 2001 From: hannahker Date: Mon, 20 Jan 2025 18:00:15 -0500 Subject: [PATCH 10/13] cleanup local when getting latest update --- src/pipelines/floodscan_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index b33b6e6..ee57c1f 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -425,6 +425,7 @@ def process_latest_update(self): sfed_da = self.process_data(sfed, band_type=SFED) mdfed_da = self.process_data(mfed, band_type=MFED) self._combine_bands(sfed_da, mdfed_da, latest_date) + self._cleanup_local() return True def run_pipeline(self): From 2967bf3f252ce7bc22a4351a558bdef92b22730b Mon Sep 17 00:00:00 2001 From: hannahker Date: Tue, 21 Jan 2025 10:16:49 -0500 Subject: [PATCH 11/13] bump up the match interval to 90 days --- src/pipelines/floodscan_pipeline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index ee57c1f..8921110 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -103,7 +103,7 @@ def _get_dates_with_pairs(self, filenames): pairs.setdefault(date, set()).add(file_type) return sorted([date for date, types in pairs.items() if len(types) == 2]) - def _get_90_days_filenames_for_dates(self, dates, max_days=5): + def _get_90_days_filenames_for_dates(self, dates, max_days=90): """ Given an input list of dates, this function returns a list of the raw filenames associated with SFED and MFED data for each date. If no @@ -126,7 +126,6 @@ def _get_90_days_filenames_for_dates(self, dates, max_days=5): if f.startswith("aer_floodscan") ] available_dates = self._get_dates_with_pairs(existing_files) - print(f"The following dates have zipped data: {available_dates}") filenames = [] for target in dates: From 02c6ec6903e1afb52db94773bf92f8ee1a7673b4 Mon Sep 17 00:00:00 2001 From: hannahker Date: Tue, 21 Jan 2025 12:26:17 -0500 Subject: [PATCH 12/13] minor logging updates --- src/pipelines/floodscan_pipeline.py | 12 +++++++----- src/pipelines/pipeline.py | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/pipelines/floodscan_pipeline.py b/src/pipelines/floodscan_pipeline.py index 8921110..32ffb2e 100644 --- a/src/pipelines/floodscan_pipeline.py +++ b/src/pipelines/floodscan_pipeline.py @@ -139,7 +139,7 @@ def _get_90_days_filenames_for_dates(self, dates, max_days=90): ) continue - # Check next 5 days + # Check next 90 days found = False for days in range(1, max_days + 1): future_date = target + timedelta(days=days) @@ -154,7 +154,7 @@ def _get_90_days_filenames_for_dates(self, dates, max_days=90): break if not found: - self.logger.warning( + self.logger.error( f"No available data within {max_days} days of {target}" ) @@ -249,10 +249,11 @@ def process_historical_zipped_data(self, zipped_filepaths, dates): for file in unzipped_files: date = get_datetime_from_filename(file[0]) - self.logger.info(f"Processing historical {SFED} data from {date}") + date_pretty = date.strftime("%Y-%m-%d") + self.logger.info(f"Processing historical {SFED} data from {date_pretty}") sfed_da = self.process_data(file[0], band_type=SFED) - self.logger.info(f"Processing historical {MFED} data from {date}") + self.logger.info(f"Processing historical {MFED} data from {date_pretty}") mfed_da = self.process_data(file[1], band_type=MFED) 
self._combine_bands(sfed_da, mfed_da, date=date) @@ -329,7 +330,8 @@ def query_api(self, date): return sfed_unzipped, mfed_unzipped, latest_date def process_historical_data(self, filepath, date, band_type): - self.logger.info(f"Processing historical {band_type} data from {date}") + date_pretty = date.strftime("%Y-%m-%d") + self.logger.info(f"Processing historical {band_type} data from {date_pretty}") with xr.open_dataset(filepath) as ds: ds = ds.transpose("time", "lat", "lon") diff --git a/src/pipelines/pipeline.py b/src/pipelines/pipeline.py index 02f7356..1066659 100644 --- a/src/pipelines/pipeline.py +++ b/src/pipelines/pipeline.py @@ -246,7 +246,7 @@ def print_coverage_report( self.logger.info(f"Overall coverage: {coverage_pct:.1f}%") if missing_dates: - self.logger.info("Missing Dates after 2024:") + self.logger.info(f"Missing dates after 2024 ({len(missing_dates)} total):") for date in missing_dates: self.logger.info(f" - {date.strftime('%Y-%m-%d')}") else: From b512e4386bbdb9b750a8b51d7bd51975531f6c1a Mon Sep 17 00:00:00 2001 From: hannahker Date: Tue, 21 Jan 2025 17:16:38 -0500 Subject: [PATCH 13/13] add testing file --- examples/floodscan_testing.md | 246 ++++++++++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 examples/floodscan_testing.md diff --git a/examples/floodscan_testing.md b/examples/floodscan_testing.md new file mode 100644 index 0000000..309153e --- /dev/null +++ b/examples/floodscan_testing.md @@ -0,0 +1,246 @@ +--- +jupyter: + jupytext: + text_representation: + extension: .md + format_name: markdown + format_version: '1.3' + jupytext_version: 1.16.3 + kernelspec: + display_name: venv + language: python + name: python3 +--- + +## Floodscan Pipeline testing + +This notebook runs through some basic testing for data updates with the `FloodscanPipeline`. All outputs are saved locally. Running this notebook will empty any local processed Flooscan COGs. + +```python +%load_ext autoreload +%autoreload 2 +``` + +Load Floodscan config and dependencies + +```python +from datetime import datetime, timedelta +from src.pipelines.floodscan_pipeline import FloodScanPipeline +from dotenv import load_dotenv +from src.config.settings import load_pipeline_config +from src.utils.azure_utils import blob_client, download_from_azure +import os +import glob + +load_dotenv() + +settings = load_pipeline_config("floodscan") +``` + +Set up an instance of the class. 
Note: Starting in `local` mode

```python
settings.update(
    {
        "mode": "local",
        "is_update": True,
        "backfill": True,
        "start_date": "2024-01-01",
        "end_date": "2024-01-02",
        "version": 5,
        "log_level": "INFO",
        "use_cache": False,  # NOTE: Bug if use_cache: True
    }
)

floodscan = FloodScanPipeline(**settings)
```

Set up the expected local directory with raw files

```python
blob = blob_client("dev")

sfed_local_file_path = floodscan.local_raw_dir / floodscan.sfed_historical
mfed_local_file_path = floodscan.local_raw_dir / floodscan.mfed_historical

# Download the pre-2024 files
download_from_azure(
    blob_service_client=blob,
    container_name=floodscan.container_name,
    blob_path=floodscan.raw_path / floodscan.sfed_historical,
    local_file_path=sfed_local_file_path,
)

download_from_azure(
    blob_service_client=blob,
    container_name=floodscan.container_name,
    blob_path=floodscan.raw_path / floodscan.mfed_historical,
    local_file_path=mfed_local_file_path,
)

# Download the zip files
sfed_dates = [datetime(2024, 2, 26), datetime(2024, 2, 28)]
mfed_dates = [datetime(2024, 2, 26), datetime(2024, 2, 27), datetime(2024, 2, 28)]

for date in sfed_dates:
    raw_filename = floodscan._generate_raw_filename(date, "SFED")
    download_from_azure(
        blob_service_client=blob,
        container_name=floodscan.container_name,
        blob_path=floodscan.raw_path / raw_filename,
        local_file_path=floodscan.local_raw_dir / raw_filename,
    )

for date in mfed_dates:
    raw_filename = floodscan._generate_raw_filename(date, "MFED")
    download_from_azure(
        blob_service_client=blob,
        container_name=floodscan.container_name,
        blob_path=floodscan.raw_path / raw_filename,
        local_file_path=floodscan.local_raw_dir / raw_filename,
    )
```

```python
def check_and_clean_directory(valid_dates, just_clean=False):
    processed_dir = floodscan.local_processed_dir
    expected_filenames = {
        f"aer_area_300s_v{date.strftime('%Y-%m-%d')}_v05r01.tif" for date in valid_dates
    }
    all_files = set(os.path.basename(f) for f in glob.glob(os.path.join(processed_dir, '*.tif')))

    if not just_clean:
        # Check that the files on disk match the expected files exactly
        if all_files != expected_filenames:
            extra_files = all_files - expected_filenames
            missing_files = expected_filenames - all_files

            error_msg = []
            if extra_files:
                error_msg.append(f"Unexpected files found: {extra_files}")
            if missing_files:
                error_msg.append(f"Missing expected files: {missing_files}")

            raise AssertionError(" | ".join(error_msg))

    # If we get here, any required assertion passed, so remove all files
    for file in all_files:
        full_path = os.path.join(processed_dir, file)
        os.remove(full_path)
        print(f"Removed: {full_path}")
```

```python
check_and_clean_directory([], just_clean=True)
```

## Testing basic functionality

**Test 1**: Run the latest update

```python
floodscan.process_latest_update()

# Check and cleanup
yesterday = (datetime.now() - timedelta(days=1)).date()
check_and_clean_directory([yesterday])
```

**Test 2**: Process a single historical date where both the SFED and MFED raw files are present

```python
test_date = [datetime(2024, 2, 26)]
floodscan.process_historical_dates(test_date)

check_and_clean_directory(test_date)
```

**Test 3**: Process a single historical date where only the MFED raw file is present, but there is a raw file from within the preceding 90 days

```python
test_date = [datetime(2024, 2, 27)]
floodscan.process_historical_dates(test_date)

check_and_clean_directory(test_date)
```

**Test 4**: Process a single historical date where no raw file is present, and there is no raw file from within the preceding 90 days

```python
test_date = [datetime(2024, 3, 27)]
floodscan.process_historical_dates(test_date)

check_and_clean_directory([])  # No data updates! Should this error instead?
```

**Test 5**: Process a single historical date from before 2024 (so pulling from the NetCDF file)

```python
test_date = [datetime(2023, 1, 27)]
floodscan.process_historical_dates(test_date)

check_and_clean_directory(test_date)
```

**Test 6**: Process multiple historical dates covering a mix of the above conditions

```python
test_dates = [
    datetime(2023, 1, 25),  # Pre 2024
    datetime(2024, 1, 25),  # Raw file in preceding 90 days
    datetime(2024, 2, 25),  # Raw file in preceding 90 days
    datetime(2024, 2, 26),  # All there
    datetime(2024, 2, 27),  # Only mfed
    datetime(2024, 2, 28),  # All there
]
floodscan.process_historical_dates(test_dates)

check_and_clean_directory(test_dates)
```

**Test 7**: Print the coverage report

```python
floodscan.print_coverage_report()
```

## End-to-end tests

**Test 1**: Basic daily update, without any backfilling

```python
settings.update({"is_update": True, "backfill": False})
floodscan = FloodScanPipeline(**settings)
floodscan.run_pipeline()
```

**Test 2**: Basic daily update, with backfilling

```python
settings.update({"is_update": True, "backfill": True})
floodscan = FloodScanPipeline(**settings)
floodscan.run_pipeline()
```

**Test 3**: Historical update between two dates in 2024

```python
settings.update({"start_date": "2024-02-26", "end_date": "2024-02-28", "backfill": False, "is_update": False})
floodscan = FloodScanPipeline(**settings)
floodscan.run_pipeline()
```

**Test 4**: Historical update between two dates in 2023

```python
settings.update({"start_date": "2023-02-26", "end_date": "2023-02-28"})
floodscan = FloodScanPipeline(**settings)
floodscan.run_pipeline()
```

**Test 5**: Historical update between two dates that span 2023 and 2024

```python
settings.update({"start_date": "2023-12-26", "end_date": "2024-01-05", "is_update": False})
floodscan = FloodScanPipeline(**settings)
floodscan.run_pipeline()
```
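
Finally, we can reuse the `check_and_clean_directory` helper from above to verify the output of that last run and reset the local processed directory. This is a quick sketch that assumes the run writes one processed COG per day across the requested range (inclusive); adjust the dates if the settings above change.

```python
# Rebuild the date range used in the last end-to-end run (2023-12-26 to 2024-01-05),
# assert that a processed COG exists for each day, then clear the local outputs.
start, end = datetime(2023, 12, 26), datetime(2024, 1, 5)
expected_dates = [start + timedelta(days=i) for i in range((end - start).days + 1)]
check_and_clean_directory(expected_dates)
```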