From b146dcc6be7b4c09447fe217fdb287ce12a0a155 Mon Sep 17 00:00:00 2001 From: Victoria McDonald Date: Tue, 28 May 2024 12:57:50 -0700 Subject: [PATCH] reformat save cnm to separate step --- bignbit/save_cnm_message.py | 104 +++++++++++++++++++++++++ bignbit/send_to_gitc.py | 18 +---- terraform/state_machine_definition.tpl | 32 +++++++- 3 files changed, 136 insertions(+), 18 deletions(-) create mode 100644 bignbit/save_cnm_message.py diff --git a/bignbit/save_cnm_message.py b/bignbit/save_cnm_message.py new file mode 100644 index 0000000..90d886b --- /dev/null +++ b/bignbit/save_cnm_message.py @@ -0,0 +1,104 @@ +"""lambda function that stores the CMA message into a s3 bucket""" +import json +import logging +import os + +import boto3 +from cumulus_logger import CumulusLogger +from cumulus_process import Process + +CUMULUS_LOGGER = CumulusLogger('save_cmm_message') + + +class CNM(Process): + """ + A cumulus message adapter + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.logger = CUMULUS_LOGGER + + def process(self): + """ + Upload CNM message into a s3 bucket + + Returns + ------- + dict + Same input sent to this function + + """ + pobit_audit_bucket = self.config['pobit_audit_bucket'] + + collection_name = self.config['collection'] + granule_ur = self.input.get('granule_ur') + + cnm_content = self.input + + cnm_key_name = collection_name + "/" + granule_ur + "." + cnm_content['submissionTime'] + "." + "cnm.json" + + upload_cnm(pobit_audit_bucket, cnm_key_name, cnm_content) + + return self.input + + +def upload_cnm(pobit_audit_bucket: str, cnm_key_name: str, cnm_content: dict): + """ + Upload CNM message into a s3 bucket + + Parameters + ---------- + pobit_audit_bucket: str + Bucket name containing where CNM should be uploaded + + cnm_key_name: str + Key to object location in bucket + + cnm_content: dict + The CNM message to upload + + Returns + ------- + None + """ + s3_client = boto3.client('s3') + s3_client.put_object( + Body=json.dumps(cnm_content, default=str).encode("utf-8"), + Bucket=pobit_audit_bucket, + Key=cnm_key_name + ) + + +def lambda_handler(event, context): + """handler that gets called by aws lambda + Parameters + ---------- + event: dictionary + event from a lambda call + context: dictionary + context from a lambda call + Returns + ---------- + dict + A CNM json message + """ + # pylint: disable=duplicate-code + levels = { + 'critical': logging.CRITICAL, + 'error': logging.ERROR, + 'warn': logging.WARNING, + 'warning': logging.WARNING, + 'info': logging.INFO, + 'debug': logging.DEBUG + } + + logging_level = os.environ.get('LOGGING_LEVEL', 'info') + CUMULUS_LOGGER.logger.level = levels.get(logging_level, 'info') + CUMULUS_LOGGER.setMetadata(event, context) + + return CNM.cumulus_handler(event, context=context) + + +if __name__ == "__main__": + CNM() diff --git a/bignbit/send_to_gitc.py b/bignbit/send_to_gitc.py index 77c3e25..4d7aacc 100644 --- a/bignbit/send_to_gitc.py +++ b/bignbit/send_to_gitc.py @@ -10,9 +10,7 @@ from cumulus_process import Process from bignbit.image_set import ImageSet, to_cnm_product_dict -from bignbit import utils -REGION_NAME = 'us-west-2' CUMULUS_LOGGER = CumulusLogger('send_to_gitc') GIBS_REGION_ENV_NAME = "GIBS_REGION" @@ -55,15 +53,12 @@ def process(self): image_set = ImageSet(**self.input['image_set']) gitc_id = image_set.name - granule_ur = self.input.get('granule_ur') - pobit_audit_bucket = self.input.get('pobit_audit_bucket') - - notification_id = notify_gitc(image_set, cmr_provider, gitc_id, collection_name, granule_ur, pobit_audit_bucket) + notification_id = notify_gitc(image_set, cmr_provider, gitc_id, collection_name) return notification_id -def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str, granule_ur: str, audit_bucket): +def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str): """ Builds and sends a CNM message to GITC @@ -77,10 +72,6 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection The unique identifier for this particular request to GITC collection_name: str Collection that this image set belongs to - granuleUR: str - The granuleUR for this image set - audit_bucket: str - The name of the S3 bucket where a copy of the outgoing CNM will be saved Returns ------- @@ -109,11 +100,6 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection CUMULUS_LOGGER.debug(f'SQS send_message output: {response}') - cnm_key_name = collection_name + "/" + granule_ur + "." + cnm['submissionTime'] + "." + "cnm.json" - - utils.upload_cnm(audit_bucket, cnm_key_name, cnm_json) - CUMULUS_LOGGER.debug('CNM uploaded to s3 audit bucket') - return cnm['identifier'] diff --git a/terraform/state_machine_definition.tpl b/terraform/state_machine_definition.tpl index 6e18172..6d5f258 100644 --- a/terraform/state_machine_definition.tpl +++ b/terraform/state_machine_definition.tpl @@ -438,8 +438,6 @@ "task_config": { "collection": "{$.collection}", "cmr_provider": "{$.cmr_provider}", - "cmr_environment":"{$.meta.cmr.cmrEnvironment}", - "pobit_audit_bucket": "${PobitAuditBucket}", "cumulus_message": { "input": "{$}" } @@ -474,6 +472,36 @@ "MaxAttempts": 1 } ], + "Next": "SaveCNMMessage" + }, + "SaveCNMMessage": { + "Type": "Task", + "Resource": "${SaveCNMMessageLambda}", + "Parameters": { + "cma": { + "event.$": "$", + "task_config": { + "collection": "{$.collection}", + "pobit_audit_bucket": "${PobitAuditBucket}", + "cumulus_message": { + "input": "{$.payload}" + } + } + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], "Next": "WorkflowSucceeded" }, "WorkflowSucceeded": {