From a523404db821eedd2bb074881ab6f991a26bac78 Mon Sep 17 00:00:00 2001
From: Frank Greguska <89428916+frankinspace@users.noreply.github.com>
Date: Mon, 16 Dec 2024 11:37:38 -0800
Subject: [PATCH] Update to place harmony results in our bucket

---
 .github/workflows/cicd-pipeline.yml    |  3 ++
 README.md                              | 15 ++++-----
 bignbit/apply_opera_hls_treatment.py   | 34 ++++++++++++---------
 bignbit/generate_image_metadata.py     |  2 +-
 bignbit/submit_harmony_job.py          |  8 ++---
 terraform/lambda_functions.tf          |  4 +--
 terraform/main.tf                      | 24 ---------------
 terraform/outputs.tf                   | 10 ++++--
 terraform/s3.tf                        | 42 +++++++++++++++++---------
 terraform/state_machine_definition.tpl |  3 +-
 terraform/variables.tf                 |  6 ++--
 11 files changed, 77 insertions(+), 74 deletions(-)

diff --git a/.github/workflows/cicd-pipeline.yml b/.github/workflows/cicd-pipeline.yml
index 81cf2cf..3b9e492 100644
--- a/.github/workflows/cicd-pipeline.yml
+++ b/.github/workflows/cicd-pipeline.yml
@@ -137,9 +137,12 @@ jobs:
       - name: Install conda
         uses: conda-incubator/setup-miniconda@v3
         with:
+          channels: conda-forge
           activate-environment: bignbit
           environment-file: conda-environment.yaml
           auto-activate-base: false
+          conda-remove-defaults: "true"
+          miniforge-version: latest
       - name: Install package
         run: poetry install
       - name: Lint
diff --git a/README.md b/README.md
index 28572b1..40dc48b 100644
--- a/README.md
+++ b/README.md
@@ -107,10 +107,10 @@ should define the bignbit module and the bignbit step function state machine. Se
 
 > [!IMPORTANT]
 > bignbit uses the [user owned bucket](https://harmony.earthdata.nasa.gov/docs#user-owned-buckets-for-harmony-output) parameter
-> when making Harmony requests. If an existing bucket is configured for the `harmony_staging_bucket` parameter, it must
-> have a bucket policy that allows Harmony to write objects to it. If `harmony_staging_bucket` is left blank, bignbit will
-> create a new S3 bucket (named `{app_name}-{stage}-harmony-staging`) and apply the correct permissions automatically. This bucket will also automatically expire objects
-> older than 30 days.
+> when making Harmony requests. If an existing bucket is configured for the `bignbit_staging_bucket` parameter, it must
+> have a bucket policy that grants Harmony write permission and GIBS read permission. If `bignbit_staging_bucket` is left blank, bignbit will
+> create a new S3 bucket (named `svc-${var.app_name}-${var.prefix}-staging`) and apply the correct permissions automatically.
+> This bucket will also automatically expire objects older than 30 days.
 
 bignbit uses the harmony-py library to construct the Harmony requests for generating images. Most of the parameters
 are extracted from the CMA message as a granule is being processed but the `width` and `height` parameters
@@ -131,8 +131,8 @@ This module uses the following input variables:
 | config_dir | string | Path relative to `config_bucket` where dataset configuration is stored | "datset-config" |
 | pobit_audit_bucket | string | S3 bucket where messages exchanged with GITC will be saved. Typically the cumulus internal bucket | |
 | pobit_audit_path | string | Path relative to `pobit_audit_bucket` where messages exchanged with GITC will be saved. | "pobit-cma-output" |
-| harmony_staging_bucket | string | S3 bucket where Harmony results will be saved. Leave blank to use bucket managed by this module. | _create new bucket named {app_name}-{stage}-harmony-staging_ |
-| harmony_staging_path | string | Path relative to `harmony_staging_bucket` where harmony results will be saved. | "bignbit-harmony-output" |
+| bignbit_staging_bucket | string | S3 bucket where generated images will be saved. Leave blank to use bucket managed by this module. | _create new bucket named svc-{app_name}-{prefix}-staging_ |
+| harmony_staging_path | string | Path relative to `bignbit_staging_bucket` where harmony results will be saved. | "bignbit-harmony-output" |
 | gibs_region | string | Region where GIBS resources are deployed | |
 | gibs_queue_name | string | Name of the GIBS SQS queue where outgoing CNM messages will be sent | |
 | gibs_account_id | string | AWS account ID for GIBS | |
@@ -173,8 +173,9 @@ This module supplies the following outputs:
 | pobit_send_to_gitc_arn | ARN of the lambda function | aws_lambda_function.send_to_gitc.arn |
 | pobit_save_cnm_message_arn | ARN of the lambda function | aws_lambda_function.save_cnm_message.arn |
 | workflow_definition | Rendered state machine definition | rendered version of state_machine_definition.tpl |
-| harmony_staging_bucket | Name of harmony staging bucket | var.harmony_staging_bucket |
+| bignbit_staging_bucket | Name of bignbit staging bucket | var.bignbit_staging_bucket |
 | harmony_staging_path | Path to harmony requests relative to harmony staging bucket | var.harmony_staging_path |
+| bignbit_lambda_role | Role created by the module and applied to the lambda functions | aws_iam_role.bignbit_lambda_role |
 
 # Assumptions
 - Using `ContentBasedDeduplication` strategy for GITC input queue
diff --git a/bignbit/apply_opera_hls_treatment.py b/bignbit/apply_opera_hls_treatment.py
index fbad73b..cdcc96b 100644
--- a/bignbit/apply_opera_hls_treatment.py
+++ b/bignbit/apply_opera_hls_treatment.py
@@ -2,6 +2,7 @@
 """
 Transforms each image in the input using specific processing required to produce an image for display in GITC
 """
+import datetime
 import logging
 import os
 import pathlib
@@ -51,15 +52,18 @@ def process(self) -> List[Dict]:
         A list of CMA file dictionaries pointing to the transformed image(s)
         """
         cma_file_list = self.input['big']
+        staging_bucket = self.config.get('bignbit_staging_bucket')
         mgrs_grid_code = utils.extract_mgrs_grid_code(self.input['granule_umm_json'])
-        file_metadata_list = transform_images(cma_file_list, pathlib.Path(f"{self.path}"), mgrs_grid_code)
+        file_metadata_list = transform_images(cma_file_list, pathlib.Path(f"{self.path}"), mgrs_grid_code,
+                                              staging_bucket)
         del self.input['big']
         self.input['big'] = file_metadata_list
 
         return self.input
 
 
-def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_grid_code: str) -> List[Dict]:
+def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_grid_code: str,
+                     staging_bucket: str) -> List[Dict]:
     """
     Applies special OPERA HLS processing to each input image. Each input image will result in multiple output
     transformed images.
@@ -72,6 +76,8 @@ def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_gri
         Temporary working directory on local disk
     mgrs_grid_code
         MGRS grid code for the current granule being processed
+    staging_bucket
+        Staging bucket to which transformed files should be written
 
     Returns
     -------
@@ -96,7 +102,7 @@ def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_gri
         CUMULUS_LOGGER.info(f'Created new images: {[str(t) for t in transformed_images_filepaths]}')
 
         # Create new file metadata for each new image
-        file_metadata_dicts = create_file_metadata(cma_file_meta, transformed_images_filepaths)
+        file_metadata_dicts = create_file_metadata(transformed_images_filepaths, staging_bucket)
         file_metadata_results.extend(file_metadata_dicts)
 
         # Upload new images to s3
@@ -205,7 +211,7 @@ def the_opera_hls_treatment(source_image_filepath: pathlib.Path, working_dirpath
     return result_image_filepaths
 
 
-def create_file_metadata(original_cma_file_meta: dict, transformed_images_filepaths: List[pathlib.Path]) -> List[Dict]:
+def create_file_metadata(transformed_images_filepaths: List[pathlib.Path], staging_bucket: str) -> List[Dict]:
     """
     Generate a new CMA file metadata dictionary for each transformed image using the original CMA metadata
     as a template.
@@ -215,10 +221,10 @@ def create_file_metadata(original_cma_file_meta: dict, transformed_images_filepa
 
     Parameters
     ----------
-    original_cma_file_meta
-        CMA file metadata dict of the original source image
     transformed_images_filepaths
         Local filepaths to each output transformed image
+    staging_bucket
+        Staging bucket to which transformed files should be written
 
     Returns
     -------
@@ -227,14 +233,14 @@ def create_file_metadata(original_cma_file_meta: dict, transformed_images_filepa
     """
     new_cma_file_meta_list = []
     for transformed_image in transformed_images_filepaths:
-        new_cma_file_meta = original_cma_file_meta.copy()
-        new_cma_file_meta["fileName"] = transformed_image.name
-        # Takes the 'key' from the original and replace just the last part with the new filename
-        new_cma_file_meta["key"] = str(pathlib.Path(*pathlib.Path(original_cma_file_meta["key"]).parts[0:-1]).joinpath(
-            transformed_image.name))
-        new_cma_file_meta["local_filepath"] = str(transformed_image.resolve())
-
-        new_cma_file_meta_list.append(new_cma_file_meta)
+        file_dict = {
+            "fileName": transformed_image.name,
+            "bucket": staging_bucket,
+            "key": f'opera_hls_processing/{datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d")}/{transformed_image.name}',
+            "local_filepath": str(transformed_image.resolve())
+        }
+
+        new_cma_file_meta_list.append(file_dict)
 
     return new_cma_file_meta_list
 
diff --git a/bignbit/generate_image_metadata.py b/bignbit/generate_image_metadata.py
index a9901c0..aa405bd 100644
--- a/bignbit/generate_image_metadata.py
+++ b/bignbit/generate_image_metadata.py
@@ -94,7 +94,7 @@ def generate_metadata(cma_file_list: List[Dict], granule_umm_json: dict, temp_di
     granule_type = "browse" if granule_extension in BROWSE_IMAGE_EXTENSION_SUBTYPES else "metadata"
     if granule_type == "browse":
         granule_subtype = BROWSE_IMAGE_EXTENSION_SUBTYPES.get(granule_extension, None)
-    elif granule_extension.lower() == '.wld' or granule_extension.lower() == '.pgw' :
+    elif granule_extension.lower() == '.wld' or granule_extension.lower() == '.pgw':
         granule_subtype = "world file"
     else:
         granule_subtype = None
diff --git a/bignbit/submit_harmony_job.py b/bignbit/submit_harmony_job.py
index e17b6e9..0f1316d 100644
--- a/bignbit/submit_harmony_job.py
+++ b/bignbit/submit_harmony_job.py
@@ -38,20 +38,20 @@ def process(self):
         current_item = self.config.get('current_item')
         variable = current_item.get('id')
         big_config = self.config.get('big_config')
-        harmony_staging_bucket = self.config.get('harmony_staging_bucket')
+        bignbit_staging_bucket = self.config.get('bignbit_staging_bucket')
         harmony_staging_path = self.config.get('harmony_staging_path')
 
         harmony_job = submit_harmony_job(cmr_env, collection_concept_id, collection_name, granule_concept_id,
-                                         granule_id, variable, big_config, harmony_staging_bucket, harmony_staging_path)
+                                         granule_id, variable, big_config, bignbit_staging_bucket, harmony_staging_path)
 
         self.input['harmony_job'] = harmony_job
 
         return self.input
 
 def submit_harmony_job(cmr_env, collection_concept_id, collection_name, granule_concept_id, granule_id, variable,
-                       big_config, harmony_staging_bucket, harmony_staging_path):
+                       big_config, bignbit_staging_bucket, harmony_staging_path):
     """Generate harmony job and returns harmony job id"""
-    destination_bucket_url = f's3://{harmony_staging_bucket}/{harmony_staging_path}/{collection_name}/{datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d")}'.lower()
+    destination_bucket_url = f's3://{bignbit_staging_bucket}/{harmony_staging_path}/{collection_name}/{datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d")}'.lower()
     harmony_client = utils.get_harmony_client(cmr_env)
     harmony_request = generate_harmony_request(collection_concept_id, granule_concept_id, variable, big_config,
                                                destination_bucket_url)
diff --git a/terraform/lambda_functions.tf b/terraform/lambda_functions.tf
index cf1aab7..52b10a9 100644
--- a/terraform/lambda_functions.tf
+++ b/terraform/lambda_functions.tf
@@ -571,10 +571,10 @@ data "aws_iam_policy_document" "bignbit_lambda_policy" {
       "s3:DeleteObjectVersion",
     ]
     resources = [
-      "arn:aws:s3:::${local.harmony_bucket_name}",
+      "arn:aws:s3:::${local.staging_bucket_name}",
       "arn:aws:s3:::${var.config_bucket}",
       "arn:aws:s3:::${var.pobit_audit_bucket}",
-      "arn:aws:s3:::${local.harmony_bucket_name}/*",
+      "arn:aws:s3:::${local.staging_bucket_name}/*",
       "arn:aws:s3:::${var.config_bucket}/*",
       "arn:aws:s3:::${var.pobit_audit_bucket}/*"
     ]
diff --git a/terraform/main.tf b/terraform/main.tf
index 03409cd..866895c 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -26,30 +26,6 @@ provider "aws" {
 data "aws_caller_identity" "current" {}
 data "aws_region" "current" {}
 
-#
-# data "template_file" "workflow_definition" {
-#   template = file("${path.module}/state_machine_definition.tpl")
-#   vars = {
-#     GetDatasetConfigurationLambda = aws_lambda_function.get_dataset_configuration.arn,
-#     ConfigBucket = var.config_bucket,
-#     ConfigDir = var.config_dir,
-#     GetGranuleUmmJsonLambda = aws_lambda_function.get_granule_umm_json.arn,
-#     IdentifyImageFileLambda = aws_lambda_function.identify_image_file.arn,
-#     ApplyOperaHLSTreatmentLambda = aws_lambda_function.apply_opera_hls_treatment.arn,
-#     GetCollectionConceptIdLambda = aws_lambda_function.get_collection_concept_id.arn,
-#     SubmitHarmonyJobLambda = aws_lambda_function.submit_harmony_job.arn,
-#     GetHarmonyJobStatusLambda = aws_lambda_function.get_harmony_job_status.arn,
-#     GenerateImageMetadataLambda = aws_lambda_function.generate_image_metadata.arn,
-#     BuildImageSetsLambda = aws_lambda_function.build_image_sets.arn,
-#     SendToGITCLambda = aws_lambda_function.send_to_gitc.arn,
-#     SaveCNMMessageLambda = aws_lambda_function.save_cnm_message.arn,
-#     PobitAuditBucket = var.pobit_audit_bucket,
-#     PobitAuditPath = var.pobit_audit_path,
-#     HarmonyStagingBucket = var.harmony_staging_bucket,
-#     HarmonyStagingPath = var.harmony_staging_path
-#   }
-# }
-
 locals {
   environment = var.stage
 
diff --git a/terraform/outputs.tf b/terraform/outputs.tf
index 1599f39..404f5fa 100644
--- a/terraform/outputs.tf
+++ b/terraform/outputs.tf
@@ -96,15 +96,19 @@ output "workflow_definition" {
     SaveCNMMessageLambda = aws_lambda_function.save_cnm_message.arn,
     PobitAuditBucket = var.pobit_audit_bucket,
     PobitAuditPath = var.pobit_audit_path,
-    HarmonyStagingBucket = local.harmony_bucket_name,
+    StagingBucket = local.staging_bucket_name,
     HarmonyStagingPath = var.harmony_staging_path
   })
 }
 
-output "harmony_staging_bucket" {
-  value = local.harmony_bucket_name
+output "bignbit_staging_bucket" {
+  value = local.staging_bucket_name
 }
 
 output "harmony_staging_path" {
   value = var.harmony_staging_path
+}
+
+output "bignbit_lambda_role" {
+  value = aws_iam_role.bignbit_lambda_role
 }
\ No newline at end of file
diff --git a/terraform/s3.tf b/terraform/s3.tf
index 417a2c1..55667f1 100644
--- a/terraform/s3.tf
+++ b/terraform/s3.tf
@@ -1,12 +1,12 @@
-# Create a bucket to store harmony results in unless one is provided as a variable
+# Create a bucket to store bignbit results in unless one is provided as a variable
 locals {
-  create_bucket       = var.harmony_staging_bucket == ""
-  harmony_bucket_name = local.create_bucket ? aws_s3_bucket.harmony_staging_bucket[0].id : var.harmony_staging_bucket
+  create_bucket       = var.bignbit_staging_bucket == ""
+  staging_bucket_name = local.create_bucket ? aws_s3_bucket.bignbit_staging_bucket[0].id : var.bignbit_staging_bucket
 }
 
-resource "aws_s3_bucket" "harmony_staging_bucket" {
+resource "aws_s3_bucket" "bignbit_staging_bucket" {
   count  = local.create_bucket ? 1 : 0
-  bucket = "${local.aws_resources_name}-harmony-staging"
+  bucket = "${local.aws_resources_name}-staging"
 
   lifecycle {
     ignore_changes = [
@@ -17,7 +17,7 @@
 
 resource "aws_s3_bucket_ownership_controls" "disable_acls" {
   count  = local.create_bucket ? 1 : 0
-  bucket = aws_s3_bucket.harmony_staging_bucket[0].id
+  bucket = aws_s3_bucket.bignbit_staging_bucket[0].id
 
   rule {
     object_ownership = "BucketOwnerEnforced"
@@ -26,7 +26,7 @@
 
 resource "aws_s3_bucket_server_side_encryption_configuration" "enable_bucket_encryption" {
   count  = local.create_bucket ? 1 : 0
-  bucket = aws_s3_bucket.harmony_staging_bucket[0].id
+  bucket = aws_s3_bucket.bignbit_staging_bucket[0].id
 
   rule {
     bucket_key_enabled = true
@@ -39,7 +39,7 @@ resource "aws_s3_bucket_server_side_encryption_configuration" "enable_bucket_enc
 
 resource "aws_s3_bucket_lifecycle_configuration" "expire_objects_30_days" {
   count  = local.create_bucket ? 1 : 0
-  bucket = aws_s3_bucket.harmony_staging_bucket[0].id
+  bucket = aws_s3_bucket.bignbit_staging_bucket[0].id
 
   rule {
     id = "ExpireHarmonyObjectsAfter30Days"
@@ -51,12 +51,23 @@ resource "aws_s3_bucket_lifecycle_configuration" "expire_objects_30_days" {
       prefix = var.harmony_staging_path
     }
   }
+
+  rule {
+    id = "ExpireOperaHLSObjectsAfter30Days"
+    status = "Enabled"
+    expiration {
+      days = 30
+    }
+    filter {
+      prefix = "opera_hls_processing/"
+    }
+  }
 }
 
-resource "aws_s3_bucket_policy" "harmony_staging_bucket_policy" {
+resource "aws_s3_bucket_policy" "bignbit_staging_bucket_policy" {
   count  = local.create_bucket ? 1 : 0
-  bucket = aws_s3_bucket.harmony_staging_bucket[0].id
-  policy = data.aws_iam_policy_document.harmony_staging_bucket_policy[0].json
+  bucket = aws_s3_bucket.bignbit_staging_bucket[0].id
+  policy = data.aws_iam_policy_document.bignbit_staging_bucket_policy[0].json
 }
 
 data "aws_iam_policy_document" "allow_harmony_write" {
@@ -68,7 +79,7 @@
       "s3:PutObject",
     ]
     resources = [
-      "${aws_s3_bucket.harmony_staging_bucket[0].arn}/${var.harmony_staging_path}/*"
+      "${aws_s3_bucket.bignbit_staging_bucket[0].arn}/${var.harmony_staging_path}/*"
     ]
     principals {
       identifiers = ["arn:aws:iam::549360732797:root", "arn:aws:iam::625642860590:root"]
@@ -86,7 +97,7 @@
       "s3:GetBucketLocation",
     ]
    resources = [
-      aws_s3_bucket.harmony_staging_bucket[0].arn
+      aws_s3_bucket.bignbit_staging_bucket[0].arn
     ]
     principals {
       identifiers = ["arn:aws:iam::549360732797:root", "arn:aws:iam::625642860590:root"]
@@ -104,7 +115,8 @@
       "s3:GetObject*",
     ]
     resources = [
-      "${aws_s3_bucket.harmony_staging_bucket[0].arn}/${var.harmony_staging_path}/*"
+      "${aws_s3_bucket.bignbit_staging_bucket[0].arn}/${var.harmony_staging_path}/*",
+      "${aws_s3_bucket.bignbit_staging_bucket[0].arn}/opera_hls_processing/*"
     ]
     principals {
       identifiers = ["arn:aws:iam::${var.gibs_account_id}:root"]
@@ -113,7 +125,7 @@
   }
 }
 
-data "aws_iam_policy_document" "harmony_staging_bucket_policy" {
+data "aws_iam_policy_document" "bignbit_staging_bucket_policy" {
   count = local.create_bucket ? 1 : 0
   source_policy_documents = [
     data.aws_iam_policy_document.allow_harmony_write[0].json,
diff --git a/terraform/state_machine_definition.tpl b/terraform/state_machine_definition.tpl
index 3513a58..3b67436 100644
--- a/terraform/state_machine_definition.tpl
+++ b/terraform/state_machine_definition.tpl
@@ -162,6 +162,7 @@
          "cma":{
             "event.$":"$",
             "task_config":{
+               "bignbit_staging_bucket": "${StagingBucket}",
                "cumulus_message":{
                   "input":"{$.payload}"
                }
@@ -260,7 +261,7 @@
                "cmr_environment":"{$.meta.cmr.cmrEnvironment}",
                "cmr_clientid":"{$.meta.cmr.clientId}",
                "current_item":"{$.current_item}",
-               "harmony_staging_bucket": "${HarmonyStagingBucket}",
+               "bignbit_staging_bucket": "${StagingBucket}",
                "harmony_staging_path": "${HarmonyStagingPath}",
                "big_config":"{$.payload.datasetConfigurationForBIG}",
                "cumulus_message":{
diff --git a/terraform/variables.tf b/terraform/variables.tf
index 1934d84..461e816 100644
--- a/terraform/variables.tf
+++ b/terraform/variables.tf
@@ -36,15 +36,15 @@ variable "pobit_audit_path" {
   default = "pobit-cma-output"
 }
 
-variable "harmony_staging_bucket" {
+variable "bignbit_staging_bucket" {
   type = string
-  description = "S3 bucket where Harmony results will be saved. Leave blank to use bucket managed by this module."
+  description = "S3 bucket where generated images will be saved. Leave blank to use bucket managed by this module."
   default = ""
 }
 
 variable "harmony_staging_path" {
   type = string
-  description = "Path relative to harmony_staging_bucket where harmony results will be saved."
+  description = "Path relative to bignbit_staging_bucket where harmony results will be saved."
   default = "bignbit-harmony-output"
 }
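
For reference, the renamed input can be exercised with a module block like the following sketch. This is a minimal, hypothetical caller configuration based only on the inputs documented in README.md above; the `source` path, bucket names, queue name, and account ID are illustrative placeholders, not values taken from this patch. Leaving `bignbit_staging_bucket` blank exercises the new default path: the module creates `svc-{app_name}-{prefix}-staging`, attaches the Harmony write / GIBS read bucket policy, and applies both 30-day expiration rules.

```hcl
# Hypothetical caller configuration (sketch, not part of this patch)
module "bignbit" {
  source = "./terraform" # placeholder path to this module

  app_name = "bignbit"
  prefix   = "example-deployment" # placeholder
  stage    = "sit"                # placeholder

  # Renamed from harmony_staging_bucket; blank means the module creates
  # and manages svc-<app_name>-<prefix>-staging itself.
  bignbit_staging_bucket = ""
  harmony_staging_path   = "bignbit-harmony-output"

  config_bucket      = "example-internal-bucket" # placeholder
  config_dir         = "datset-config"
  pobit_audit_bucket = "example-internal-bucket" # placeholder
  pobit_audit_path   = "pobit-cma-output"

  gibs_region     = "us-west-2"                # placeholder
  gibs_queue_name = "example-gitc-input-queue" # placeholder
  gibs_account_id = "123456789012"             # placeholder

  # Other module inputs omitted for brevity.
}
```

Pointing `bignbit_staging_bucket` at an existing bucket instead skips bucket creation entirely, in which case an equivalent bucket policy granting Harmony write permission and GIBS read permission must already be in place on that bucket.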