Skip to content

Commit d22ac9e

Browse files
committed
Implement mirror_file, mirror_part & finalize_file (#6862)
1 parent 37c4900 commit d22ac9e

File tree

7 files changed

+185
-17
lines changed

7 files changed

+185
-17
lines changed

lambdas/indexer/.chalice/config.json.template.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
{
5858
indexer.mirror.name: {
5959
'reserved_concurrency': config.mirroring_concurrency,
60-
'lambda_memory_size': 256,
60+
'lambda_memory_size': 512,
6161
'lambda_timeout': config.mirror_lambda_timeout
6262
},
6363
}

scripts/mirror.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""
22
Copy all files from the public sources in a catalog to the current deployment's
3-
mirroring bucket. The actual file-copying is not yet implemented, so all this
4-
currently does is send messages to the indexer app that don't do anything.
3+
mirroring bucket.
54
"""
65
import argparse
76
import logging

src/azul/azulclient.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
)
2525
from typing import (
2626
Self,
27+
Sequence,
2728
TYPE_CHECKING,
2829
cast,
2930
)
@@ -63,11 +64,16 @@
6364
from azul.indexer.index_service import (
6465
IndexService,
6566
)
67+
from azul.indexer.mirror_service import (
68+
FilePart,
69+
MirrorService,
70+
)
6671
from azul.json import (
6772
Serializable,
6873
)
6974
from azul.plugins import (
7075
File,
76+
MetadataPlugin,
7177
RepositoryPlugin,
7278
)
7379
from azul.queues import (
@@ -113,6 +119,8 @@ class MirrorAction(Action):
113119
mirror_source = auto()
114120
mirror_partition = auto()
115121
mirror_file = auto()
122+
mirror_part = auto()
123+
finalize_file = auto()
116124

117125

118126
@attrs.frozen(kw_only=True)
@@ -122,6 +130,9 @@ class AzulClient(SignatureHelper, HasCachedHttpClient):
122130
def repository_plugin(self, catalog: CatalogName) -> RepositoryPlugin:
123131
return self.index_service.repository_plugin(catalog)
124132

133+
def metadata_plugin(self, catalog: CatalogName) -> MetadataPlugin:
    """
    Return the metadata plugin for the given catalog, as provided by the
    index service.
    """
    service = self.index_service
    return service.metadata_plugin(catalog)
135+
125136
def notification(self, bundle_fqid: SourcedBundleFQID) -> JSON:
126137
"""
127138
Generate an indexer notification for the given bundle.
@@ -210,6 +221,42 @@ def mirror_file_message(self,
210221
group_id=f'{source.id}:{file.uuid}'
211222
)
212223

224+
def mirror_part_message(self,
                        catalog: CatalogName,
                        file: File,
                        part: FilePart,
                        upload_id: str,
                        etags: Sequence[str] = ()
                        ) -> SQSFifoMessage:
    """
    Build the FIFO message that requests the upload of one part of a file
    belonging to a multipart upload.

    :param catalog: the name of the catalog
    :param file: the file being mirrored
    :param part: the part of the file to be uploaded
    :param upload_id: the ID of the multipart upload the part belongs to
    :param etags: the ETags collected so far, empty by default
    :return: a message with the ``mirror_part`` action, grouped by the
             file's object key in the mirror
    """
    body = {
        'catalog': catalog,
        'file': file.to_json(),
        'upload_id': upload_id,
        'action': MirrorAction.mirror_part.to_json(),
        'part': part.to_json(),
        'etags': etags
    }
    # All messages about the same mirrored object share one message group
    group_id = self.mirror_service.mirror_object_key(file)
    return SQSFifoMessage(body=body, group_id=group_id)
242+
243+
def finalize_file_message(self,
                          catalog: CatalogName,
                          file: File,
                          upload_id: str,
                          etags: Sequence[str]
                          ) -> SQSFifoMessage:
    """
    Build the FIFO message that requests the finalization of a multipart
    upload of the given file.

    :param catalog: the name of the catalog
    :param file: the file being mirrored
    :param upload_id: the ID of the multipart upload to finalize
    :param etags: the ETags of the uploaded parts
    :return: a message with the ``finalize_file`` action, grouped by the
             file's object key in the mirror
    """
    body = {
        'catalog': catalog,
        'file': file.to_json(),
        'upload_id': upload_id,
        'action': MirrorAction.finalize_file.to_json(),
        'etags': etags
    }
    # Same message group as the part messages for this file
    group_id = self.mirror_service.mirror_object_key(file)
    return SQSFifoMessage(body=body, group_id=group_id)
259+
213260
def local_reindex(self, catalog: CatalogName, prefix: str) -> int:
214261
notifications = [
215262
self.notification(bundle_fqid)
@@ -488,6 +535,10 @@ def group_key(fqid: SourcedBundleFQID):
488535
def index_service(self) -> IndexService:
489536
return IndexService()
490537

538+
@cached_property
def mirror_service(self) -> MirrorService:
    """
    The mirror service used by this client. A new :class:`MirrorService`
    is instantiated here; ``cached_property`` is expected to retain it for
    subsequent accesses.
    """
    service = MirrorService()
    return service
541+
491542
def delete_all_indices(self, catalog: CatalogName):
492543
self.index_service.delete_indices(catalog)
493544

@@ -644,6 +695,61 @@ def message(file: File) -> SQSMessage:
644695
messages = map(message, plugin.list_files(source, prefix))
645696
self.queue_mirror_messages(messages)
646697

698+
def mirror_file(self,
                catalog: CatalogName,
                file_json: JSON
                ):
    """
    Mirror the file described by the given JSON.

    A file whose size does not exceed ``FilePart.default_size`` is uploaded
    directly via the mirror service. For a larger file, a multipart upload
    is begun and a message requesting the upload of the part returned by
    ``FilePart.head`` (presumably the first part) is queued.

    :param catalog: the name of the catalog
    :param file_json: the JSON representation of the file to mirror
    """
    file = self.load_file(catalog, file_json)
    assert file.size is not None, R('File size unknown', file)
    size_limit = FilePart.default_size

    # Small files don't need a multipart upload
    if file.size <= size_limit:
        log.info('Mirroring file %r via single-part upload', file.uuid)
        self.mirror_service.mirror_file(catalog, file)
        return

    log.info('Mirroring file %r via multi-part upload', file.uuid)
    upload_id = self.mirror_service.begin_mirroring_file(file)
    first_part = FilePart.head(file, size_limit)
    message = self.mirror_part_message(catalog, file, first_part, upload_id)
    self.queue_mirror_messages([message])
714+
715+
def mirror_file_part(self,
                     catalog: CatalogName,
                     file_json: JSON,
                     part_json: JSON,
                     upload_id: str,
                     etags: Sequence[str]
                     ):
    """
    Upload one part of a file to an ongoing multipart upload and queue the
    message for the next step.

    The ETag returned by the mirror service for the uploaded part is
    appended to the given ETags. If the part has a successor, a message
    requesting the upload of that part is queued, otherwise a message
    requesting the finalization of the upload.

    :param catalog: the name of the catalog
    :param file_json: the JSON representation of the file being mirrored
    :param part_json: the JSON representation of the part to upload
    :param upload_id: the ID of the multipart upload
    :param etags: the ETags collected before this part
    """
    file = self.load_file(catalog, file_json)
    part = FilePart.from_json(part_json)
    new_etag = self.mirror_service.mirror_file_part(catalog, file, part, upload_id)
    all_etags = [*etags, new_etag]
    successor = part.next(file)
    if successor is not None:
        message = self.mirror_part_message(catalog,
                                           file,
                                           successor,
                                           upload_id,
                                           all_etags)
    else:
        # No successor means that this was the last part
        log.info('File %r fully uploaded in %d parts', file.uuid, len(all_etags))
        message = self.finalize_file_message(catalog,
                                             file,
                                             upload_id,
                                             all_etags)
    self.queue_mirror_messages([message])
740+
741+
def finalize_file(self,
                  catalog: CatalogName,
                  file_json: JSON,
                  upload_id: str,
                  etags: Sequence[str]
                  ):
    """
    Load the file from its JSON representation and hand it, along with the
    upload ID and the ETags, to the mirror service's
    ``finish_mirroring_file`` (presumably completing the multipart upload).

    :param catalog: the name of the catalog
    :param file_json: the JSON representation of the mirrored file
    :param upload_id: the ID of the multipart upload
    :param etags: the ETags of the uploaded parts
    """
    mirrored_file = self.load_file(catalog, file_json)
    service = self.mirror_service
    service.finish_mirroring_file(mirrored_file, upload_id, etags)
749+
750+
def load_file(self, catalog: CatalogName, file: JSON) -> File:
    """
    Deserialize a file from JSON, using the file class of the given
    catalog's metadata plugin.
    """
    plugin = self.metadata_plugin(catalog)
    file_cls = plugin.file_class
    return file_cls.from_json(file)
752+
647753

648754
class AzulClientError(RuntimeError):
649755
pass

src/azul/indexer/lambda_iam_policy.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@
113113
},
114114
] if config.enable_log_forwarding else []
115115
),
116+
*(
117+
[
118+
{
119+
'Effect': 'Allow',
120+
'Action': [
121+
's3:PutObject',
122+
's3:GetObject',
123+
],
124+
'Resource': [
125+
f'arn:aws:s3:::{aws.mirror_bucket}/*'
126+
]
127+
}
128+
] if config.enable_mirroring else []
129+
),
116130
{
117131
'Effect': 'Allow',
118132
'Action': [

src/azul/indexer/mirror_controller.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import logging
2-
import time
32
from typing import (
43
Iterable,
54
)
@@ -18,9 +17,11 @@
1817
from azul.indexer.action_controller import (
1918
ActionController,
2019
)
20+
from azul.indexer.mirror_service import (
21+
MirrorService,
22+
)
2123
from azul.types import (
2224
JSON,
23-
json_mapping,
2425
json_str,
2526
)
2627

@@ -33,6 +34,10 @@ class MirrorController(ActionController[MirrorAction]):
3334
def client(self) -> AzulClient:
3435
return AzulClient()
3536

37+
@property
def service(self) -> MirrorService:
    """
    The mirror service of this controller's client.
    """
    client = self.client
    return client.mirror_service
40+
3641
def mirror(self, event: Iterable[SQSRecord]):
3742
self._handle_events(event, self._mirror)
3843

@@ -45,11 +50,18 @@ def _mirror(self, message: JSON):
4550
message['source'],
4651
message['prefix'])
4752
elif action is MirrorAction.mirror_file:
48-
# FIXME: Implement mirror_file, mirror_part & finalize_file
49-
# https://github.com/DataBiosphere/azul/issues/6862
50-
log.info('Would mirror parts of file %r', json_mapping(message['file'])['uuid'])
51-
time.sleep(1)
53+
self.client.mirror_file(message['catalog'],
54+
message['file'])
55+
elif action is MirrorAction.mirror_part:
56+
self.client.mirror_file_part(message['catalog'],
57+
message['file'],
58+
message['part'],
59+
message['upload_id'],
60+
message['etags'])
61+
elif action is MirrorAction.finalize_file:
62+
self.client.finalize_file(message['catalog'],
63+
message['file'],
64+
message['upload_id'],
65+
message['etags'])
5266
else:
53-
# FIXME: Implement mirror_file, mirror_part & finalize_file
54-
# https://github.com/DataBiosphere/azul/issues/6862
5567
assert False, action

src/azul/indexer/mirror_service.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,16 @@ def _storage(self) -> StorageService:
124124
def repository_plugin(self, catalog: CatalogName) -> RepositoryPlugin:
125125
return RepositoryPlugin.load(catalog).create(catalog)
126126

127+
def mirror_file(self, catalog: CatalogName, file: File):
    """
    Download the entire file and store it in the mirror with a single
    request. Larger files should be mirrored in parts instead, starting
    with :meth:`begin_mirroring_file`.
    """
    content = self._download(catalog, file)
    storage = self._storage
    key = self.mirror_object_key(file)
    storage.put(object_key=key,
                data=content,
                content_type=file.content_type)
136+
127137
def begin_mirroring_file(self, file: File) -> str:
128138
"""
129139
Initiate a multipart upload of the file's content and return the upload
@@ -137,10 +147,11 @@ def mirror_file_part(self,
137147
catalog: CatalogName,
138148
file: File,
139149
part: FilePart,
140-
upload_id: str):
150+
upload_id: str
151+
) -> str:
141152
"""
142153
Upload a part of a file to a multipart upload begun with
143-
:meth:`begin_mirroring_file`
154+
:meth:`begin_mirroring_file` and return the uploaded part's ETag.
144155
"""
145156
upload = self._get_upload(file, upload_id)
146157
file_content = self._download(catalog, file, part)

test/indexer/test_mirror_controller.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import hashlib
12
from unittest.mock import (
23
MagicMock,
34
PropertyMock,
@@ -14,9 +15,15 @@
1415
from azul import (
1516
config,
1617
)
18+
from azul.deployment import (
19+
aws,
20+
)
1721
from azul.indexer.mirror_controller import (
1822
MirrorController,
1923
)
24+
from azul.indexer.mirror_service import (
25+
MirrorService,
26+
)
2027
from azul.json import (
2128
copy_json,
2229
)
@@ -30,6 +37,9 @@
3037
from azul_test_case import (
3138
DCP2TestCase,
3239
)
40+
from service import (
41+
S3TestCase,
42+
)
3343
from sqs_test_case import (
3444
WorkQueueTestCase,
3545
)
@@ -43,7 +53,7 @@ def setUpModule():
4353

4454

4555
@mock_aws
46-
class TestMirrorController(DCP2TestCase, WorkQueueTestCase):
56+
class TestMirrorController(DCP2TestCase, WorkQueueTestCase, S3TestCase):
4757

4858
@classmethod
4959
def _patch_enable_mirroring(cls):
@@ -60,6 +70,11 @@ def setUp(self):
6070
super().setUp()
6171
self._create_mock_queues(config.mirror_queue.name,
6272
config.mirror_queue.to_fail.name)
73+
self._create_test_bucket(self.bucket)
74+
75+
@property
def bucket(self) -> str:
    """
    The name of the mirror bucket, which this test case creates in
    ``setUp`` and reads mirrored files from.
    """
    bucket_name = aws.mirror_bucket
    return bucket_name
6378

6479
def test_mirroring(self):
6580
with self.subTest('remote_mirror'):
@@ -85,15 +100,17 @@ def test_mirroring(self):
85100
message)
86101
self.assertEqual(list(self.source.spec.prefix.partition_prefixes()), partitions)
87102

103+
file_contents = b'lorem ipsum dolor sit\n'
104+
88105
with self.subTest('mirror_partition'):
89106
event = [self._mock_sqs_record(partition_message)]
90107
file = HCAFile(uuid='405852c9-a0cc-4cd8-b9ff-7c6296223661',
91108
name='foo.txt',
92109
version=None,
93-
drs_uri=None,
94-
size=0,
110+
drs_uri='drs://fake-domain.lan/foo',
111+
size=len(file_contents),
95112
content_type='text/plain',
96-
sha256='123')
113+
sha256=hashlib.sha256(file_contents).hexdigest())
97114
plugin_cls = type(self.client.repository_plugin(self.catalog))
98115
with patch.object(plugin_cls, 'list_files', return_value=[file]):
99116
controller.mirror(event)
@@ -103,3 +120,12 @@ def test_mirroring(self):
103120
source=self.source.to_json(),
104121
file=file.to_json())
105122
self.assertEqual(expected_message, file_message)
123+
124+
with self.subTest('mirror_file'):
125+
event = [self._mock_sqs_record(file_message)]
126+
with patch.object(MirrorService, '_download', return_value=file_contents):
127+
controller.mirror(event)
128+
response = self._s3.get_object(Bucket=self.bucket,
129+
Key=controller.service.mirror_object_key(file))
130+
mirrored_file_contents = response['Body'].read()
131+
self.assertEqual(mirrored_file_contents, file_contents)

0 commit comments

Comments
 (0)