Skip to content

Commit f5c01a4

Browse files
committed
Implement mirror_file, mirror_part & finalize_file (#6862, PR #7043)
2 parents cb13133 + cd69dc8 commit f5c01a4

File tree

10 files changed

+432
-82
lines changed

10 files changed

+432
-82
lines changed

.mypy.ini

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ modules =
5252
azul.indexer.field,
5353
azul.indexer.action_controller,
5454
azul.indexer.mirror_controller,
55-
azul.service.app_controller
55+
azul.service.app_controller,
56+
azul.indexer.mirror_service
5657
packages =
5758
azul.openapi
5859

lambdas/indexer/.chalice/config.json.template.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
{
5858
indexer.mirror.name: {
5959
'reserved_concurrency': config.mirroring_concurrency,
60-
'lambda_memory_size': 256,
60+
'lambda_memory_size': 512,
6161
'lambda_timeout': config.mirror_lambda_timeout
6262
},
6363
}

scripts/mirror.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""
22
Copy all files from the public sources in a catalog to the current deployment's
3-
mirroring bucket. The actual file-copying is not yet implemented, so all this
4-
currently does is send messages to the indexer app that don't do anything.
3+
mirroring bucket.
54
"""
65
import argparse
76
import logging

scripts/mirror_file.py

Lines changed: 15 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
"""
66
import argparse
77
import logging
8-
import math
9-
import string
108
import sys
119

1210
from azul import (
@@ -17,24 +15,16 @@
1715
from azul.args import (
1816
AzulArgumentHelpFormatter,
1917
)
20-
from azul.azulclient import (
21-
AzulClient,
22-
)
23-
from azul.deployment import (
24-
aws,
25-
)
26-
from azul.drs import (
27-
AccessMethod,
28-
)
2918
from azul.http import (
3019
http_client,
3120
)
21+
from azul.indexer.mirror_service import (
22+
FilePart,
23+
MirrorService,
24+
)
3225
from azul.logging import (
3326
configure_script_logging,
3427
)
35-
from azul.plugins import (
36-
File,
37-
)
3828
from azul.plugins.metadata.hca import (
3929
HCAFile,
4030
)
@@ -47,9 +37,6 @@
4737
from azul.service.source_service import (
4838
SourceService,
4939
)
50-
from azul.service.storage_service import (
51-
StorageService,
52-
)
5340

5441
log = logging.getLogger(__name__)
5542

@@ -70,63 +57,30 @@ def get_file(catalog: CatalogName, file_uuid: str) -> HCAFile:
7057
return file
7158

7259

73-
def get_download_url(catalog: CatalogName, file: File) -> str:
74-
drs_uri = file.drs_uri
75-
drs = AzulClient().repository_plugin(catalog).drs_client()
76-
access = drs.get_object(drs_uri, AccessMethod.gs)
77-
assert access.method is AccessMethod.https, access
78-
return access.url
79-
80-
81-
def object_key(file: File) -> str:
82-
digest, digest_type = file.digest()
83-
assert all(c in string.hexdigits for c in digest), R(
84-
'Expected a hexadecimal digest', digest)
85-
return f'file/{digest.lower()}.{digest_type}'
86-
87-
8860
def mirror_file(catalog: CatalogName, file_uuid: str, part_size: int) -> str:
8961
assert config.enable_mirroring, R('Mirroring must be enabled')
9062
assert config.is_tdr_enabled(catalog), R('Only TDR catalogs are supported')
91-
assert config.is_hca_enabled(catalog), R('Only HCA catalogs are supported')
92-
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
93-
assert 5 * 2 ** 20 <= part_size <= 5 * 2 ** 30, R(
94-
'Invalid part size', part_size)
9563
file = get_file(catalog, file_uuid)
96-
download_url = get_download_url(catalog, file)
97-
key = object_key(file)
98-
storage = StorageService(bucket_name=aws.mirror_bucket)
99-
upload = storage.create_multipart_upload(key, content_type=file.content_type)
100-
101-
total_size = file.size
102-
part_count = math.ceil(total_size / part_size)
103-
assert part_count <= 10000, R('Part size is too small for this file',
104-
total_size, part_size, part_count)
64+
service = MirrorService()
65+
upload_id = service.begin_mirroring_file(file)
10566

106-
def file_part(part_number: int) -> str:
107-
start = part_number * part_size
108-
end = min((part_number + 1) * part_size - 1, total_size)
109-
response = http.request('GET',
110-
download_url,
111-
headers={'Range': f'bytes={start}-{end}'})
112-
if response.status == 206:
113-
return storage.upload_multipart_part(response.data,
114-
part_number + 1,
115-
upload)
116-
else:
117-
raise RuntimeError('Unexpected response from repository', response.status)
67+
def mirror_parts():
68+
part = FilePart.first(file, part_size)
69+
while part is not None:
70+
yield service.mirror_file_part(catalog, file, part, upload_id)
71+
part = part.next(file)
11872

119-
etags = list(map(file_part, range(part_count)))
120-
storage.complete_multipart_upload(upload, etags)
121-
return storage.get_presigned_url(key, file.name)
73+
etags = list(mirror_parts())
74+
service.finish_mirroring_file(file, upload_id, etags=etags)
75+
return service.get_mirror_url(file)
12276

12377

12478
def main(argv):
12579
parser = argparse.ArgumentParser(description=__doc__,
12680
formatter_class=AzulArgumentHelpFormatter)
12781
parser.add_argument('-c', '--catalog', default=config.default_catalog)
12882
parser.add_argument('-f', '--file-uuid')
129-
parser.add_argument('-p', '--part-size', type=int, default=50 * 2 ** 20)
83+
parser.add_argument('-p', '--part-size', type=int, default=FilePart.default_size)
13084
args = parser.parse_args(argv)
13185
signed_url = mirror_file(args.catalog, args.file_uuid, args.part_size)
13286
print(signed_url)

src/azul/azulclient.py

Lines changed: 123 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
)
2525
from typing import (
2626
Self,
27+
Sequence,
2728
TYPE_CHECKING,
2829
cast,
2930
)
@@ -41,7 +42,6 @@
4142
from azul import (
4243
CatalogName,
4344
R,
44-
cache,
4545
cached_property,
4646
config,
4747
)
@@ -64,11 +64,16 @@
6464
from azul.indexer.index_service import (
6565
IndexService,
6666
)
67+
from azul.indexer.mirror_service import (
68+
FilePart,
69+
MirrorService,
70+
)
6771
from azul.json import (
6872
Serializable,
6973
)
7074
from azul.plugins import (
7175
File,
76+
MetadataPlugin,
7277
RepositoryPlugin,
7378
)
7479
from azul.queues import (
@@ -114,15 +119,19 @@ class MirrorAction(Action):
114119
mirror_source = auto()
115120
mirror_partition = auto()
116121
mirror_file = auto()
122+
mirror_part = auto()
123+
finalize_file = auto()
117124

118125

119126
@attrs.frozen(kw_only=True)
120127
class AzulClient(SignatureHelper, HasCachedHttpClient):
121128
num_workers: int = 16
122129

123-
@cache
124130
def repository_plugin(self, catalog: CatalogName) -> RepositoryPlugin:
125-
return RepositoryPlugin.load(catalog).create(catalog)
131+
return self.index_service.repository_plugin(catalog)
132+
133+
def metadata_plugin(self, catalog: CatalogName) -> MetadataPlugin:
134+
return self.index_service.metadata_plugin(catalog)
126135

127136
def notification(self, bundle_fqid: SourcedBundleFQID) -> JSON:
128137
"""
@@ -212,6 +221,42 @@ def mirror_file_message(self,
212221
group_id=f'{source.id}:{file.uuid}'
213222
)
214223

224+
def mirror_part_message(self,
225+
catalog: CatalogName,
226+
file: File,
227+
part: FilePart,
228+
upload_id: str,
229+
etags: Sequence[str]
230+
) -> SQSFifoMessage:
231+
return SQSFifoMessage(
232+
body={
233+
'catalog': catalog,
234+
'file': file.to_json(),
235+
'upload_id': upload_id,
236+
'action': MirrorAction.mirror_part.to_json(),
237+
'part': part.to_json(),
238+
'etags': etags
239+
},
240+
group_id=self.mirror_service.mirror_object_key(file)
241+
)
242+
243+
def finalize_file_message(self,
244+
catalog: CatalogName,
245+
file: File,
246+
upload_id: str,
247+
etags: Sequence[str]
248+
) -> SQSFifoMessage:
249+
return SQSFifoMessage(
250+
body={
251+
'catalog': catalog,
252+
'file': file.to_json(),
253+
'upload_id': upload_id,
254+
'action': MirrorAction.finalize_file.to_json(),
255+
'etags': etags
256+
},
257+
group_id=self.mirror_service.mirror_object_key(file)
258+
)
259+
215260
def local_reindex(self, catalog: CatalogName, prefix: str) -> int:
216261
notifications = [
217262
self.notification(bundle_fqid)
@@ -490,6 +535,10 @@ def group_key(fqid: SourcedBundleFQID):
490535
def index_service(self) -> IndexService:
491536
return IndexService()
492537

538+
@cached_property
539+
def mirror_service(self) -> MirrorService:
540+
return MirrorService()
541+
493542
def delete_all_indices(self, catalog: CatalogName):
494543
self.index_service.delete_indices(catalog)
495544

@@ -646,6 +695,77 @@ def message(file: File) -> SQSMessage:
646695
messages = map(message, plugin.list_files(source, prefix))
647696
self.queue_mirror_messages(messages)
648697

698+
def mirror_file(self,
699+
catalog: CatalogName,
700+
file_json: JSON
701+
):
702+
file = self.load_file(catalog, file_json)
703+
assert file.size is not None, R('File size unknown', file)
704+
705+
file_is_large = file.size > 10 * 1024 ** 2
706+
deployment_is_stable = (config.deployment.is_stable
707+
and not config.deployment.is_unit_test
708+
and catalog not in config.integration_test_catalogs)
709+
if file_is_large and not deployment_is_stable:
710+
log.info('Not mirroring file %r (%d bytes) to save cost',
711+
file.uuid, file.size)
712+
else:
713+
# Ensure we test with multiple parts on lower deployments
714+
part_size = FilePart.default_size if deployment_is_stable else FilePart.min_size
715+
if file.size <= part_size:
716+
log.info('Mirroring file %r via standard upload', file.uuid)
717+
self.mirror_service.mirror_file(catalog, file)
718+
log.info('Successfully mirrored file %r via standard upload', file.uuid)
719+
else:
720+
log.info('Mirroring file %r via multi-part upload', file.uuid)
721+
upload_id = self.mirror_service.begin_mirroring_file(file)
722+
first_part = FilePart.first(file, part_size)
723+
etag = self.mirror_service.mirror_file_part(catalog, file, first_part, upload_id)
724+
next_part = first_part.next(file)
725+
assert next_part is not None
726+
messages = [self.mirror_part_message(catalog, file, next_part, upload_id, [etag])]
727+
self.queue_mirror_messages(messages)
728+
729+
def mirror_file_part(self,
730+
catalog: CatalogName,
731+
file_json: JSON,
732+
part_json: JSON,
733+
upload_id: str,
734+
etags: Sequence[str]
735+
):
736+
file = self.load_file(catalog, file_json)
737+
part = FilePart.from_json(part_json)
738+
etag = self.mirror_service.mirror_file_part(catalog, file, part, upload_id)
739+
etags = [*etags, etag]
740+
next_part = part.next(file)
741+
if next_part is None:
742+
log.info('File %r fully uploaded in %d parts', file.uuid, len(etags))
743+
message = self.finalize_file_message(catalog,
744+
file,
745+
upload_id,
746+
etags)
747+
else:
748+
message = self.mirror_part_message(catalog,
749+
file,
750+
next_part,
751+
upload_id,
752+
etags)
753+
self.queue_mirror_messages([message])
754+
755+
def finalize_file(self,
756+
catalog: CatalogName,
757+
file_json: JSON,
758+
upload_id: str,
759+
etags: Sequence[str]
760+
):
761+
file = self.load_file(catalog, file_json)
762+
assert len(etags) > 0
763+
self.mirror_service.finish_mirroring_file(file, upload_id, etags)
764+
log.info('Successfully mirrored file %r via multi-part upload', file.uuid)
765+
766+
def load_file(self, catalog: CatalogName, file: JSON) -> File:
767+
return self.metadata_plugin(catalog).file_class.from_json(file)
768+
649769

650770
class AzulClientError(RuntimeError):
651771
pass

src/azul/indexer/lambda_iam_policy.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@
113113
},
114114
] if config.enable_log_forwarding else []
115115
),
116+
*(
117+
[
118+
{
119+
'Effect': 'Allow',
120+
'Action': [
121+
's3:PutObject',
122+
's3:GetObject',
123+
],
124+
'Resource': [
125+
f'arn:aws:s3:::{aws.mirror_bucket}/*'
126+
]
127+
}
128+
] if config.enable_mirroring else []
129+
),
116130
{
117131
'Effect': 'Allow',
118132
'Action': [

0 commit comments

Comments
 (0)