Skip to content

Commit

Permalink
reformat save cnm to separate step
Browse files Browse the repository at this point in the history
  • Loading branch information
torimcd committed May 28, 2024
1 parent bf86736 commit b146dcc
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 18 deletions.
104 changes: 104 additions & 0 deletions bignbit/save_cnm_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""lambda function that stores the CMA message into a s3 bucket"""
import json
import logging
import os

import boto3
from cumulus_logger import CumulusLogger
from cumulus_process import Process

CUMULUS_LOGGER = CumulusLogger('save_cmm_message')


class CNM(Process):
"""
A cumulus message adapter
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = CUMULUS_LOGGER

def process(self):
"""
Upload CNM message into a s3 bucket
Returns
-------
dict
Same input sent to this function
"""
pobit_audit_bucket = self.config['pobit_audit_bucket']

collection_name = self.config['collection']
granule_ur = self.input.get('granule_ur')

cnm_content = self.input

cnm_key_name = collection_name + "/" + granule_ur + "." + cnm_content['submissionTime'] + "." + "cnm.json"

upload_cnm(pobit_audit_bucket, cnm_key_name, cnm_content)

return self.input


def upload_cnm(pobit_audit_bucket: str, cnm_key_name: str, cnm_content: dict):
"""
Upload CNM message into a s3 bucket
Parameters
----------
pobit_audit_bucket: str
Bucket name containing where CNM should be uploaded
cnm_key_name: str
Key to object location in bucket
cnm_content: dict
The CNM message to upload
Returns
-------
None
"""
s3_client = boto3.client('s3')
s3_client.put_object(
Body=json.dumps(cnm_content, default=str).encode("utf-8"),
Bucket=pobit_audit_bucket,
Key=cnm_key_name
)


def lambda_handler(event, context):
"""handler that gets called by aws lambda
Parameters
----------
event: dictionary
event from a lambda call
context: dictionary
context from a lambda call
Returns
----------
dict
A CNM json message
"""
# pylint: disable=duplicate-code
levels = {
'critical': logging.CRITICAL,
'error': logging.ERROR,
'warn': logging.WARNING,
'warning': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG
}

logging_level = os.environ.get('LOGGING_LEVEL', 'info')
CUMULUS_LOGGER.logger.level = levels.get(logging_level, 'info')
CUMULUS_LOGGER.setMetadata(event, context)

return CNM.cumulus_handler(event, context=context)


if __name__ == "__main__":
CNM()
18 changes: 2 additions & 16 deletions bignbit/send_to_gitc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
from cumulus_process import Process

from bignbit.image_set import ImageSet, to_cnm_product_dict
from bignbit import utils

REGION_NAME = 'us-west-2'
CUMULUS_LOGGER = CumulusLogger('send_to_gitc')

GIBS_REGION_ENV_NAME = "GIBS_REGION"
Expand Down Expand Up @@ -55,15 +53,12 @@ def process(self):
image_set = ImageSet(**self.input['image_set'])
gitc_id = image_set.name

granule_ur = self.input.get('granule_ur')
pobit_audit_bucket = self.input.get('pobit_audit_bucket')

notification_id = notify_gitc(image_set, cmr_provider, gitc_id, collection_name, granule_ur, pobit_audit_bucket)
notification_id = notify_gitc(image_set, cmr_provider, gitc_id, collection_name)

return notification_id


def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str, granule_ur: str, audit_bucket):
def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str):
"""
Builds and sends a CNM message to GITC
Expand All @@ -77,10 +72,6 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection
The unique identifier for this particular request to GITC
collection_name: str
Collection that this image set belongs to
granuleUR: str
The granuleUR for this image set
audit_bucket: str
The name of the S3 bucket where a copy of the outgoing CNM will be saved
Returns
-------
Expand Down Expand Up @@ -109,11 +100,6 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection

CUMULUS_LOGGER.debug(f'SQS send_message output: {response}')

cnm_key_name = collection_name + "/" + granule_ur + "." + cnm['submissionTime'] + "." + "cnm.json"

utils.upload_cnm(audit_bucket, cnm_key_name, cnm_json)
CUMULUS_LOGGER.debug('CNM uploaded to s3 audit bucket')

return cnm['identifier']


Expand Down
32 changes: 30 additions & 2 deletions terraform/state_machine_definition.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,6 @@
"task_config": {
"collection": "{$.collection}",
"cmr_provider": "{$.cmr_provider}",
"cmr_environment":"{$.meta.cmr.cmrEnvironment}",
"pobit_audit_bucket": "${PobitAuditBucket}",
"cumulus_message": {
"input": "{$}"
}
Expand Down Expand Up @@ -474,6 +472,36 @@
"MaxAttempts": 1
}
],
"Next": "SaveCNMMessage"
},
"SaveCNMMessage": {
"Type": "Task",
"Resource": "${SaveCNMMessageLambda}",
"Parameters": {
"cma": {
"event.$": "$",
"task_config": {
"collection": "{$.collection}",
"pobit_audit_bucket": "${PobitAuditBucket}",
"cumulus_message": {
"input": "{$.payload}"
}
}
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Next": "WorkflowSucceeded"
},
"WorkflowSucceeded": {
Expand Down

0 comments on commit b146dcc

Please sign in to comment.