diff --git a/CHANGELOG.md b/CHANGELOG.md index d2c9010..eea1736 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added ### Deprecated ### Removed +- [issues/7](https://github.com/podaac/bignbit/issues/15): Remove the wait for GITC response ### Fixed ### Security diff --git a/README.md b/README.md index abf20a3..fa8c066 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ _Visual representation of the bignbit step function state machine:_ ## MacOS 1. Install miniconda (or conda) and [poetry](https://python-poetry.org/) -2. Run `conda env create -f conda-environment.yml` to install GDAL +2. Run `conda env create -f conda-environment.yaml` to install GDAL 3. Activate the bignbit conda environment `conda activate bignbit` 4. Install python package and dependencies `poetry install` 5. Verify tests pass `poetry run pytest tests/` diff --git a/bignbit/build_image_sets.py b/bignbit/build_image_sets.py index 2d13c86..381bf18 100644 --- a/bignbit/build_image_sets.py +++ b/bignbit/build_image_sets.py @@ -6,7 +6,7 @@ from cumulus_logger import CumulusLogger from cumulus_process import Process -from bignbit.image_set import from_big_output, IncompleteImageSet +from bignbit.image_set import from_big_output, IncompleteImageSet, ImageSet CUMULUS_LOGGER = CumulusLogger('build_image_sets') @@ -52,11 +52,18 @@ def process(self): del response_payload['big'] response_payload['pobit'] = [] - for image_set in image_sets: + for big_image_set in image_sets: + pobit_image_set = ImageSet( + name=big_image_set.name + '_' + self.input['granules'][0]['cmrConceptId'], + image=big_image_set.image, + image_metadata=big_image_set.image_metadata, + world_file=big_image_set.world_file) + response_payload['pobit'].append({ - 'image_set': image_set._asdict(), + 'image_set': pobit_image_set._asdict(), 'cmr_provider': self.config.get('cmr_provider'), 'collection_name': self.config.get('collection').get('name'), + 'granule_ur': self.input['granules'][0]['granuleId'] }) return response_payload diff --git a/bignbit/get_granule_umm_json.py b/bignbit/get_granule_umm_json.py index b2f05e8..2e45e8e 100644 --- a/bignbit/get_granule_umm_json.py +++ b/bignbit/get_granule_umm_json.py @@ -4,7 +4,6 @@ import logging import os -import requests from cumulus_logger import CumulusLogger from cumulus_process import Process @@ -32,39 +31,12 @@ def process(self): """ cmr_environment = self.config['cmr_environment'] - cmr_link = self.input['granules'][0]['cmrLink'] + cmr_concept_id = self.input['granules'][0]['cmrConceptId'] - self.input['granule_umm_json'] = download_umm_json(cmr_link, cmr_environment) + self.input['granule_umm_json'] = utils.get_umm_json(cmr_concept_id, cmr_environment) return self.input -def download_umm_json(cmr_link: str, cmr_environment: str) -> dict: - """ - Retrieve the umm-json document from the given cmr_link - - Parameters - ---------- - cmr_link: str - Link to the umm-g for downloading - - cmr_environment: str - CMR environment used to retrieve user token - - Returns - ------- - dict - The umm-json document - """ - edl_user, edl_pass = utils.get_edl_creds() - token = utils.get_cmr_user_token(edl_user, edl_pass, cmr_environment) - - umm_json_response = requests.get(cmr_link, headers={'Authorization': f'Bearer {token}'}, timeout=10) - umm_json_response.raise_for_status() - umm_json = umm_json_response.json() - - return umm_json - - def lambda_handler(event, context): """handler that gets called by aws lambda Parameters diff --git a/bignbit/handle_gitc_response.py b/bignbit/handle_gitc_response.py index 1c40a37..b31f50e 100644 --- a/bignbit/handle_gitc_response.py +++ b/bignbit/handle_gitc_response.py @@ -3,8 +3,8 @@ import logging import os from json import loads -import boto3 -from botocore.exceptions import ClientError + +from bignbit import utils def handler(event, _): @@ -32,13 +32,18 @@ def handler(event, _): for message in event["Records"]: message_body = loads(message["body"]) - task_token = message_body["identifier"] - client = boto3.client('stepfunctions') - try: - client.send_task_success(taskToken=task_token, output=json.dumps(message_body)) - logger.info("Step function triggered for task token %s", task_token) - except ClientError: - logger.warning("Error sending task success for messageId %s task token %s", - message['messageId'], task_token, - exc_info=True) + gitc_id = message_body["identifier"] + collection_name = message_body["collection"] + cmr_env = os.environ['CMR_ENVIRONMENT'] + + granule_concept_id = gitc_id.rpartition('_')[-1] + umm_json = utils.get_umm_json(granule_concept_id, cmr_env) + granule_ur = umm_json['GranuleUR'] + + cnm_key_name = os.environ['POBIT_AUDIT_PATH_NAME'] + "/" + collection_name + "/" + granule_ur + "." + message_body['submissionTime'] + "." + "cnm-r.json" + + utils.upload_cnm(os.environ['POBIT_AUDIT_BUCKET_NAME'], cnm_key_name, json.dumps(message_body)) + + logging.debug('CNM-R uploaded to s3 audit bucket for id %s', gitc_id) + return {"statusCode": 200, "body": "All good"} diff --git a/bignbit/save_cma_message.py b/bignbit/save_cnm_message.py similarity index 62% rename from bignbit/save_cma_message.py rename to bignbit/save_cnm_message.py index dca1d8e..c926992 100644 --- a/bignbit/save_cma_message.py +++ b/bignbit/save_cnm_message.py @@ -7,10 +7,10 @@ from cumulus_logger import CumulusLogger from cumulus_process import Process -CUMULUS_LOGGER = CumulusLogger('save_cma_message') +CUMULUS_LOGGER = CumulusLogger('save_cmm_message') -class CMA(Process): +class CNM(Process): """ A cumulus message adapter """ @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs): def process(self): """ - Upload CMA message into a s3 bucket + Upload CNM message into a s3 bucket Returns ------- @@ -30,27 +30,34 @@ def process(self): """ pobit_audit_bucket = self.config['pobit_audit_bucket'] - cma_key_name = self.config['cma_key_name'] + pobit_audit_path = self.config['pobit_audit_path'] - upload_cma(pobit_audit_bucket, cma_key_name, self.input) + granule_ur = self.config['granule_ur'] + + cnm_content = self.config['cnm'] + collection_name = cnm_content['collection'] + + cnm_key_name = pobit_audit_path + "/" + collection_name + "/" + granule_ur + "." + cnm_content['submissionTime'] + "." + "cnm.json" + + upload_cnm(pobit_audit_bucket, cnm_key_name, cnm_content) return self.input -def upload_cma(pobit_audit_bucket: str, cma_key_name: str, cma_content: dict): +def upload_cnm(pobit_audit_bucket: str, cnm_key_name: str, cnm_content: dict): """ - Upload CMA message into a s3 bucket + Upload CNM message into a s3 bucket Parameters ---------- pobit_audit_bucket: str - Bucket name containing where CMA should be uploaded + Bucket name containing where CNM should be uploaded - cma_key_name: str + cnm_key_name: str Key to object location in bucket - cma_content: dict - The CMA message to upload + cnm_content: dict + The CNM message to upload Returns ------- @@ -58,9 +65,9 @@ def upload_cma(pobit_audit_bucket: str, cma_key_name: str, cma_content: dict): """ s3_client = boto3.client('s3') s3_client.put_object( - Body=json.dumps(cma_content, default=str).encode("utf-8"), + Body=json.dumps(cnm_content, default=str).encode("utf-8"), Bucket=pobit_audit_bucket, - Key=cma_key_name + Key=cnm_key_name ) @@ -75,7 +82,7 @@ def lambda_handler(event, context): Returns ---------- dict - A CMA json message + A CNM json message """ # pylint: disable=duplicate-code levels = { @@ -91,8 +98,8 @@ def lambda_handler(event, context): CUMULUS_LOGGER.logger.level = levels.get(logging_level, 'info') CUMULUS_LOGGER.setMetadata(event, context) - return CMA.cumulus_handler(event, context=context) + return CNM.cumulus_handler(event, context=context) if __name__ == "__main__": - CMA() + CNM() diff --git a/bignbit/send_to_gitc.py b/bignbit/send_to_gitc.py index 66ac7f1..4e3f42e 100644 --- a/bignbit/send_to_gitc.py +++ b/bignbit/send_to_gitc.py @@ -11,7 +11,6 @@ from bignbit.image_set import ImageSet, to_cnm_product_dict -REGION_NAME = 'us-west-2' CUMULUS_LOGGER = CumulusLogger('send_to_gitc') GIBS_REGION_ENV_NAME = "GIBS_REGION" @@ -45,20 +44,19 @@ def process(self): list of granules """ - notification_id = "" - token = self.config.get('token') - if self.input is not None: # Send ImageSet(s) to GITC for processing collection_name = self.input.get('collection_name') cmr_provider = self.input.get('cmr_provider') image_set = ImageSet(**self.input['image_set']) - notification_id = notify_gitc(image_set, cmr_provider, token, collection_name) + gitc_id = image_set.name + + cnm_message = notify_gitc(image_set, cmr_provider, gitc_id, collection_name) - return notification_id + return cnm_message -def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_name: str): +def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str): """ Builds and sends a CNM message to GITC @@ -68,8 +66,8 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n The image set to send cmr_provider: str The provider sent in the CNM message - token: str - The token identifying this particular request to GITC + gitc_id: str + The unique identifier for this particular request to GITC collection_name: str Collection that this image set belongs to @@ -82,7 +80,7 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n queue_url = os.environ.get(GIBS_SQS_URL_ENV_NAME) CUMULUS_LOGGER.info(f'Sending SQS message to GITC for image {image_set.name}') - cnm = construct_cnm(image_set, cmr_provider, token, collection_name) + cnm = construct_cnm(image_set, cmr_provider, gitc_id, collection_name) cnm_json = json.dumps(cnm) sqs_message_params = { @@ -99,10 +97,11 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n response = sqs.send_message(**sqs_message_params) CUMULUS_LOGGER.debug(f'SQS send_message output: {response}') - return cnm['identifier'] + + return cnm -def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection_name: str): +def construct_cnm(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str): """ Construct the CNM message for GITC @@ -112,8 +111,8 @@ def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection ImageSet for one image to be sent to gibs cmr_provider: str The provider sent in the CNM message - token: str - The token identifying this particular request to GITC + gitc_id: str + The unique identifier for this particular request to GITC collection_name: str Collection that this image set belongs to @@ -131,7 +130,7 @@ def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection "duplicationid": image_set.name, "collection": new_collection, "submissionTime": submission_time, - "identifier": token, + "identifier": gitc_id, "product": product, 'provider': cmr_provider } diff --git a/bignbit/utils.py b/bignbit/utils.py index 7844acf..55a1156 100644 --- a/bignbit/utils.py +++ b/bignbit/utils.py @@ -27,7 +27,7 @@ def get_edl_creds() -> (str, str): global ED_USER # pylint: disable=W0603 global ED_PASS # pylint: disable=W0603 - ssm = boto3.client('ssm') + ssm = boto3.client('ssm', region_name='us-west-2') if not ED_USER: edl_user_ssm_name = os.environ.get('EDL_USER_SSM') @@ -110,6 +110,35 @@ def get_cmr_user_token(edl_user: str, edl_pass: str, cmr_env: str) -> str: return EDL_USER_TOKEN['access_token'] +def get_umm_json(granule_concept_id: str, cmr_environment): + """ + Get the granuleUR for the given concept ID + + Parameters + ---------- + granule_concept_id: str + the concept ID for the granule to find + + cmr_environment: str + CMR environment used to retrieve user token + + Returns + ------- + dict + The umm-json document + """ + + edl_user, edl_pass = get_edl_creds() + token = get_cmr_user_token(edl_user, edl_pass, cmr_environment) + + cmr_link = f'https://cmr.{"uat." if cmr_environment == "UAT" else ""}earthdata.nasa.gov/search/concepts/{granule_concept_id}.umm_json' + umm_json_response = requests.get(cmr_link, headers={'Authorization': f'Bearer {token}'}, timeout=10) + umm_json_response.raise_for_status() + umm_json = umm_json_response.json() + + return umm_json + + def sha512sum(filepath: pathlib.Path): """ Generate a SHA512 hash for the given file @@ -131,6 +160,34 @@ def sha512sum(filepath: pathlib.Path): return hash512.hexdigest() +def upload_cnm(bucket_name: str, key_name: str, cnm_content: dict): + """ + Upload CNM message into a s3 bucket + + Parameters + ---------- + bucket_name: str + Bucket name containing where CNM should be uploaded + + key_name: str + Key to object location in bucket + + cnm_content: dict + The CNM message to upload + + Returns + ------- + None + """ + s3_client = boto3.client('s3') + s3_client.put_object( + Body=cnm_content, + Bucket=bucket_name, + Key=key_name + ) + return f's3://{bucket_name}/{key_name}' + + def upload_to_s3(filepath: pathlib.Path, bucket_name: str, object_key: str): """ Uploads a file to S3 diff --git a/poetry.lock b/poetry.lock index 78d0990..95c8bac 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "aiohttp" @@ -1463,7 +1463,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, diff --git a/terraform/lambda_functions.tf b/terraform/lambda_functions.tf index f91efa2..e788d8c 100644 --- a/terraform/lambda_functions.tf +++ b/terraform/lambda_functions.tf @@ -430,7 +430,7 @@ resource "aws_lambda_function" "handle_gitc_response" { } function_name = "${local.lambda_resources_name}-handle_gitc_response-lambda" role = var.lambda_role.arn - timeout = 5 + timeout = 15 memory_size = 128 environment { @@ -438,6 +438,11 @@ resource "aws_lambda_function" "handle_gitc_response" { STACK_NAME = local.lambda_resources_name CUMULUS_MESSAGE_ADAPTER_DIR = "/opt/" REGION = var.region + POBIT_AUDIT_BUCKET_NAME = var.pobit_audit_bucket + POBIT_AUDIT_PATH_NAME = var.pobit_audit_path + CMR_ENVIRONMENT = local.environment != "OPS" ? "UAT" : "" + EDL_USER_SSM = var.edl_user_ssm + EDL_PASS_SSM = var.edl_pass_ssm } } @@ -449,7 +454,7 @@ resource "aws_lambda_function" "handle_gitc_response" { tags = local.tags } -resource "aws_lambda_function" "save_cma_message" { +resource "aws_lambda_function" "save_cnm_message" { depends_on = [ null_resource.upload_ecr_image ] @@ -457,9 +462,9 @@ resource "aws_lambda_function" "save_cma_message" { package_type = "Image" image_uri = "${aws_ecr_repository.lambda-image-repo.repository_url}:${local.ecr_image_tag}" image_config { - command = ["bignbit.save_cma_message.lambda_handler"] + command = ["bignbit.save_cnm_message.lambda_handler"] } - function_name = "${local.lambda_resources_name}-save_cma_message-lambda" + function_name = "${local.lambda_resources_name}-save_cnm_message-lambda" role = var.lambda_role.arn timeout = 15 memory_size = 128 diff --git a/terraform/main.tf b/terraform/main.tf index 41847e7..9df6a79 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -27,7 +27,7 @@ data "template_file" "workflow_definition" { GenerateImageMetadataLambda = aws_lambda_function.generate_image_metadata.arn, BuildImageSetsLambda = aws_lambda_function.build_image_sets.arn, SendToGITCLambda = aws_lambda_function.send_to_gitc.arn, - SaveCMAMessageLambda = aws_lambda_function.save_cma_message.arn, + SaveCNMMessageLambda = aws_lambda_function.save_cnm_message.arn, PobitAuditBucket = var.pobit_audit_bucket, PobitAuditPath = var.pobit_audit_path } diff --git a/terraform/outputs.tf b/terraform/outputs.tf index 6eb36d6..dc89d6b 100644 --- a/terraform/outputs.tf +++ b/terraform/outputs.tf @@ -74,8 +74,8 @@ output "pobit_send_to_gitc_arn" { value = aws_lambda_function.send_to_gitc.arn } -output "pobit_save_cma_message_arn" { - value = aws_lambda_function.save_cma_message.arn +output "pobit_save_cnm_message_arn" { + value = aws_lambda_function.save_cnm_message.arn } output "workflow_definition" { diff --git a/terraform/state_machine_definition.tpl b/terraform/state_machine_definition.tpl index 20a9b3d..d472fa6 100644 --- a/terraform/state_machine_definition.tpl +++ b/terraform/state_machine_definition.tpl @@ -501,7 +501,6 @@ "task_config": { "collection": "{$.collection}", "cmr_provider": "{$.cmr_provider}", - "token.$": "$$.Task.Token", "cumulus_message": { "input": "{$}" } @@ -510,24 +509,43 @@ } }, "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken", + "Resource": "arn:aws:states:::lambda:invoke", "TimeoutSeconds": 86400, - "End": true, - "ResultPath": "$.gitc_response", - "Catch": [ + "ResultPath": "$.cnm", + "Next": "SaveCNMMessage" + }, + "SaveCNMMessage": { + "Type": "Task", + "Resource": "${SaveCNMMessageLambda}", + "Parameters": { + "cma": { + "event.$": "$", + "task_config": { + "collection": "{$.collection_name}", + "granule_ur": "{$.granule_ur}", + "cnm": "{$.cnm.Payload.payload}", + "pobit_audit_bucket": "${PobitAuditBucket}", + "pobit_audit_path": "${PobitAuditPath}", + "cumulus_message": { + "input": "{$}" + } + } + } + }, + "Retry": [ { "ErrorEquals": [ - "States.Timeout" - ], - "ResultPath": "$.gitc_response", - "Next": "GITC Timeout" + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 } - ] - }, - "GITC Timeout": { - "Type": "Pass", - "End": true, - "Comment": "No response was received from GITC within the configured timeout" + ], + "End": true } } }, @@ -550,44 +568,6 @@ "MaxAttempts": 3 } ], - "Next": "Save CMA Message" - }, - "Save CMA Message": { - "Type": "Task", - "Resource": "${SaveCMAMessageLambda}", - "Parameters": { - "cma": { - "event.$": "$", - "task_config": { - "pobit_audit_bucket": "${PobitAuditBucket}", - "cma_key_name.$": "States.Format('${PobitAuditPath}/{}/{}.{}.cma.json', $.meta.collection.name, $.payload.granules[0].granuleId, $$.State.EnteredTime)", - "cumulus_message": { - "input": "{$.payload}" - } - } - } - }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException" - ], - "IntervalSeconds": 2, - "MaxAttempts": 6, - "BackoffRate": 2 - }, - { - "ErrorEquals": [ - "Lambda.Unknown" - ], - "BackoffRate": 2, - "IntervalSeconds": 2, - "MaxAttempts": 2 - } - ], "Next": "WorkflowSucceeded" }, "WorkflowSucceeded": { diff --git a/tests/test_send_to_gibs_moto.py b/tests/test_send_to_gibs_moto.py index edbd2b1..50828f8 100644 --- a/tests/test_send_to_gibs_moto.py +++ b/tests/test_send_to_gibs_moto.py @@ -8,7 +8,6 @@ import os import threading import urllib.request -import uuid import boto3 import jsonschema @@ -106,7 +105,6 @@ def test_process_sends_message(fake_response_sqs_queue, mock_gitc_success, cnm_v sub_event = event.copy() del sub_event['cma']['event']['payload']['pobit'] sub_event['cma']['event']['payload'] = imageset - sub_event['cma']['task_config']['token'] = str(uuid.uuid4()) bignbit.send_to_gitc.lambda_handler(sub_event, {}) sent_messages = mock_gitc_success.wait_for_messages(count=1)