Skip to content

Commit 6344a2e

Browse files
authored
Data sharing manager bug and scaling fixes (#970)
* Data sharing manager bug and scaling fixes: quality in report should be optional; fixed table expiry and expiry query; push S3 uses distributed map; fix map iter issue; add concurrency; comment out lambda example executions. * Add distributed map policy to nag suppression: the distributed map policy is not a child of the state machine object, so it must be explicitly added to the nag suppressions.
1 parent 902c199 commit 6344a2e

File tree

9 files changed

+304
-168
lines changed

9 files changed

+304
-168
lines changed

lib/workload/stateless/stacks/data-sharing-manager/deploy/stack.ts

+29-2
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ export class DataSharingStack extends Stack {
327327
// Add container to task role
328328
const dataSummaryReportContainer = taskDefinition.addContainer('dataSummaryReportContainer', {
329329
image: ecs.ContainerImage.fromDockerImageAsset(
330-
new ecrAssets.DockerImageAsset(this, 'gzipRawMd5sum', {
330+
new ecrAssets.DockerImageAsset(this, 'data_summary_reporter', {
331331
directory: path.join(__dirname, '../ecs/tasks/generate_data_summary_report'),
332332
buildArgs: {
333333
TARGETPLATFORM: architecture.dockerPlatform,
@@ -656,10 +656,37 @@ export class DataSharingStack extends Stack {
656656
})
657657
);
658658

659+
// Pusher requires permissions to execute itself
660+
// Because this step's execution uses a distributed map in its step function, we
661+
// have to wire up some extra permissions
662+
// Grant the state machine's role to execute itself
663+
// However we cannot just grant permission to the role as this will result in a circular dependency
664+
// between the state machine and the role
665+
// Instead we use the workaround here - https://github.com/aws/aws-cdk/issues/28820#issuecomment-1936010520
666+
// s3PushStateMachine.grantStartExecution(s3PushStateMachine);
667+
const distributedMapPolicy = new iam.Policy(this, 'push-sfn-distributed-map-policy', {
668+
document: new iam.PolicyDocument({
669+
statements: [
670+
new iam.PolicyStatement({
671+
resources: [s3PushStateMachine.stateMachineArn],
672+
actions: ['states:StartExecution'],
673+
}),
674+
new iam.PolicyStatement({
675+
resources: [
676+
`arn:aws:states:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:execution:${s3PushStateMachine.stateMachineName}/*:*`,
677+
],
678+
actions: ['states:RedriveExecution'],
679+
}),
680+
],
681+
}),
682+
});
683+
// Add the policy to the state machine role
684+
s3PushStateMachine.role.attachInlinePolicy(distributedMapPolicy);
685+
659686
// https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies
660687
// Polling requires permission for states:DescribeExecution
661688
NagSuppressions.addResourceSuppressions(
662-
s3PushStateMachine,
689+
[s3PushStateMachine, distributedMapPolicy],
663690
[
664691
{
665692
id: 'AwsSolutions-IAM5',

lib/workload/stateless/stacks/data-sharing-manager/ecs/tasks/generate_data_summary_report/data_summary_reporting_tools/src/data_summary_reporting_tools/models.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class LibraryModel(pa.DataFrameModel):
8282
"tumor", "normal", "negative-control"
8383
])
8484
workflow: str = pa.Field()
85-
quality: str = pa.Field()
85+
quality: Optional[str] = pa.Field(nullable=True)
8686
type: str = pa.Field()
8787
assay: str = pa.Field()
8888
coverage: float = pa.Field(ge=0, coerce=True)

lib/workload/stateless/stacks/data-sharing-manager/interface/data_sharing_api/models/package.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from dyntastic import Dyntastic
1313
from fastapi.encoders import jsonable_encoder
1414
from pydantic import Field, BaseModel, ConfigDict, model_validator, computed_field
15-
from datetime import datetime, timedelta
15+
from datetime import datetime, timedelta, timezone
1616
from enum import Enum
1717

1818
from data_sharing_tools.utils.models import SecondaryAnalysisDataTypeEnum
@@ -157,7 +157,7 @@ class PackageResponse(PackageWithId):
157157
@computed_field
158158
def has_expired(self) -> bool:
159159
return (
160-
True if PackageData(**self.model_dump()).is_expired()
160+
True if PackageData.get(self.id).is_expired()
161161
else False
162162
)
163163

@@ -230,7 +230,7 @@ def model_dump(self, **kwargs):
230230

231231
def is_expired(self):
232232
return (
233-
True if (self.request_time + timedelta(days=30)) < datetime.now()
233+
True if (self.request_time + timedelta(days=30)) < datetime.now(timezone.utc)
234234
else False
235235
)
236236

lib/workload/stateless/stacks/data-sharing-manager/lambdas/generate_presigned_urls_for_data_objects_py/generate_presigned_urls_for_data_objects.py

+24-24
Original file line numberDiff line numberDiff line change
@@ -96,27 +96,27 @@ def handler(event, context) -> Dict[str, List[Dict[str, str]]]:
9696
}
9797

9898

99-
if __name__ == "__main__":
100-
from os import environ
101-
import json
102-
environ['AWS_REGION'] = 'ap-southeast-2'
103-
environ['AWS_PROFILE'] = 'umccr-production'
104-
environ['HOSTNAME_SSM_PARAMETER'] = '/hosted_zone/umccr/name'
105-
environ['ORCABUS_TOKEN_SECRET_ID'] = 'orcabus/token-service-jwt'
106-
print(json.dumps(
107-
handler(
108-
{
109-
"ingestIdList": [
110-
"01960822-f3ee-77d0-8fa4-91d5c2e17ad9",
111-
]
112-
},
113-
None
114-
),
115-
indent=4)
116-
)
117-
118-
# {
119-
# "ingestId": "01960822-f3ee-77d0-8fa4-91d5c2e17ad9",
120-
# "presignedUrl": "...",
121-
# "presignedExpiry": "2025-04-14T22:52:35Z"
122-
# },
99+
# if __name__ == "__main__":
100+
# from os import environ
101+
# import json
102+
# environ['AWS_REGION'] = 'ap-southeast-2'
103+
# environ['AWS_PROFILE'] = 'umccr-production'
104+
# environ['HOSTNAME_SSM_PARAMETER'] = '/hosted_zone/umccr/name'
105+
# environ['ORCABUS_TOKEN_SECRET_ID'] = 'orcabus/token-service-jwt'
106+
# print(json.dumps(
107+
# handler(
108+
# {
109+
# "ingestIdList": [
110+
# "01960822-f3ee-77d0-8fa4-91d5c2e17ad9",
111+
# ]
112+
# },
113+
# None
114+
# ),
115+
# indent=4)
116+
# )
117+
#
118+
# # {
119+
# # "ingestId": "01960822-f3ee-77d0-8fa4-91d5c2e17ad9",
120+
# # "presignedUrl": "...",
121+
# # "presignedExpiry": "2025-04-14T22:52:35Z"
122+
# # },

lib/workload/stateless/stacks/data-sharing-manager/lambdas/get_files_list_from_portal_run_id_py/get_files_list_from_portal_run_id.py

+44-44
Original file line numberDiff line numberDiff line change
@@ -327,48 +327,48 @@ def handler(event: Dict, context: Any) -> Dict[str, List[str]]:
327327
# #
328328

329329

330-
if __name__ == "__main__":
331-
from os import environ
332-
import json
333-
334-
environ['AWS_PROFILE'] = 'umccr-production'
335-
environ['AWS_REGION'] = 'ap-southeast-2'
336-
environ['HOSTNAME_SSM_PARAMETER'] = '/hosted_zone/umccr/name'
337-
environ['ORCABUS_TOKEN_SECRET_ID'] = 'orcabus/token-service-jwt'
338-
print(
339-
json.dumps(
340-
handler(
341-
{
342-
"workflowRunObject": {
343-
"orcabusId": "wv1.01HZB3XW00SD59RB456ZFBM1M6",
344-
"timestamp": "2024-06-02T00:00:00Z",
345-
"workflowName": "umccrise",
346-
"workflowVersion": "2.3.1--1--9344851",
347-
"portalRunId": "20240602e4238704",
348-
"libraries": [
349-
{
350-
"libraryId": "L2400668",
351-
"orcabusId": "lib.01JBMVY7RK8ZDRZEKMPV8K60Z3"
352-
},
353-
{
354-
"libraryId": "L2400667",
355-
"orcabusId": "lib.01JBMVY7QFA11HR5J4JS3Y2Y6K"
356-
}
357-
]
358-
}
359-
},
360-
None
361-
),
362-
indent=4
363-
)
364-
)
365-
366-
# {
367-
# "ingestIdList": [
368-
# "01932e37-01a1-7d12-9a69-006d44292b5b",
369-
# "01932e37-0290-76e0-8d03-b624c925c4ae",
370-
# ....
371-
# "01932e41-26a2-7b31-9487-c6e679d80cd1"
372-
# ]
373-
# }
330+
# if __name__ == "__main__":
331+
# from os import environ
332+
# import json
374333
#
334+
# environ['AWS_PROFILE'] = 'umccr-production'
335+
# environ['AWS_REGION'] = 'ap-southeast-2'
336+
# environ['HOSTNAME_SSM_PARAMETER'] = '/hosted_zone/umccr/name'
337+
# environ['ORCABUS_TOKEN_SECRET_ID'] = 'orcabus/token-service-jwt'
338+
# print(
339+
# json.dumps(
340+
# handler(
341+
# {
342+
# "workflowRunObject": {
343+
# "orcabusId": "wv1.01HZB3XW00SD59RB456ZFBM1M6",
344+
# "timestamp": "2024-06-02T00:00:00Z",
345+
# "workflowName": "umccrise",
346+
# "workflowVersion": "2.3.1--1--9344851",
347+
# "portalRunId": "20240602e4238704",
348+
# "libraries": [
349+
# {
350+
# "libraryId": "L2400668",
351+
# "orcabusId": "lib.01JBMVY7RK8ZDRZEKMPV8K60Z3"
352+
# },
353+
# {
354+
# "libraryId": "L2400667",
355+
# "orcabusId": "lib.01JBMVY7QFA11HR5J4JS3Y2Y6K"
356+
# }
357+
# ]
358+
# }
359+
# },
360+
# None
361+
# ),
362+
# indent=4
363+
# )
364+
# )
365+
#
366+
# # {
367+
# # "ingestIdList": [
368+
# # "01932e37-01a1-7d12-9a69-006d44292b5b",
369+
# # "01932e37-0290-76e0-8d03-b624c925c4ae",
370+
# # ....
371+
# # "01932e41-26a2-7b31-9487-c6e679d80cd1"
372+
# # ]
373+
# # }
374+
# #

lib/workload/stateless/stacks/data-sharing-manager/lambdas/get_s3_destination_and_source_uri_mappings_py/get_s3_destination_and_source_uri_mappings.py

+37
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ def handler(event, context) -> Dict[str, List[Dict[str, str]]]:
5454
# Extract the jobId and pushLocation from the event
5555
job_id = event.get("packagingJobId")
5656
push_location = event.get("pushLocation")
57+
count_only = event.get("countOnly", False)
58+
pagination_index = event.get("paginationIndex", None)
5759

5860
# Check if the jobId and pushLocation are provided
5961
if not job_id or not push_location:
@@ -108,6 +110,41 @@ def handler(event, context) -> Dict[str, List[Dict[str, str]]]:
108110
}
109111
)
110112

113+
destination_and_source_uri_mappings_list.sort(
114+
key=lambda x: x["destinationUri"]
115+
)
116+
117+
# If count only is true, return the count of the destination and source uri mappings list
118+
if count_only:
119+
return {
120+
"listCount": len(destination_and_source_uri_mappings_list)
121+
}
122+
123+
if pagination_index:
124+
return {
125+
"destinationAndSourceUriMappingsList": destination_and_source_uri_mappings_list[pagination_index[0]:pagination_index[1]+1]
126+
}
127+
111128
return {
112129
"destinationAndSourceUriMappingsList": destination_and_source_uri_mappings_list
113130
}
131+
132+
133+
# if __name__ == "__main__":
134+
# import json
135+
# from os import environ
136+
#
137+
# environ['AWS_PROFILE'] = 'umccr-production'
138+
# environ['PACKAGING_TABLE_NAME'] = 'data-sharing-packaging-lookup-table'
139+
# environ['CONTENT_INDEX_NAME'] = 'content-index'
140+
#
141+
# print(
142+
# handler(
143+
# {
144+
# "packagingJobId": "pkg.01JS1553E52WEZ43CXVYRSPM5N",
145+
# "pushLocation": "s3://radio-fastq-landing/2025-04-17-cttsov2/",
146+
#             "paginationIndex": [0, 23],
147+
# },
148+
# None
149+
# )
150+
# )

lib/workload/stateless/stacks/data-sharing-manager/lambdas/get_workflow_from_portal_run_id_py/get_workflow_from_portal_run_id.py

+42-42
Original file line numberDiff line numberDiff line change
@@ -206,45 +206,45 @@ def handler(event, context) -> Dict[str, WorkflowRunModelSlim]:
206206
}
207207

208208

209-
if __name__ == "__main__":
210-
from os import environ
211-
import json
212-
213-
# Set envs
214-
environ['AWS_PROFILE'] = 'umccr-production'
215-
environ['AWS_REGION'] = 'ap-southeast-2'
216-
environ['HOSTNAME_SSM_PARAMETER'] = '/hosted_zone/umccr/name'
217-
environ['ORCABUS_TOKEN_SECRET_ID'] = 'orcabus/token-service-jwt'
218-
environ['ATHENA_WORKGROUP_NAME'] = 'orcahouse'
219-
environ['ATHENA_DATASOURCE_NAME'] = 'orcavault'
220-
environ['ATHENA_DATABASE_NAME'] = 'mart'
221-
222-
print(json.dumps(
223-
handler(
224-
{
225-
"portalRunId": "20240420746761e7"
226-
},
227-
None
228-
),
229-
indent=4
230-
))
231-
232-
# {
233-
# "workflowRunObject": {
234-
# "orcabusId": "wv1.01HKGKG700SJ3EW38M7RP8K8BR",
235-
# "timestamp": "2024-01-07T00:00:00Z",
236-
# "workflowName": "umccrise",
237-
# "workflowVersion": "2.3.1--1--9344851",
238-
# "portalRunId": "202401075d94d609",
239-
# "libraries": [
240-
# {
241-
# "libraryId": "L2301517",
242-
# "orcabusId": "lib.01JBMVJ2EPCW8W051H82JF4MTX"
243-
# },
244-
# {
245-
# "libraryId": "L2301512",
246-
# "orcabusId": "lib.01JBMVJ1DQHB5HA6MP7BDYE94K"
247-
# }
248-
# ]
249-
# }
250-
# }
209+
# if __name__ == "__main__":
210+
# from os import environ
211+
# import json
212+
#
213+
# # Set envs
214+
# environ['AWS_PROFILE'] = 'umccr-production'
215+
# environ['AWS_REGION'] = 'ap-southeast-2'
216+
# environ['HOSTNAME_SSM_PARAMETER'] = '/hosted_zone/umccr/name'
217+
# environ['ORCABUS_TOKEN_SECRET_ID'] = 'orcabus/token-service-jwt'
218+
# environ['ATHENA_WORKGROUP_NAME'] = 'orcahouse'
219+
# environ['ATHENA_DATASOURCE_NAME'] = 'orcavault'
220+
# environ['ATHENA_DATABASE_NAME'] = 'mart'
221+
#
222+
# print(json.dumps(
223+
# handler(
224+
# {
225+
# "portalRunId": "20240420746761e7"
226+
# },
227+
# None
228+
# ),
229+
# indent=4
230+
# ))
231+
#
232+
# # {
233+
# # "workflowRunObject": {
234+
# # "orcabusId": "wv1.01HKGKG700SJ3EW38M7RP8K8BR",
235+
# # "timestamp": "2024-01-07T00:00:00Z",
236+
# # "workflowName": "umccrise",
237+
# # "workflowVersion": "2.3.1--1--9344851",
238+
# # "portalRunId": "202401075d94d609",
239+
# # "libraries": [
240+
# # {
241+
# # "libraryId": "L2301517",
242+
# # "orcabusId": "lib.01JBMVJ2EPCW8W051H82JF4MTX"
243+
# # },
244+
# # {
245+
# # "libraryId": "L2301512",
246+
# # "orcabusId": "lib.01JBMVJ1DQHB5HA6MP7BDYE94K"
247+
# # }
248+
# # ]
249+
# # }
250+
# # }

0 commit comments

Comments
 (0)