From 1aafef3a0cbdfd9e67144ab2e999d790c93a165d Mon Sep 17 00:00:00 2001 From: Luigi Di Fraia Date: Fri, 14 Jun 2024 16:54:52 +0100 Subject: [PATCH 1/2] fix: address typo in task definition variable name --- .../lambda_function.py | 14 +++++++------- .../fargate-examples/queue-processing/main.tf | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/application-code/lambda-function-queue-trigger/lambda_function.py b/application-code/lambda-function-queue-trigger/lambda_function.py index a6a94ab0..cd37f6bb 100644 --- a/application-code/lambda-function-queue-trigger/lambda_function.py +++ b/application-code/lambda-function-queue-trigger/lambda_function.py @@ -12,7 +12,7 @@ def lambda_handler(event, context): pipeline_enabled = None TASK_CLUSTER = None TASK_CONTAINER = None - TASK_DEFINITON = None + TASK_DEFINITION = None TASK_SUBNET = None TASK_SECURITYGROUP = None response = ssm.get_parameters( @@ -22,7 +22,7 @@ def lambda_handler(event, context): 'PIPELINE_ECS_MAX_TASKS', 'PIPELINE_ECS_CLUSTER', 'PIPELINE_ECS_TASK_CONTAINER', - 'PIPELINE_ECS_TASK_DEFINITON', + 'PIPELINE_ECS_TASK_DEFINITION', 'PIPELINE_ECS_TASK_SECURITYGROUP', 'PIPELINE_ECS_TASK_SUBNET', 'PIPELINE_S3_DEST_PREFIX' @@ -42,9 +42,9 @@ def lambda_handler(event, context): TASK_CLUSTER = param['Value'] if param['Name'] == 'PIPELINE_ECS_TASK_CONTAINER': TASK_CONTAINER = param['Value'] - if param['Name'] == 'PIPELINE_ECS_TASK_DEFINITON': + if param['Name'] == 'PIPELINE_ECS_TASK_DEFINITION': taskdef = param['Value'] - TASK_DEFINITON = taskdef[:taskdef.rindex(':')] + TASK_DEFINITION = taskdef[:taskdef.rindex(':')] if param['Name'] == 'PIPELINE_ECS_TASK_SUBNET': TASK_SUBNET = param['Value'] if param['Name'] == 'PIPELINE_ECS_TASK_SECURITYGROUP': @@ -52,11 +52,11 @@ def lambda_handler(event, context): if param['Name'] == 'PIPELINE_S3_DEST_PREFIX': S3_DEST_PREFIX = param['Value'] if (sqs_url and pipeline_enabled and max_tasks and - TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITON and TASK_SUBNET and TASK_SECURITYGROUP): + TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITION and TASK_SUBNET and TASK_SECURITYGROUP): max_tasks = int(max_tasks) else: raise Exception("Required SSM: PIPELINE_ECS_MAX_TASKS,PIPELINE_UNPROCESSED_SQS_URL,PIPELINE_ENABLED,PIPELINE_ECS_CLUSTER," - "PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITON,PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX") + "PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITION,PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX") if (pipeline_enabled != "1"): print("ECS Pipeline is Disabled. Not starting tasks via Lambda.") return @@ -89,7 +89,7 @@ def lambda_handler(event, context): } ], cluster=TASK_CLUSTER, - taskDefinition=TASK_DEFINITON, + taskDefinition=TASK_DEFINITION, overrides={ 'containerOverrides': [ { diff --git a/terraform/fargate-examples/queue-processing/main.tf b/terraform/fargate-examples/queue-processing/main.tf index f9cd7f58..d997da3f 100644 --- a/terraform/fargate-examples/queue-processing/main.tf +++ b/terraform/fargate-examples/queue-processing/main.tf @@ -245,7 +245,7 @@ resource "aws_ssm_parameter" "ecs_cluster_name" { } resource "aws_ssm_parameter" "ecs_task_definition" { - name = "PIPELINE_ECS_TASK_DEFINITON" + name = "PIPELINE_ECS_TASK_DEFINITION" type = "String" value = module.ecs_service.task_definition_arn From 021b69cc0fff528c0596bc39c7b0bcc92505d85a Mon Sep 17 00:00:00 2001 From: Luigi Di Fraia Date: Sat, 15 Jun 2024 08:28:05 +0100 Subject: [PATCH 2/2] chore: start tasks based on processing batch size in the queue-processing example --- .../lambda_function.py | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/application-code/lambda-function-queue-trigger/lambda_function.py b/application-code/lambda-function-queue-trigger/lambda_function.py index cd37f6bb..13f6dff9 100644 --- a/application-code/lambda-function-queue-trigger/lambda_function.py +++ b/application-code/lambda-function-queue-trigger/lambda_function.py @@ -1,14 +1,17 @@ -import boto3, os +import boto3 +from math import ceil sqs = boto3.client('sqs') ecs = boto3.client('ecs') ssm = boto3.client('ssm') + +batch_size = 10 max_tasks_per_run = 100 + def lambda_handler(event, context): max_tasks = None sqs_url = None - job_mode = None pipeline_enabled = None TASK_CLUSTER = None TASK_CONTAINER = None @@ -52,58 +55,59 @@ def lambda_handler(event, context): if param['Name'] == 'PIPELINE_S3_DEST_PREFIX': S3_DEST_PREFIX = param['Value'] if (sqs_url and pipeline_enabled and max_tasks and - TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITION and TASK_SUBNET and TASK_SECURITYGROUP): + TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITION and TASK_SUBNET and TASK_SECURITYGROUP): max_tasks = int(max_tasks) else: - raise Exception("Required SSM: PIPELINE_ECS_MAX_TASKS,PIPELINE_UNPROCESSED_SQS_URL,PIPELINE_ENABLED,PIPELINE_ECS_CLUSTER," - "PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITION,PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX") - if (pipeline_enabled != "1"): + raise Exception("Required SSM: PIPELINE_ECS_MAX_TASKS,PIPELINE_UNPROCESSED_SQS_URL,PIPELINE_ENABLED," + "PIPELINE_ECS_CLUSTER,PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITION," + "PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX") + if pipeline_enabled != "1": print("ECS Pipeline is Disabled. Not starting tasks via Lambda.") return sqs_response = sqs.get_queue_attributes( QueueUrl=sqs_url, - AttributeNames=[ 'ApproximateNumberOfMessages' ] + AttributeNames=['ApproximateNumberOfMessages'] ) sqs_queue_size = int(sqs_response['Attributes']['ApproximateNumberOfMessages']) print("Current SQS Queue size: " + str(sqs_queue_size)) if sqs_queue_size == 0: return ecs_response = ecs.list_tasks( - cluster=TASK_CLUSTER,maxResults=100,desiredStatus='RUNNING',family=TASK_CONTAINER) + cluster=TASK_CLUSTER, maxResults=100, desiredStatus='RUNNING', family=TASK_CONTAINER) current_running_tasks = len(ecs_response["taskArns"]) available_tasks = max_tasks - current_running_tasks - tasks_to_start = min([sqs_queue_size, available_tasks, max_tasks_per_run, max_tasks]) + tasks_to_start = min([ceil(sqs_queue_size / batch_size), available_tasks, max_tasks_per_run, max_tasks]) print("ECS Tasks to start: " + str(tasks_to_start)) - if tasks_to_start<=0: + if tasks_to_start <= 0: return run_task_response = ecs.run_task( capacityProviderStrategy=[ { - 'capacityProvider': 'FARGATE', - 'weight': 1, - 'base': 2 + 'capacityProvider': 'FARGATE', + 'weight': 1, + 'base': 2 }, { - 'capacityProvider': 'FARGATE_SPOT', - 'weight': 4, - 'base': 0 + 'capacityProvider': 'FARGATE_SPOT', + 'weight': 4, + 'base': 0 } ], cluster=TASK_CLUSTER, taskDefinition=TASK_DEFINITION, overrides={ 'containerOverrides': [ - { - 'name': TASK_CONTAINER, - 'environment': [ { - 'name': 'PIPELINE_ECS_JOB_MODE', - 'value': '1' - }, { - 'name': 'PIPELINE_S3_DEST_PREFIX', - 'value': S3_DEST_PREFIX + 'name': TASK_CONTAINER, + 'environment': [ + { + 'name': 'PIPELINE_ECS_JOB_MODE', + 'value': '1' + }, { + 'name': 'PIPELINE_S3_DEST_PREFIX', + 'value': S3_DEST_PREFIX + } + ] } - ] - } ] }, count=tasks_to_start, @@ -114,6 +118,7 @@ def lambda_handler(event, context): 'securityGroups': [TASK_SECURITYGROUP], 'assignPublicIp': 'DISABLED' } - } + }, + propagateTags='TASK_DEFINITION' ) return tasks_to_start