diff --git a/lib/workload/stateless/stacks/fastq-unarchiving/app/step_functions_templates/run_s3_steps_copy_sfns_sfn_template.asl.json b/lib/workload/stateless/stacks/fastq-unarchiving/app/step_functions_templates/run_s3_steps_copy_sfns_sfn_template.asl.json index 40a935479..8d72c16d1 100644 --- a/lib/workload/stateless/stacks/fastq-unarchiving/app/step_functions_templates/run_s3_steps_copy_sfns_sfn_template.asl.json +++ b/lib/workload/stateless/stacks/fastq-unarchiving/app/step_functions_templates/run_s3_steps_copy_sfns_sfn_template.asl.json @@ -45,32 +45,56 @@ "JitterStrategy": "FULL" } ], - "Next": "Split fastq ids by instrument run id" + "Next": "For each fastq id (Batched)" }, - "Split fastq ids by instrument run id": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Output": "{% $states.result.Payload %}", - "Arguments": { - "FunctionName": "${__split_fastq_ids_by_instrument_run_id_lambda_function_arn__}", - "Payload": { - "fastqIdList": "{% $fastqIdList %}" - } + "For each fastq id (Batched)": { + "Type": "Map", + "Label": "ForeachfastqidBatched", + "Items": "{% $fastqIdList %}", + "ItemBatcher": { + "MaxItemsPerBatch": 10 }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException" - ], - "IntervalSeconds": 1, - "MaxAttempts": 3, - "BackoffRate": 2, - "JitterStrategy": "FULL" + "MaxConcurrency": 1, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "DISTRIBUTED", + "ExecutionType": "STANDARD" + }, + "StartAt": "Split fastq ids by instrument run id", + "States": { + "Split fastq ids by instrument run id": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Arguments": { + "FunctionName": "${__split_fastq_ids_by_instrument_run_id_lambda_function_arn__}", + "Payload": { + "fastqIdList": "{% $states.input.Items %}" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "End": true, + "Output": { + "fastqIdsByInstrumentRunId": "{% $states.result.Payload.fastqIdsByInstrumentRunId %}" + } + } } - ], + }, + "Output": { + "fastqIdsByInstrumentRunId": "{% /* https://try.jsonata.org/TdN5_ctqk */\n(\n /* Functions */\n $appendDistinct := function($i, $j){$distinct($append($i, $j))};\n $flattenedResultList := [\n $reduce(\n [\n $map($states.result, function($v){\n [\n $v.fastqIdsByInstrumentRunId\n ]\n })\n ],\n $append\n )\n ];\n\n /* Collect all instrument run ids */\n $allInstrumentRunIds := [\n $reduce(\n [\n $map(\n [\n $flattenedResultList\n ], \n function($v){\n $v.instrumentRunId\n }\n )\n ],\n $appendDistinct\n )\n ];\n\n /* Iterate over each instrumentRunId and combine with any fastq id lists */\n [\n $map(\n $allInstrumentRunIds, \n function($v){\n {\n \"instrumentRunId\": $v,\n \"fastqIdList\": [\n /* Append all into one list */\n $reduce(\n [\n $map(\n /* Any results with this instrument run id */\n [\n $filter(\n $flattenedResultList,\n function($y){\n $y.instrumentRunId = $v\n }\n )\n ],\n /* Get the fastq id lists */\n function($z){\n $z.fastqIdList\n }\n )\n ],\n $append\n )\n ]\n }\n }\n )\n ]\n) %}" + }, "Next": "For each instrument run id" }, "For each instrument run id": { diff --git a/lib/workload/stateless/stacks/fastq-unarchiving/deploy/index.ts b/lib/workload/stateless/stacks/fastq-unarchiving/deploy/index.ts index cae145ffc..225775eab 100644 --- a/lib/workload/stateless/stacks/fastq-unarchiving/deploy/index.ts +++ b/lib/workload/stateless/stacks/fastq-unarchiving/deploy/index.ts @@ -374,7 +374,13 @@ export class FastqUnarchivingManagerStack extends Stack { statements: [ new iam.PolicyStatement({ resources: [runUnarchivingStateMachine.stateMachineArn], - actions: ['states:StartExecution', 'states:DescribeExecution', 'states:StopExecution'], + actions: ['states:StartExecution'], + }), + new iam.PolicyStatement({ + resources: [ + `arn:aws:states:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:execution:${runUnarchivingStateMachine.stateMachineName}/*:*`, + ], + actions: ['states:RedriveExecution'], }), ], }), @@ -400,7 +406,7 @@ export class FastqUnarchivingManagerStack extends Stack { ); NagSuppressions.addResourceSuppressions( - runUnarchivingStateMachine, + [runUnarchivingStateMachine, distributedMapPolicy], [ { id: 'AwsSolutions-IAM5',