Skip to content

Commit

Permalink
Feature/issue 7 - Remove wait for GITC response (#30)
Browse files Browse the repository at this point in the history
* fix type in readme

* change task token to uuid in send to gitc

* remove wait for task token & gitc response handler from state machine

* change gitc response handler to invoke save cma message

* fix typo

* fix context parameter

* convert uuid to string

* fix json typo

* remove uuid lib to use stdlib uuid

* change identifier to image set name instead of uuid

* remove uuid import

* fix json formatting of cma invoke

* add granule concept ID to identifier

* update image set name with granule conceptid

* remove whitespace

* move get umm json to utils and save gitc outgoing cnm

* remove unused import

* try setting cmr query env based on stage

* remove task token from tests

* fix granule index

* change how image set name defined

* add region to ssm client in utils

* add ssm parameters to gitc lambdas

* change region reference

* update how granule name referenced in send to gitc

* remove unused cmr var

* fix case

* fix cnm parsing

* reformat save cnm to separate step

* update tf vars for save cnm

* fix type in module definition

* fix cnm in cma

* fix collection reference in save cnm

* add debugging log statements

* fix input

* update save cnm input

* debugging

* debugging

* linting

* debugging

* debugging

* linting

* fix state machine

* update state machine

* reorg state machine

* missing comma

* move save cnm into map

* fix state machine transitions

* fix sm

* fix boolean

* debugging cnm input

* fix input

* change input

* debug input

* add cnm as config parameter

* lint

* fix cnm payload

* remove debugging statements

* add prefix to cnm path and fix gitc response

* increase handle gitc response timeout

* update changelog

* add original shortname as cnm prefix

* revert shortname and change collection ref to save cnm & cnm-r in same location

* remove save cma lambda no longer used

* remove EDL env params from sendtogitc

* change parsing of granule concept id
  • Loading branch information
torimcd authored Jun 17, 2024
1 parent d47a467 commit b95dbf3
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 141 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
### Deprecated
### Removed
- [issues/7](https://github.com/podaac/bignbit/issues/15): Remove the wait for GITC response
### Fixed
### Security

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ _Visual representation of the bignbit step function state machine:_
## MacOS

1. Install miniconda (or conda) and [poetry](https://python-poetry.org/)
2. Run `conda env create -f conda-environment.yml` to install GDAL
2. Run `conda env create -f conda-environment.yaml` to install GDAL
3. Activate the bignbit conda environment `conda activate bignbit`
4. Install python package and dependencies `poetry install`
5. Verify tests pass `poetry run pytest tests/`
13 changes: 10 additions & 3 deletions bignbit/build_image_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cumulus_logger import CumulusLogger
from cumulus_process import Process

from bignbit.image_set import from_big_output, IncompleteImageSet
from bignbit.image_set import from_big_output, IncompleteImageSet, ImageSet

CUMULUS_LOGGER = CumulusLogger('build_image_sets')

Expand Down Expand Up @@ -52,11 +52,18 @@ def process(self):
del response_payload['big']
response_payload['pobit'] = []

for image_set in image_sets:
for big_image_set in image_sets:
pobit_image_set = ImageSet(
name=big_image_set.name + '_' + self.input['granules'][0]['cmrConceptId'],
image=big_image_set.image,
image_metadata=big_image_set.image_metadata,
world_file=big_image_set.world_file)

response_payload['pobit'].append({
'image_set': image_set._asdict(),
'image_set': pobit_image_set._asdict(),
'cmr_provider': self.config.get('cmr_provider'),
'collection_name': self.config.get('collection').get('name'),
'granule_ur': self.input['granules'][0]['granuleId']
})

return response_payload
Expand Down
32 changes: 2 additions & 30 deletions bignbit/get_granule_umm_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
import os

import requests
from cumulus_logger import CumulusLogger
from cumulus_process import Process

Expand Down Expand Up @@ -32,39 +31,12 @@ def process(self):
"""
cmr_environment = self.config['cmr_environment']
cmr_link = self.input['granules'][0]['cmrLink']
cmr_concept_id = self.input['granules'][0]['cmrConceptId']

self.input['granule_umm_json'] = download_umm_json(cmr_link, cmr_environment)
self.input['granule_umm_json'] = utils.get_umm_json(cmr_concept_id, cmr_environment)
return self.input


def download_umm_json(cmr_link: str, cmr_environment: str) -> dict:
"""
Retrieve the umm-json document from the given cmr_link
Parameters
----------
cmr_link: str
Link to the umm-g for downloading
cmr_environment: str
CMR environment used to retrieve user token
Returns
-------
dict
The umm-json document
"""
edl_user, edl_pass = utils.get_edl_creds()
token = utils.get_cmr_user_token(edl_user, edl_pass, cmr_environment)

umm_json_response = requests.get(cmr_link, headers={'Authorization': f'Bearer {token}'}, timeout=10)
umm_json_response.raise_for_status()
umm_json = umm_json_response.json()

return umm_json


def lambda_handler(event, context):
"""handler that gets called by aws lambda
Parameters
Expand Down
27 changes: 16 additions & 11 deletions bignbit/handle_gitc_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import logging
import os
from json import loads
import boto3
from botocore.exceptions import ClientError

from bignbit import utils


def handler(event, _):
Expand Down Expand Up @@ -32,13 +32,18 @@ def handler(event, _):

for message in event["Records"]:
message_body = loads(message["body"])
task_token = message_body["identifier"]
client = boto3.client('stepfunctions')
try:
client.send_task_success(taskToken=task_token, output=json.dumps(message_body))
logger.info("Step function triggered for task token %s", task_token)
except ClientError:
logger.warning("Error sending task success for messageId %s task token %s",
message['messageId'], task_token,
exc_info=True)
gitc_id = message_body["identifier"]
collection_name = message_body["collection"]
cmr_env = os.environ['CMR_ENVIRONMENT']

granule_concept_id = gitc_id.rpartition('_')[-1]
umm_json = utils.get_umm_json(granule_concept_id, cmr_env)
granule_ur = umm_json['GranuleUR']

cnm_key_name = os.environ['POBIT_AUDIT_PATH_NAME'] + "/" + collection_name + "/" + granule_ur + "." + message_body['submissionTime'] + "." + "cnm-r.json"

utils.upload_cnm(os.environ['POBIT_AUDIT_BUCKET_NAME'], cnm_key_name, json.dumps(message_body))

logging.debug('CNM-R uploaded to s3 audit bucket for id %s', gitc_id)

return {"statusCode": 200, "body": "All good"}
39 changes: 23 additions & 16 deletions bignbit/save_cma_message.py → bignbit/save_cnm_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from cumulus_logger import CumulusLogger
from cumulus_process import Process

CUMULUS_LOGGER = CumulusLogger('save_cma_message')
CUMULUS_LOGGER = CumulusLogger('save_cmm_message')


class CMA(Process):
class CNM(Process):
"""
A cumulus message adapter
"""
Expand All @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs):

def process(self):
"""
Upload CMA message into a s3 bucket
Upload CNM message into a s3 bucket
Returns
-------
Expand All @@ -30,37 +30,44 @@ def process(self):
"""
pobit_audit_bucket = self.config['pobit_audit_bucket']
cma_key_name = self.config['cma_key_name']
pobit_audit_path = self.config['pobit_audit_path']

upload_cma(pobit_audit_bucket, cma_key_name, self.input)
granule_ur = self.config['granule_ur']

cnm_content = self.config['cnm']
collection_name = cnm_content['collection']

cnm_key_name = pobit_audit_path + "/" + collection_name + "/" + granule_ur + "." + cnm_content['submissionTime'] + "." + "cnm.json"

upload_cnm(pobit_audit_bucket, cnm_key_name, cnm_content)

return self.input


def upload_cma(pobit_audit_bucket: str, cma_key_name: str, cma_content: dict):
def upload_cnm(pobit_audit_bucket: str, cnm_key_name: str, cnm_content: dict):
"""
Upload CMA message into a s3 bucket
Upload CNM message into a s3 bucket
Parameters
----------
pobit_audit_bucket: str
Bucket name containing where CMA should be uploaded
Bucket name containing where CNM should be uploaded
cma_key_name: str
cnm_key_name: str
Key to object location in bucket
cma_content: dict
The CMA message to upload
cnm_content: dict
The CNM message to upload
Returns
-------
None
"""
s3_client = boto3.client('s3')
s3_client.put_object(
Body=json.dumps(cma_content, default=str).encode("utf-8"),
Body=json.dumps(cnm_content, default=str).encode("utf-8"),
Bucket=pobit_audit_bucket,
Key=cma_key_name
Key=cnm_key_name
)


Expand All @@ -75,7 +82,7 @@ def lambda_handler(event, context):
Returns
----------
dict
A CMA json message
A CNM json message
"""
# pylint: disable=duplicate-code
levels = {
Expand All @@ -91,8 +98,8 @@ def lambda_handler(event, context):
CUMULUS_LOGGER.logger.level = levels.get(logging_level, 'info')
CUMULUS_LOGGER.setMetadata(event, context)

return CMA.cumulus_handler(event, context=context)
return CNM.cumulus_handler(event, context=context)


if __name__ == "__main__":
CMA()
CNM()
29 changes: 14 additions & 15 deletions bignbit/send_to_gitc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from bignbit.image_set import ImageSet, to_cnm_product_dict

REGION_NAME = 'us-west-2'
CUMULUS_LOGGER = CumulusLogger('send_to_gitc')

GIBS_REGION_ENV_NAME = "GIBS_REGION"
Expand Down Expand Up @@ -45,20 +44,19 @@ def process(self):
list of granules
"""

notification_id = ""
token = self.config.get('token')

if self.input is not None:
# Send ImageSet(s) to GITC for processing
collection_name = self.input.get('collection_name')
cmr_provider = self.input.get('cmr_provider')
image_set = ImageSet(**self.input['image_set'])
notification_id = notify_gitc(image_set, cmr_provider, token, collection_name)
gitc_id = image_set.name

cnm_message = notify_gitc(image_set, cmr_provider, gitc_id, collection_name)

return notification_id
return cnm_message


def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_name: str):
def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str):
"""
Builds and sends a CNM message to GITC
Expand All @@ -68,8 +66,8 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n
The image set to send
cmr_provider: str
The provider sent in the CNM message
token: str
The token identifying this particular request to GITC
gitc_id: str
The unique identifier for this particular request to GITC
collection_name: str
Collection that this image set belongs to
Expand All @@ -82,7 +80,7 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n
queue_url = os.environ.get(GIBS_SQS_URL_ENV_NAME)
CUMULUS_LOGGER.info(f'Sending SQS message to GITC for image {image_set.name}')

cnm = construct_cnm(image_set, cmr_provider, token, collection_name)
cnm = construct_cnm(image_set, cmr_provider, gitc_id, collection_name)

cnm_json = json.dumps(cnm)
sqs_message_params = {
Expand All @@ -99,10 +97,11 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n
response = sqs.send_message(**sqs_message_params)

CUMULUS_LOGGER.debug(f'SQS send_message output: {response}')
return cnm['identifier']

return cnm


def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection_name: str):
def construct_cnm(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str):
"""
Construct the CNM message for GITC
Expand All @@ -112,8 +111,8 @@ def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection
ImageSet for one image to be sent to gibs
cmr_provider: str
The provider sent in the CNM message
token: str
The token identifying this particular request to GITC
gitc_id: str
The unique identifier for this particular request to GITC
collection_name: str
Collection that this image set belongs to
Expand All @@ -131,7 +130,7 @@ def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection
"duplicationid": image_set.name,
"collection": new_collection,
"submissionTime": submission_time,
"identifier": token,
"identifier": gitc_id,
"product": product,
'provider': cmr_provider
}
Expand Down
Loading

0 comments on commit b95dbf3

Please sign in to comment.