From b2c0d873e725c8fc30bae9a7694a9bdcd0700d11 Mon Sep 17 00:00:00 2001
From: Luigi Di Fraia
Date: Sat, 15 Jun 2024 08:28:05 +0100
Subject: [PATCH] chore: start tasks based on processing batch size in the
 queue-processing example

---
 .../lambda_function.py | 57 ++++++++++---------
 1 file changed, 31 insertions(+), 26 deletions(-)

diff --git a/application-code/lambda-function-queue-trigger/lambda_function.py b/application-code/lambda-function-queue-trigger/lambda_function.py
index cd37f6bb..9e493a59 100644
--- a/application-code/lambda-function-queue-trigger/lambda_function.py
+++ b/application-code/lambda-function-queue-trigger/lambda_function.py
@@ -1,14 +1,17 @@
-import boto3, os
+import boto3
+from math import ceil
 
 sqs = boto3.client('sqs')
 ecs = boto3.client('ecs')
 ssm = boto3.client('ssm')
 
+
+batch_size = 10
 max_tasks_per_run = 100
+
 def lambda_handler(event, context):
     max_tasks = None
     sqs_url = None
-    job_mode = None
     pipeline_enabled = None
     TASK_CLUSTER = None
     TASK_CONTAINER = None
@@ -52,58 +55,59 @@ def lambda_handler(event, context):
             if param['Name'] == 'PIPELINE_S3_DEST_PREFIX':
                 S3_DEST_PREFIX = param['Value']
     if (sqs_url and pipeline_enabled and max_tasks and
-        TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITION and TASK_SUBNET and TASK_SECURITYGROUP):
+            TASK_CLUSTER and TASK_CONTAINER and TASK_DEFINITION and TASK_SUBNET and TASK_SECURITYGROUP):
         max_tasks = int(max_tasks)
     else:
-        raise Exception("Required SSM: PIPELINE_ECS_MAX_TASKS,PIPELINE_UNPROCESSED_SQS_URL,PIPELINE_ENABLED,PIPELINE_ECS_CLUSTER,"
-            "PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITION,PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX")
+        raise Exception("Required SSM: PIPELINE_ECS_MAX_TASKS,PIPELINE_UNPROCESSED_SQS_URL,PIPELINE_ENABLED,"
+                        "PIPELINE_ECS_CLUSTER,PIPELINE_ECS_TASK_CONTAINER,PIPELINE_ECS_TASK_DEFINITION,"
+                        "PIPELINE_ECS_TASK_SUBNET,PIPELINE_ECS_TASK_SECURITYGROUP,PIPELINE_S3_DEST_PREFIX")
-    if (pipeline_enabled != "1"):
+    if pipeline_enabled != "1":
         print("ECS Pipeline is Disabled. Not starting tasks via Lambda.")
         return
     sqs_response = sqs.get_queue_attributes(
         QueueUrl=sqs_url,
-        AttributeNames=[ 'ApproximateNumberOfMessages' ]
+        AttributeNames=['ApproximateNumberOfMessages']
     )
     sqs_queue_size = int(sqs_response['Attributes']['ApproximateNumberOfMessages'])
     print("Current SQS Queue size: " + str(sqs_queue_size))
     if sqs_queue_size == 0:
         return
     ecs_response = ecs.list_tasks(
-        cluster=TASK_CLUSTER,maxResults=100,desiredStatus='RUNNING',family=TASK_CONTAINER)
+        cluster=TASK_CLUSTER, maxResults=100, desiredStatus='RUNNING', family=TASK_CONTAINER)
     current_running_tasks = len(ecs_response["taskArns"])
     available_tasks = max_tasks - current_running_tasks
-    tasks_to_start = min([sqs_queue_size, available_tasks, max_tasks_per_run, max_tasks])
+    tasks_to_start = min([ceil(sqs_queue_size / batch_size), available_tasks, max_tasks_per_run, max_tasks])
     print("ECS Tasks to start: " + str(tasks_to_start))
-    if tasks_to_start<=0:
+    if tasks_to_start <= 0:
         return
     run_task_response = ecs.run_task(
         capacityProviderStrategy=[
             {
-            'capacityProvider': 'FARGATE',
-            'weight': 1,
-            'base': 2
+                'capacityProvider': 'FARGATE',
+                'weight': 1,
+                'base': 2
             }, {
-            'capacityProvider': 'FARGATE_SPOT',
-            'weight': 4,
-            'base': 0
+                'capacityProvider': 'FARGATE_SPOT',
+                'weight': 4,
+                'base': 0
             }
         ],
         cluster=TASK_CLUSTER,
         taskDefinition=TASK_DEFINITION,
         overrides={
             'containerOverrides': [
-            {
-            'name': TASK_CONTAINER,
-            'environment': [ {
-                'name': 'PIPELINE_ECS_JOB_MODE',
-                'value': '1'
-            },
-            {
-                'name': 'PIPELINE_S3_DEST_PREFIX',
-                'value': S3_DEST_PREFIX
-            }
-            ]
-            }
+                {
+                    'name': TASK_CONTAINER,
+                    'environment': [
+                        {
+                            'name': 'PIPELINE_ECS_JOB_MODE',
+                            'value': '1'
+                        }, {
+                            'name': 'PIPELINE_S3_DEST_PREFIX',
+                            'value': S3_DEST_PREFIX
+                        }
+                    ]
+                }
             ],
         },
         count=tasks_to_start,
@@ -114,6 +118,7 @@ def lambda_handler(event, context):
                 'securityGroups': [TASK_SECURITYGROUP],
                 'assignPublicIp': 'DISABLED'
             }
-        }
+        },
+        propagateTags='TASK_DEFINITION'
     )
     return tasks_to_start