|
1 | 1 | import os
|
| 2 | +import time |
| 3 | +from datetime import datetime, timedelta |
2 | 4 |
|
3 | 5 | import boto3
|
| 6 | +from appsync_helpers import send_mutation |
4 | 7 | from aws_lambda_powertools import Logger, Tracer
|
5 | 8 | from aws_lambda_powertools.utilities.typing import LambdaContext
|
6 |
| -from appsync_helpers import send_mutation |
7 |
| - |
8 | 9 |
|
9 | 10 | tracer = Tracer()
|
10 | 11 | logger = Logger()
|
11 |
| - |
12 |
| -# EVENTS_DDB_TABLE_NAME = os.environ["DDB_TABLE"] |
13 |
| -# dynamodb = boto3.resource("dynamodb") |
14 |
| -# ddbTable = dynamodb.Table(EVENTS_DDB_TABLE_NAME) |
15 | 12 | client_ssm = boto3.client("ssm")
|
16 | 13 |
|
| 14 | +UNKNOWN_VERSION = "Unknown Version" |
| 15 | +MINIMUM_LOGGING_VERSION = [2, 1, 2, 7] |
| 16 | + |
17 | 17 |
|
18 | 18 | @logger.inject_lambda_context
|
19 | 19 | @tracer.capture_lambda_handler
|
20 | 20 | def lambda_handler(event: dict, context: LambdaContext):
|
21 | 21 | result = {}
|
| 22 | + start_time = time.time() |
| 23 | + |
22 | 24 | try:
|
| 25 | + logger.info( |
| 26 | + f"Processing {len(event['Instances']['InstanceInformationList'])} instances" |
| 27 | + ) |
23 | 28 | instances = event["Instances"]["InstanceInformationList"]
|
24 | 29 |
|
| 30 | + update_instance_data = [] |
25 | 31 | for instance in instances:
|
26 |
| - # get SW version from SSM |
27 |
| - response = client_ssm.list_inventory_entries( |
28 |
| - InstanceId=instance["InstanceId"], |
29 |
| - TypeName="AWS:Application", |
30 |
| - Filters=[{"Key": "Name", "Values": ["aws-deepracer-core"]}], |
31 |
| - ) |
32 |
| - logger.debug(response) |
33 |
| - |
34 |
| - instance["DeepRacerCoreVersion"] = "Unknown Version" |
35 |
| - for entry in response.get("Entries", []): |
36 |
| - if entry.get("Name") == "aws-deepracer-core": |
37 |
| - instance["DeepRacerCoreVersion"] = entry.get( |
38 |
| - "Version", "Unknown Version" |
39 |
| - ) |
40 |
| - break |
41 | 32 |
|
42 |
| - instance["LoggingCapable"] = False |
43 | 33 | try:
|
44 |
| - version_str = instance["DeepRacerCoreVersion"].split("+")[0] |
45 |
| - version_parts = list(map(int, version_str.split("."))) |
46 |
| - if version_parts >= [2, 1, 2, 7]: |
47 |
| - instance["LoggingCapable"] = True |
48 |
| - except ValueError: |
49 |
| - pass |
50 |
| - |
51 |
| - # get tags from SSM |
52 |
| - tags_response = client_ssm.list_tags_for_resource( |
53 |
| - ResourceType="ManagedInstance", |
54 |
| - ResourceId=instance["InstanceId"], |
55 |
| - ) |
56 |
| - logger.debug(tags_response) |
57 |
| - |
58 |
| - # list of tags that we copy from SSM to DynamoBD table |
59 |
| - tag_keys_to_copy = [ |
60 |
| - "fleetName", |
61 |
| - "fleetId", |
62 |
| - "DeviceUiPassword", |
63 |
| - "Type", |
64 |
| - ] |
65 |
| - for tag in tags_response["TagList"]: |
66 |
| - if tag["Key"] in tag_keys_to_copy: |
67 |
| - instance[tag["Key"]] = tag["Value"] |
68 |
| - |
69 |
| - # delete all keys in instance containing 'Association' |
70 |
| - keys_to_delete = [ |
71 |
| - key |
72 |
| - for key in instance.keys() |
73 |
| - if "Association" in key or "Source" in key |
74 |
| - ] |
75 |
| - for key in keys_to_delete: |
76 |
| - del instance[key] |
77 |
| - |
78 |
| - logger.info(instances) |
79 |
| - send_status_update(instances) |
| 34 | + if instance["PingStatus"] == "Online": |
| 35 | + # Get core version info and check logging capability |
| 36 | + fetch_deepracer_core_version_and_check_logging(instance) |
| 37 | + |
| 38 | + # Get and process tags |
| 39 | + fetch_and_process_tags(instance) |
| 40 | + |
| 41 | + else: |
| 42 | + # Skip any update if LastPingDateTime is more than 90 days in the past |
| 43 | + last_ping = datetime.strptime( |
| 44 | + instance["LastPingDateTime"], "%Y-%m-%dT%H:%M:%S.%fZ" |
| 45 | + ) |
| 46 | + if datetime.utcnow() - last_ping > timedelta(days=90): |
| 47 | + logger.info( |
| 48 | + f"Instance {instance['InstanceId']} last pinged more than 90 days ago, skipping." |
| 49 | + ) |
| 50 | + continue |
| 51 | + |
| 52 | + # Clean up instance data |
| 53 | + clean_instance_data(instance) |
| 54 | + |
| 55 | + except Exception as e: |
| 56 | + logger.warning( |
| 57 | + f"Error processing instance {instance['InstanceId']}: {e}" |
| 58 | + ) |
| 59 | + |
| 60 | + update_instance_data.append(instance) |
| 61 | + |
| 62 | + logger.info(update_instance_data) |
| 63 | + send_status_update(update_instance_data) |
80 | 64 |
|
81 | 65 | if "NextToken" in event["Instances"]:
|
82 | 66 | result = {"result": "success", "NextToken": event["Instances"]["NextToken"]}
|
83 | 67 | else:
|
84 | 68 | result = {"result": "success"}
|
85 | 69 |
|
| 70 | + logger.info( |
| 71 | + f"Successfully processed all instances in {time.time() - start_time:.2f} seconds" |
| 72 | + ) |
86 | 73 | except Exception as e:
|
87 |
| - logger.exception(e) |
88 |
| - result = {"result": "error"} |
| 74 | + logger.exception(f"Error processing instances: {e}") |
| 75 | + result = {"result": "error", "message": str(e)} |
89 | 76 |
|
90 | 77 | return result
|
91 | 78 |
|
92 | 79 |
|
| 80 | +def fetch_deepracer_core_version_and_check_logging(instance): |
| 81 | + """Fetch DeepRacer core version from SSM inventory and check logging capability.""" |
| 82 | + # Set default values |
| 83 | + instance["DeepRacerCoreVersion"] = UNKNOWN_VERSION |
| 84 | + instance["LoggingCapable"] = False |
| 85 | + |
| 86 | + try: |
| 87 | + # Fetch version from inventory |
| 88 | + response = client_ssm.list_inventory_entries( |
| 89 | + InstanceId=instance["InstanceId"], |
| 90 | + TypeName="AWS:Application", |
| 91 | + Filters=[{"Key": "Name", "Values": ["aws-deepracer-core"]}], |
| 92 | + ) |
| 93 | + logger.debug(response) |
| 94 | + |
| 95 | + # Process version information |
| 96 | + for entry in response.get("Entries", []): |
| 97 | + if entry.get("Name") == "aws-deepracer-core": |
| 98 | + version = entry.get("Version", UNKNOWN_VERSION) |
| 99 | + instance["DeepRacerCoreVersion"] = version |
| 100 | + |
| 101 | + # Check logging capability based on version |
| 102 | + if version != UNKNOWN_VERSION: |
| 103 | + try: |
| 104 | + version_str = version.split("+")[0] |
| 105 | + version_parts = list(map(int, version_str.split("."))) |
| 106 | + if version_parts >= MINIMUM_LOGGING_VERSION: |
| 107 | + instance["LoggingCapable"] = True |
| 108 | + except ValueError: |
| 109 | + pass |
| 110 | + break |
| 111 | + except Exception as e: |
| 112 | + logger.warning(f"Error fetching DeepRacer core version: {e}") |
| 113 | + |
| 114 | + |
| 115 | +def fetch_and_process_tags(instance): |
| 116 | + """Fetch and process tags from SSM.""" |
| 117 | + tags_response = client_ssm.list_tags_for_resource( |
| 118 | + ResourceType=instance["ResourceType"], |
| 119 | + ResourceId=instance["InstanceId"], |
| 120 | + ) |
| 121 | + logger.debug(tags_response) |
| 122 | + |
| 123 | + tag_keys_to_copy = [ |
| 124 | + "fleetName", |
| 125 | + "fleetId", |
| 126 | + "DeviceUiPassword", |
| 127 | + "Type", |
| 128 | + ] |
| 129 | + for tag in tags_response["TagList"]: |
| 130 | + if tag["Key"] in tag_keys_to_copy: |
| 131 | + instance[tag["Key"]] = tag["Value"] |
| 132 | + |
| 133 | + |
| 134 | +def clean_instance_data(instance): |
| 135 | + """Clean up instance data by keeping only the required fields.""" |
| 136 | + # Define the list of fields to keep |
| 137 | + if instance.get("PingStatus") == "ConnectionLost": |
| 138 | + # For disconnected instances, only keep minimal information |
| 139 | + allowed_fields = ["InstanceId", "PingStatus", "LastPingDateTime"] |
| 140 | + else: |
| 141 | + # For connected instances, keep all relevant fields |
| 142 | + allowed_fields = [ |
| 143 | + "InstanceId", |
| 144 | + "PingStatus", |
| 145 | + "LastPingDateTime", |
| 146 | + "AgentVersion", |
| 147 | + "IsLatestVersion", |
| 148 | + "PlatformType", |
| 149 | + "PlatformName", |
| 150 | + "PlatformVersion", |
| 151 | + "ActivationId", |
| 152 | + "IamRole", |
| 153 | + "RegistrationDate", |
| 154 | + "ResourceType", |
| 155 | + "Name", |
| 156 | + "IpAddress", |
| 157 | + "ComputerName", |
| 158 | + "fleetId", |
| 159 | + "fleetName", |
| 160 | + "Type", |
| 161 | + "DeviceUiPassword", |
| 162 | + "DeepRacerCoreVersion", |
| 163 | + "LoggingCapable", |
| 164 | + ] |
| 165 | + |
| 166 | + # Get a list of keys to delete (keys that aren't in the allowed list) |
| 167 | + keys_to_delete = [key for key in list(instance.keys()) if key not in allowed_fields] |
| 168 | + |
| 169 | + # Delete all unwanted keys |
| 170 | + for key in keys_to_delete: |
| 171 | + del instance[key] |
| 172 | + |
| 173 | + |
93 | 174 | def send_status_update(instances):
|
94 | 175 |
|
95 | 176 | # Prepare the mutation
|
|
0 commit comments