Update to place harmony results in our bucket
frankinspace committed Dec 16, 2024
1 parent 2f6fc1a commit a523404
Showing 11 changed files with 77 additions and 74 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/cicd-pipeline.yml
@@ -137,9 +137,12 @@ jobs:
- name: Install conda
uses: conda-incubator/setup-miniconda@v3
with:
channels: conda-forge
activate-environment: bignbit
environment-file: conda-environment.yaml
auto-activate-base: false
conda-remove-defaults: "true"
miniforge-version: latest
- name: Install package
run: poetry install
- name: Lint
15 changes: 8 additions & 7 deletions README.md
@@ -107,10 +107,10 @@ should define the bignbit module and the bignbit step function state machine. Se

> [!IMPORTANT]
> bignbit uses the [user owned bucket](https://harmony.earthdata.nasa.gov/docs#user-owned-buckets-for-harmony-output) parameter
> when making Harmony requests. If an existing bucket is configured for the `harmony_staging_bucket` parameter, it must
> have a bucket policy that allows Harmony to write objects to it. If `harmony_staging_bucket` is left blank, bignbit will
> create a new S3 bucket (named `{app_name}-{stage}-harmony-staging`) and apply the correct permissions automatically. This bucket will also automatically expire objects
> older than 30 days.
> when making Harmony requests. If an existing bucket is configured for the `bignbit_staging_bucket` parameter, it must
> have a bucket policy that allows Harmony write permission and GIBS read permission. If `bignbit_staging_bucket` is left blank, bignbit will
> create a new S3 bucket (named `svc-${var.app_name}-${var.prefix}-staging`) and apply the correct permissions automatically.
> This bucket will also automatically expire objects older than 30 days.
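For an existing bucket, the policy needs statements roughly like the sketch below. This is a minimal illustration only, not the module's actual Terraform: the bucket name, staging prefix, and GIBS account ID are placeholders, while the two Harmony principals mirror the account IDs used in `terraform/s3.tf` in this change.

```python
import json

import boto3

# Placeholder values for illustration only.
bucket = "my-existing-staging-bucket"
harmony_prefix = "bignbit-harmony-output"
gibs_account_id = "111122223333"

# Harmony accounts (UAT and production) as referenced in terraform/s3.tf.
harmony_principals = [
    "arn:aws:iam::549360732797:root",
    "arn:aws:iam::625642860590:root",
]

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {  # Harmony writes generated images under the staging prefix
            "Effect": "Allow",
            "Principal": {"AWS": harmony_principals},
            "Action": ["s3:PutObject"],
            "Resource": f"arn:aws:s3:::{bucket}/{harmony_prefix}/*",
        },
        {  # Harmony needs to resolve the bucket's region
            "Effect": "Allow",
            "Principal": {"AWS": harmony_principals},
            "Action": ["s3:GetBucketLocation"],
            "Resource": f"arn:aws:s3:::{bucket}",
        },
        {  # GIBS reads the staged images
            "Effect": "Allow",
            "Principal": {"AWS": f"arn:aws:iam::{gibs_account_id}:root"},
            "Action": ["s3:GetObject*"],
            "Resource": f"arn:aws:s3:::{bucket}/{harmony_prefix}/*",
        },
    ],
}

boto3.client("s3").put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))
```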
bignbit uses the harmony-py library to construct the Harmony requests for generating images. Most of the parameters
are extracted from the CMA message as a granule is being processed but the `width` and `height` parameters
@@ -131,8 +131,8 @@ This module uses the following input variables:
| config_dir | string | Path relative to `config_bucket` where dataset configuration is stored | "datset-config" |
| pobit_audit_bucket | string | S3 bucket where messages exchanged with GITC will be saved. Typically the cumulus internal bucket | |
| pobit_audit_path | string | Path relative to `pobit_audit_bucket` where messages exchanged with GITC will be saved. | "pobit-cma-output" |
| harmony_staging_bucket | string | S3 bucket where Harmony results will be saved. Leave blank to use bucket managed by this module. | _create new bucket named {app_name}-{stage}-harmony-staging_ |
| harmony_staging_path | string | Path relative to `harmony_staging_bucket` where harmony results will be saved. | "bignbit-harmony-output" |
| bignbit_staging_bucket | string | S3 bucket where generated images will be saved. Leave blank to use bucket managed by this module. | _create new bucket named {app_name}-{stage}-harmony-staging_ |
| harmony_staging_path | string | Path relative to `bignbit_staging_bucket` where harmony results will be saved. | "bignbit-harmony-output" |
| gibs_region | string | Region where GIBS resources are deployed | |
| gibs_queue_name | string | Name of the GIBS SQS queue where outgoing CNM messages will be sent | |
| gibs_account_id | string | AWS account ID for GIBS | |
@@ -173,8 +173,9 @@ This module supplies the following outputs:
| pobit_send_to_gitc_arn | ARN of the lambda function | aws_lambda_function.send_to_gitc.arn |
| pobit_save_cnm_message_arn | ARN of the lambda function | aws_lambda_function.save_cnm_message.arn |
| workflow_definition | Rendered state machine definition | rendered version of state_machine_definition.tpl |
| harmony_staging_bucket | Name of harmony staging bucket | var.harmony_staging_bucket |
| bignbit_staging_bucket | Name of bignbit staging bucket | var.bignbit_staging_bucket |
| harmony_staging_path | Path to harmony requests relative to harmony staging bucket | var.harmony_staging_path |
| bignbit_lambda_role | Role created by the module applied to lambda functions | aws_iam_role.bignbit_lambda_role |

# Assumptions
- Using `ContentBasedDeduplication` strategy for GITC input queue
34 changes: 20 additions & 14 deletions bignbit/apply_opera_hls_treatment.py
@@ -2,6 +2,7 @@
"""
Transforms each image in the input using specific processing required to produce an image for display in GITC
"""
import datetime
import logging
import os
import pathlib
@@ -51,15 +52,18 @@ def process(self) -> List[Dict]:
A list of CMA file dictionaries pointing to the transformed image(s)
"""
cma_file_list = self.input['big']
staging_bucket = self.config.get('bignbit_staging_bucket')

mgrs_grid_code = utils.extract_mgrs_grid_code(self.input['granule_umm_json'])
file_metadata_list = transform_images(cma_file_list, pathlib.Path(f"{self.path}"), mgrs_grid_code)
file_metadata_list = transform_images(cma_file_list, pathlib.Path(f"{self.path}"), mgrs_grid_code,
staging_bucket)
del self.input['big']
self.input['big'] = file_metadata_list
return self.input


def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_grid_code: str) -> List[Dict]:
def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_grid_code: str,
staging_bucket: str) -> List[Dict]:
"""
Applies special OPERA HLS processing to each input image. Each input image will result in multiple output transformed
images.
@@ -72,6 +76,8 @@ def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_gri
Temporary working directory on local disk
mgrs_grid_code
MGRS grid code for the current granule being processed
staging_bucket
Staging bucket to which transformed files should be written
Returns
-------
@@ -96,7 +102,7 @@ def transform_images(cma_file_list: List[Dict], temp_dir: pathlib.Path, mgrs_gri
CUMULUS_LOGGER.info(f'Created new images: {[str(t) for t in transformed_images_filepaths]}')

# Create new file metadata for each new image
file_metadata_dicts = create_file_metadata(cma_file_meta, transformed_images_filepaths)
file_metadata_dicts = create_file_metadata(transformed_images_filepaths, staging_bucket)
file_metadata_results.extend(file_metadata_dicts)

# Upload new images to s3
@@ -205,7 +211,7 @@ def the_opera_hls_treatment(source_image_filepath: pathlib.Path, working_dirpath
return result_image_filepaths


def create_file_metadata(original_cma_file_meta: dict, transformed_images_filepaths: List[pathlib.Path]) -> List[Dict]:
def create_file_metadata(transformed_images_filepaths: List[pathlib.Path], staging_bucket: str) -> List[Dict]:
"""
Generate a new CMA file metadata dictionary for each transformed image using the original CMA metadata as a
template.
@@ -215,10 +221,10 @@ def create_file_metadata(original_cma_file_meta: dict, transformed_images_filepa
Parameters
----------
original_cma_file_meta
CMA file metadata dict of the original source image
transformed_images_filepaths
Local filepaths to each output transformed image
staging_bucket
Staging bucket to which transformed files should be written
Returns
-------
@@ -227,14 +233,14 @@ def create_file_metadata(original_cma_file_meta: dict, transformed_images_filepa
"""
new_cma_file_meta_list = []
for transformed_image in transformed_images_filepaths:
new_cma_file_meta = original_cma_file_meta.copy()
new_cma_file_meta["fileName"] = transformed_image.name
# Takes the 'key' from the original and replace just the last part with the new filename
new_cma_file_meta["key"] = str(pathlib.Path(*pathlib.Path(original_cma_file_meta["key"]).parts[0:-1]).joinpath(
transformed_image.name))
new_cma_file_meta["local_filepath"] = str(transformed_image.resolve())

new_cma_file_meta_list.append(new_cma_file_meta)
file_dict = {
"fileName": transformed_image.name,
"bucket": staging_bucket,
"key": f'opera_hls_processing/{datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d")}/{transformed_image.name}',
"local_filepath": str(transformed_image.resolve())
}

new_cma_file_meta_list.append(file_dict)

return new_cma_file_meta_list
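A rough usage sketch of the new helper follows (the file path, bucket name, and date are hypothetical; the function is `create_file_metadata` as defined above):

```python
import pathlib

# Hypothetical inputs; in the workflow these come from the CMA message and task config.
entries = create_file_metadata(
    transformed_images_filepaths=[pathlib.Path("/tmp/work/granule_tile_browse.png")],
    staging_bucket="svc-bignbit-sit-staging",
)

# Each entry now points at the staging bucket under a date-stamped prefix,
# e.g. (assuming the job ran on 2024-12-16):
# {
#     "fileName": "granule_tile_browse.png",
#     "bucket": "svc-bignbit-sit-staging",
#     "key": "opera_hls_processing/20241216/granule_tile_browse.png",
#     "local_filepath": "/tmp/work/granule_tile_browse.png"
# }
```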

2 changes: 1 addition & 1 deletion bignbit/generate_image_metadata.py
@@ -94,7 +94,7 @@ def generate_metadata(cma_file_list: List[Dict], granule_umm_json: dict, temp_di
granule_type = "browse" if granule_extension in BROWSE_IMAGE_EXTENSION_SUBTYPES else "metadata"
if granule_type == "browse":
granule_subtype = BROWSE_IMAGE_EXTENSION_SUBTYPES.get(granule_extension, None)
elif granule_extension.lower() == '.wld' or granule_extension.lower() == '.pgw' :
elif granule_extension.lower() == '.wld' or granule_extension.lower() == '.pgw':
granule_subtype = "world file"
else:
granule_subtype = None
8 changes: 4 additions & 4 deletions bignbit/submit_harmony_job.py
@@ -38,20 +38,20 @@ def process(self):
current_item = self.config.get('current_item')
variable = current_item.get('id')
big_config = self.config.get('big_config')
harmony_staging_bucket = self.config.get('harmony_staging_bucket')
bignbit_staging_bucket = self.config.get('bignbit_staging_bucket')
harmony_staging_path = self.config.get('harmony_staging_path')

harmony_job = submit_harmony_job(cmr_env, collection_concept_id, collection_name, granule_concept_id,
granule_id, variable, big_config, harmony_staging_bucket, harmony_staging_path)
granule_id, variable, big_config, bignbit_staging_bucket, harmony_staging_path)
self.input['harmony_job'] = harmony_job
return self.input


def submit_harmony_job(cmr_env, collection_concept_id, collection_name, granule_concept_id, granule_id, variable,
big_config, harmony_staging_bucket, harmony_staging_path):
big_config, bignbit_staging_bucket, harmony_staging_path):
"""Generate harmony job and returns harmony job id"""

destination_bucket_url = f's3://{harmony_staging_bucket}/{harmony_staging_path}/{collection_name}/{datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d")}'.lower()
destination_bucket_url = f's3://{bignbit_staging_bucket}/{harmony_staging_path}/{collection_name}/{datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d")}'.lower()
harmony_client = utils.get_harmony_client(cmr_env)
harmony_request = generate_harmony_request(collection_concept_id, granule_concept_id, variable, big_config,
destination_bucket_url)
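For context, a minimal sketch of how such a destination URL would typically be passed through harmony-py (assuming the `Request` object's `destination_url` parameter carries the user-owned bucket setting; all identifiers below are placeholders, not values from this repository):

```python
import datetime

from harmony import Client, Collection, Request

# Placeholder values for illustration only.
bignbit_staging_bucket = "svc-bignbit-sit-staging"
harmony_staging_path = "bignbit-harmony-output"
collection_name = "EXAMPLE_COLLECTION"

destination_bucket_url = (
    f"s3://{bignbit_staging_bucket}/{harmony_staging_path}/{collection_name}/"
    f"{datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%d')}"
).lower()

# Harmony writes each result object under destination_url instead of its own staging bucket.
request = Request(
    collection=Collection(id="C0000000000-POCLOUD"),  # placeholder concept ID
    granule_id=["G0000000000-POCLOUD"],               # placeholder granule concept ID
    variables=["example_variable"],
    format="image/png",
    destination_url=destination_bucket_url,
)

# Client auth comes from the usual harmony-py mechanisms (e.g. a .netrc entry).
job_id = Client().submit(request)
```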
4 changes: 2 additions & 2 deletions terraform/lambda_functions.tf
@@ -571,10 +571,10 @@ data "aws_iam_policy_document" "bignbit_lambda_policy" {
"s3:DeleteObjectVersion",
]
resources = [
"arn:aws:s3:::${local.harmony_bucket_name}",
"arn:aws:s3:::${local.staging_bucket_name}",
"arn:aws:s3:::${var.config_bucket}",
"arn:aws:s3:::${var.pobit_audit_bucket}",
"arn:aws:s3:::${local.harmony_bucket_name}/*",
"arn:aws:s3:::${local.staging_bucket_name}/*",
"arn:aws:s3:::${var.config_bucket}/*",
"arn:aws:s3:::${var.pobit_audit_bucket}/*"
]
24 changes: 0 additions & 24 deletions terraform/main.tf
@@ -26,30 +26,6 @@ provider "aws" {
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

#
# data "template_file" "workflow_definition" {
# template = file("${path.module}/state_machine_definition.tpl")
# vars = {
# GetDatasetConfigurationLambda = aws_lambda_function.get_dataset_configuration.arn,
# ConfigBucket = var.config_bucket,
# ConfigDir = var.config_dir,
# GetGranuleUmmJsonLambda = aws_lambda_function.get_granule_umm_json.arn,
# IdentifyImageFileLambda = aws_lambda_function.identify_image_file.arn,
# ApplyOperaHLSTreatmentLambda = aws_lambda_function.apply_opera_hls_treatment.arn,
# GetCollectionConceptIdLambda = aws_lambda_function.get_collection_concept_id.arn,
# SubmitHarmonyJobLambda = aws_lambda_function.submit_harmony_job.arn,
# GetHarmonyJobStatusLambda = aws_lambda_function.get_harmony_job_status.arn,
# GenerateImageMetadataLambda = aws_lambda_function.generate_image_metadata.arn,
# BuildImageSetsLambda = aws_lambda_function.build_image_sets.arn,
# SendToGITCLambda = aws_lambda_function.send_to_gitc.arn,
# SaveCNMMessageLambda = aws_lambda_function.save_cnm_message.arn,
# PobitAuditBucket = var.pobit_audit_bucket,
# PobitAuditPath = var.pobit_audit_path,
# HarmonyStagingBucket = var.harmony_staging_bucket,
# HarmonyStagingPath = var.harmony_staging_path
# }
# }


locals {
environment = var.stage
10 changes: 7 additions & 3 deletions terraform/outputs.tf
@@ -96,15 +96,19 @@ output "workflow_definition" {
SaveCNMMessageLambda = aws_lambda_function.save_cnm_message.arn,
PobitAuditBucket = var.pobit_audit_bucket,
PobitAuditPath = var.pobit_audit_path,
HarmonyStagingBucket = local.harmony_bucket_name,
StagingBucket = local.staging_bucket_name,
HarmonyStagingPath = var.harmony_staging_path
})
}

output "harmony_staging_bucket" {
value = local.harmony_bucket_name
output "bignbit_staging_bucket" {
value = local.staging_bucket_name
}

output "harmony_staging_path" {
value = var.harmony_staging_path
}

output "bignbit_lambda_role" {
value = aws_iam_role.bignbit_lambda_role
}
42 changes: 27 additions & 15 deletions terraform/s3.tf
@@ -1,12 +1,12 @@
# Create a bucket to store harmony results in unless one is provided as a variable
# Create a bucket to store bignbit results in unless one is provided as a variable
locals {
create_bucket = var.harmony_staging_bucket == ""
harmony_bucket_name = local.create_bucket ? aws_s3_bucket.harmony_staging_bucket[0].id : var.harmony_staging_bucket
create_bucket = var.bignbit_staging_bucket == ""
staging_bucket_name = local.create_bucket ? aws_s3_bucket.bignbit_staging_bucket[0].id : var.bignbit_staging_bucket
}

resource "aws_s3_bucket" "harmony_staging_bucket" {
resource "aws_s3_bucket" "bignbit_staging_bucket" {
count = local.create_bucket ? 1 : 0
bucket = "${local.aws_resources_name}-harmony-staging"
bucket = "${local.aws_resources_name}-staging"

lifecycle {
ignore_changes = [
@@ -17,7 +17,7 @@ resource "aws_s3_bucket" "harmony_staging_bucket" {

resource "aws_s3_bucket_ownership_controls" "disable_acls" {
count = local.create_bucket ? 1 : 0
bucket = aws_s3_bucket.harmony_staging_bucket[0].id
bucket = aws_s3_bucket.bignbit_staging_bucket[0].id

rule {
object_ownership = "BucketOwnerEnforced"
@@ -26,7 +26,7 @@ resource "aws_s3_bucket_ownership_controls" "disable_acls" {

resource "aws_s3_bucket_server_side_encryption_configuration" "enable_bucket_encryption" {
count = local.create_bucket ? 1 : 0
bucket = aws_s3_bucket.harmony_staging_bucket[0].id
bucket = aws_s3_bucket.bignbit_staging_bucket[0].id

rule {
bucket_key_enabled = true
@@ -39,7 +39,7 @@ resource "aws_s3_bucket_server_side_encryption_configuration" "enable_bucket_enc

resource "aws_s3_bucket_lifecycle_configuration" "expire_objects_30_days" {
count = local.create_bucket ? 1 : 0
bucket = aws_s3_bucket.harmony_staging_bucket[0].id
bucket = aws_s3_bucket.bignbit_staging_bucket[0].id

rule {
id = "ExpireHarmonyObjectsAfter30Days"
@@ -51,12 +51,23 @@ resource "aws_s3_bucket_lifecycle_configuration" "expire_objects_30_days" {
prefix = var.harmony_staging_path
}
}

rule {
id = "ExpireOperaHLSObjectsAfter30Days"
status = "Enabled"
expiration {
days = 30
}
filter {
prefix = "opera_hls_processing/"
}
}
}

resource "aws_s3_bucket_policy" "harmony_staging_bucket_policy" {
resource "aws_s3_bucket_policy" "bignbit_staging_bucket_policy" {
count = local.create_bucket ? 1 : 0
bucket = aws_s3_bucket.harmony_staging_bucket[0].id
policy = data.aws_iam_policy_document.harmony_staging_bucket_policy[0].json
bucket = aws_s3_bucket.bignbit_staging_bucket[0].id
policy = data.aws_iam_policy_document.bignbit_staging_bucket_policy[0].json
}

data "aws_iam_policy_document" "allow_harmony_write" {
Expand All @@ -68,7 +79,7 @@ data "aws_iam_policy_document" "allow_harmony_write" {
"s3:PutObject",
]
resources = [
"${aws_s3_bucket.harmony_staging_bucket[0].arn}/${var.harmony_staging_path}/*"
"${aws_s3_bucket.bignbit_staging_bucket[0].arn}/${var.harmony_staging_path}/*"
]
principals {
identifiers = ["arn:aws:iam::549360732797:root", "arn:aws:iam::625642860590:root"]
@@ -86,7 +97,7 @@ data "aws_iam_policy_document" "allow_harmony_location" {
"s3:GetBucketLocation",
]
resources = [
aws_s3_bucket.harmony_staging_bucket[0].arn
aws_s3_bucket.bignbit_staging_bucket[0].arn
]
principals {
identifiers = ["arn:aws:iam::549360732797:root", "arn:aws:iam::625642860590:root"]
@@ -104,7 +115,8 @@ data "aws_iam_policy_document" "allow_gibs_getobject" {
"s3:GetObject*",
]
resources = [
"${aws_s3_bucket.harmony_staging_bucket[0].arn}/${var.harmony_staging_path}/*"
"${aws_s3_bucket.bignbit_staging_bucket[0].arn}/${var.harmony_staging_path}/*",
"${aws_s3_bucket.bignbit_staging_bucket[0].arn}/opera_hls_processing/*"
]
principals {
identifiers = ["arn:aws:iam::${var.gibs_account_id}:root"]
@@ -113,7 +125,7 @@
}
}

data "aws_iam_policy_document" "harmony_staging_bucket_policy" {
data "aws_iam_policy_document" "bignbit_staging_bucket_policy" {
count = local.create_bucket ? 1 : 0
source_policy_documents = [
data.aws_iam_policy_document.allow_harmony_write[0].json,
3 changes: 2 additions & 1 deletion terraform/state_machine_definition.tpl
@@ -162,6 +162,7 @@
"cma":{
"event.$":"$",
"task_config":{
"bignbit_staging_bucket": "${StagingBucket}",
"cumulus_message":{
"input":"{$.payload}"
}
@@ -260,7 +261,7 @@
"cmr_environment":"{$.meta.cmr.cmrEnvironment}",
"cmr_clientid":"{$.meta.cmr.clientId}",
"current_item":"{$.current_item}",
"harmony_staging_bucket": "${HarmonyStagingBucket}",
"bignbit_staging_bucket": "${StagingBucket}",
"harmony_staging_path": "${HarmonyStagingPath}",
"big_config":"{$.payload.datasetConfigurationForBIG}",
"cumulus_message":{