Skip to content

Commit 8b77858

Browse files
authored
Add the fastq sync manager (#900)
1 parent 1643f97 commit 8b77858

File tree

25 files changed

+2687
-0
lines changed

25 files changed

+2687
-0
lines changed

config/config.ts

+6
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ import {
7272
getFastqUnarchivingManagerStackProps,
7373
getFastqUnarchivingManagerTableStackProps,
7474
} from './stacks/fastqUnarchivingManager';
75+
import {
76+
getFastqSyncManagerStackProps,
77+
getFastqSyncManagerTableStackProps,
78+
} from './stacks/fastqSyncManager';
7579

7680
interface EnvironmentConfig {
7781
name: string;
@@ -115,6 +119,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
115119
accessKeySecretStackProps: getAccessKeySecretStackProps(stage),
116120
fastqManagerTableStackProps: getFastqManagerTableStackProps(stage),
117121
fastqUnarchivingManagerTableStackProps: getFastqUnarchivingManagerTableStackProps(),
122+
fastqSyncManagerTableStackProps: getFastqSyncManagerTableStackProps(),
118123
},
119124
statelessConfig: {
120125
metadataManagerStackProps: getMetadataManagerStackProps(stage),
@@ -147,6 +152,7 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
147152
pgDDProps: getPgDDProps(stage),
148153
fastqManagerStackProps: getFastqManagerStackProps(stage),
149154
fastqUnarchivingManagerStackProps: getFastqUnarchivingManagerStackProps(stage),
155+
fastqSyncManagerStackProps: getFastqSyncManagerStackProps(stage),
150156
},
151157
};
152158

config/constants.ts

+5
Original file line numberDiff line numberDiff line change
@@ -961,3 +961,8 @@ export const fastqUnarchivingEventDetailType = {
961961
updateJob: 'FastqUnarchivingJobUpdated',
962962
};
963963
export const fastqUnarchivingManagerEventSource = 'orcabus.fastqunarchivingmanager';
964+
965+
/*
966+
Fastq sync service
967+
*/
968+
export const fastqSyncEventDetailType = 'fastqSync';

config/stacks/fastqSyncManager.ts

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import {
2+
AppStage,
3+
/* Events */
4+
eventBusName,
5+
fastqManagerEventDetails,
6+
fastqManagerEventSource,
7+
fastqSyncEventDetailType,
8+
fastqUnarchivingEventDetailType,
9+
fastqUnarchivingManagerEventSource,
10+
hostedZoneNameParameterPath,
11+
icav2PipelineCacheBucket,
12+
icav2PipelineCachePrefix,
13+
jwtSecretName,
14+
} from '../constants';
15+
16+
import { FastqSyncManagerStackConfig } from '../../lib/workload/stateless/stacks/fastq-sync/deploy/interfaces';
17+
import { FastqSyncTableConfig } from '../../lib/workload/stateful/stacks/fastq-sync-dynamodb/deploy/stack';
18+
19+
/*
20+
export interface FastqSyncManagerStackConfig {
21+
/*
22+
Orcabus token and zone name for external lambda functions
23+
*/
24+
//orcabusTokenSecretsManagerPath: string;
25+
//hostedZoneNameSsmParameterPath: string;
26+
27+
/*
28+
Data tables
29+
*/
30+
//fastqSyncDynamodbTableName: string;
31+
/*
32+
Event bus stuff
33+
*/
34+
//eventBusName: string;
35+
// eventTriggers: FastqSyncEventTriggers
36+
//}
37+
//*/
38+
39+
const fastqSyncDynamodbTableName = 'fastqSyncTokenTable';
40+
41+
// Stateful
42+
export const getFastqSyncManagerTableStackProps = (): FastqSyncTableConfig => {
43+
return {
44+
/* DynamoDB table for fastq list rows */
45+
dynamodbTableName: fastqSyncDynamodbTableName,
46+
};
47+
};
48+
49+
// Stateless
50+
export const getFastqSyncManagerStackProps = (stage: AppStage): FastqSyncManagerStackConfig => {
51+
return {
52+
/*
53+
Table stuff
54+
*/
55+
fastqSyncDynamodbTableName: fastqSyncDynamodbTableName,
56+
57+
/*
58+
Events stuff
59+
*/
60+
eventBusName: eventBusName,
61+
eventTriggers: {
62+
fastqSetUpdated: {
63+
eventSource: fastqManagerEventSource,
64+
eventDetailType: fastqManagerEventDetails.updateFastqSet,
65+
},
66+
fastqListRowUpdated: {
67+
eventSource: fastqManagerEventSource,
68+
eventDetailType: fastqManagerEventDetails.updateFastqListRow,
69+
},
70+
fastqUnarchiving: {
71+
eventSource: fastqUnarchivingManagerEventSource,
72+
eventDetailType: fastqUnarchivingEventDetailType.updateJob,
73+
},
74+
fastqSync: {
75+
eventDetailType: fastqSyncEventDetailType,
76+
},
77+
},
78+
79+
/*
80+
Orcabus token and zone name for external lambda functions
81+
*/
82+
orcabusTokenSecretsManagerPath: jwtSecretName,
83+
hostedZoneNameSsmParameterPath: hostedZoneNameParameterPath,
84+
85+
/*
86+
S3 Stuff
87+
*/
88+
pipelineCacheBucketName: icav2PipelineCacheBucket[stage],
89+
pipelineCachePrefix: icav2PipelineCachePrefix[stage],
90+
};
91+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Fastq Sync dynamodb
2+
3+
Simple database with dynamodb to store task tokens and link them to fastq set ids (And vice versa)
4+
5+
A task token can only map to one fastq set id, but a fastq set id can map to multiple task tokens.
6+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import * as cdk from 'aws-cdk-lib';
2+
import { Construct } from 'constructs';
3+
import { DynamodbPartitionedPipelineConstruct } from '../../../../components/dynamodb-partitioned-table';
4+
5+
export interface FastqSyncTableConfig {
6+
dynamodbTableName: string;
7+
}
8+
9+
export type FastqSyncManagerTableStackProps = FastqSyncTableConfig & cdk.StackProps;
10+
11+
export class FastqSyncManagerTable extends cdk.Stack {
12+
constructor(scope: Construct, id: string, props: FastqSyncManagerTableStackProps) {
13+
super(scope, id, props);
14+
15+
/*
16+
Initialise dynamodb table, where id_type is the primary sort key
17+
*/
18+
const dynamodbTable = new DynamodbPartitionedPipelineConstruct(this, 'fastq_sync_table', {
19+
tableName: props.dynamodbTableName,
20+
});
21+
}
22+
}

lib/workload/stateful/statefulStackCollectionClass.ts

+15
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ import {
7474
FastqUnarchivingManagerTable,
7575
FastqUnarchivingManagerTableStackProps,
7676
} from './stacks/fastq-unarchiving-dynamodb/deploy';
77+
import {
78+
FastqSyncManagerTable,
79+
FastqSyncManagerTableStackProps,
80+
} from './stacks/fastq-sync-dynamodb/deploy/stack';
7781

7882
export interface StatefulStackCollectionProps {
7983
dataBucketStackProps: DataBucketStackProps;
@@ -98,6 +102,7 @@ export interface StatefulStackCollectionProps {
98102
accessKeySecretStackProps: AccessKeySecretStackProps;
99103
fastqManagerTableStackProps: FastqManagerTableStackProps;
100104
fastqUnarchivingManagerTableStackProps: FastqUnarchivingManagerTableStackProps;
105+
fastqSyncManagerTableStackProps: FastqSyncManagerTableStackProps;
101106
}
102107

103108
export class StatefulStackCollection {
@@ -125,6 +130,7 @@ export class StatefulStackCollection {
125130
readonly accessKeySecretStack: Stack;
126131
readonly fastqManagerTableStack: Stack;
127132
readonly fastqUnarchivingManagerTableStack: Stack;
133+
readonly fastqSyncManagerTableStack: Stack;
128134

129135
constructor(
130136
scope: Construct,
@@ -292,6 +298,15 @@ export class StatefulStackCollection {
292298
...statefulConfiguration.fastqUnarchivingManagerTableStackProps,
293299
}
294300
);
301+
302+
this.fastqSyncManagerTableStack = new FastqSyncManagerTable(
303+
scope,
304+
'FastqSyncManagerTableStack',
305+
{
306+
...this.createTemplateProps(env, 'FastqSyncManagerTableStack'),
307+
...statefulConfiguration.fastqSyncManagerTableStackProps,
308+
}
309+
);
295310
}
296311

297312
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Fastq Sync Service
2+
3+
The fastq sync service is a simple service that allows step functions with fastq set ids as inputs to 'hang'
4+
until the requirements of the fastq set have been met.
5+
6+
This is useful for workflow-glue services that have fastq set ids but need to wait for either
7+
8+
1. The fastq set readsets to be created
9+
2. The fastq set to have been qc'd AND have a fingerprint file and compression information
10+
3. This is also useful for data sharing services that require the fastqs to be unarchived before they can be shared
11+
12+
The step function will then hang at that step until the task token has been 'unlocked' by the fastq sync service.
13+
14+
## Registering task tokens
15+
16+
Workflow glue services can use the fastq sync service by generating the following event
17+
18+
```json5
19+
{
20+
"EventBusName": "OrcaBusMain",
21+
"Source": "doesnt matter",
22+
"DetailType": "FastqSync",
23+
"Detail": {
24+
"taskToken": "uuid",
25+
"fastqSetId": "fqs.123456",
26+
// Then one or more of the following
27+
// Requirements can be left out if not needed
28+
"requirements": {
29+
// Do all fastq list rows in the set contain readsets?
30+
"hasActiveReadSet": true,
31+
// Do all fastq list rows in the set contain an ntsm uri?
32+
"hasFingerprint": true,
33+
// Do all fastq list rows in the set contain compression information?
34+
// Useful if the fastq list rows are in ora format.
35+
// Some pipelines require the gzip file size in bytes in order
36+
// to stream the gzip file from ora back into s3
37+
"hasFileCompressionInformation": true,
38+
// Do all fastq list rows in the set contain qc information?
39+
// We don't use this for anything yet but we may use this in the future
40+
// to ensure that a fastq set has met the ideal coverage levels
41+
"hasQc": true,
42+
},
43+
"forceUnarchiving": true, // Force unarchiving of a fastq file if necessary, will fail if not set and fastq is in archive
44+
}
45+
}
46+
```
47+
48+
The fastq sync service will also trigger the qc, fingerprint or compression information services if they do not exist.
49+
50+
If any of the fastq list rows are in archive, the fastq sync service will also trigger the fastq unarchiving service to thaw out these fastq list rows.
51+
And place them into the 'byob' bucket.
52+
53+
54+
## Unlocking task tokens
55+
56+
The fastq sync service will then also listen for the following event types:
57+
58+
1. FastqListRowUpdated (from the fastq management service)
59+
2. UnarchivingJobUpdated (from the fastq unarchiving service, where the status is 'SUCCEEDED')
60+
61+
Everytime one of the events is triggered, the fastq sync service will check if the fastq list row or fastq set has met the requirements.
62+
If all requirements are met for the fastq set, the fastq sync service will unlock the task token.

0 commit comments

Comments
 (0)