Skip to content

Commit 7bec7cb

Browse files
committed
Implement mirror_file, mirror_part & finalize_file (#6862)
1 parent 66b1d3f commit 7bec7cb

File tree

3 files changed

+93
-4
lines changed

3 files changed

+93
-4
lines changed

src/azul/azulclient.py

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -64,11 +64,15 @@
6464
from azul.indexer.index_service import (
6565
IndexService,
6666
)
67+
from azul.indexer.mirror_service import (
68+
MirrorService,
69+
)
6770
from azul.json import (
6871
Serializable,
6972
)
7073
from azul.plugins import (
7174
File,
75+
MetadataPlugin,
7276
RepositoryPlugin,
7377
)
7478
from azul.queues import (
@@ -113,6 +117,8 @@ class MirrorAction(Action):
113117
mirror_source = auto()
114118
mirror_partition = auto()
115119
mirror_file = auto()
120+
mirror_part = auto()
121+
finalize_file = auto()
116122

117123

118124
@attrs.frozen(kw_only=True)
@@ -122,6 +128,9 @@ class AzulClient(SignatureHelper, HasCachedHttpClient):
122128
def repository_plugin(self, catalog: CatalogName) -> RepositoryPlugin:
    """
    Return the repository plugin for the given catalog, as provided by the
    index service.

    :param catalog: the name of the catalog whose plugin is requested
    """
    return self.index_service.repository_plugin(catalog)
124130

131+
def metadata_plugin(self, catalog: CatalogName) -> MetadataPlugin:
    """
    Return the metadata plugin for the given catalog, as provided by the
    index service.

    :param catalog: the name of the catalog whose plugin is requested
    """
    return self.index_service.metadata_plugin(catalog)
133+
125134
def notification(self, bundle_fqid: SourcedBundleFQID) -> JSON:
126135
"""
127136
Generate an indexer notification for the given bundle.
@@ -197,6 +206,38 @@ def mirror_file_message(self,
197206
'file': file.to_json()
198207
})
199208

209+
def mirror_part_message(self,
                        catalog: CatalogName,
                        source: SourceRef,
                        file: File,
                        part: File.PartRange,
                        upload_id: str
                        ) -> SQSFifoMessage:
    """
    Build the FIFO queue message that requests the mirroring of a single
    part of a file.

    The message group ID is the file's mirror object key, the same key
    that :meth:`finalize_file_message` uses, so all part and finalization
    messages for one file belong to the same message group.

    :param catalog: the name of the catalog the file belongs to

    :param source: the source containing the file; included in the message
                   in its JSON form

    :param file: the file to be mirrored; included in its JSON form

    :param part: the range of the file covered by this message; included
                 in its JSON form

    :param upload_id: the identifier of the upload this part belongs to;
                      passed through to the message unchanged
    """
    key = self.mirror_service.mirror_object_key(file)
    body = {
        'action': MirrorAction.mirror_part.to_json(),
        'catalog': catalog,
        'source': source.to_json(),
        'file': file.to_json(),
        'part': part.to_json(),
        'upload_id': upload_id
    }
    return SQSFifoMessage(body, group_id=key)
225+
226+
def finalize_file_message(self,
                          catalog: CatalogName,
                          source: SourceRef,
                          file: File,
                          upload_id: str
                          ) -> SQSFifoMessage:
    """
    Build the FIFO queue message that requests the finalization of a
    file's upload.

    The message group ID is the file's mirror object key, the same key
    that :meth:`mirror_part_message` uses, so the finalization message
    shares a message group with the part messages of that file.

    :param catalog: the name of the catalog the file belongs to

    :param source: the source containing the file; included in the message
                   in its JSON form

    :param file: the file whose upload is to be finalized; included in its
                 JSON form

    :param upload_id: the identifier of the upload to finalize; passed
                      through to the message unchanged
    """
    key = self.mirror_service.mirror_object_key(file)
    body = {
        'action': MirrorAction.finalize_file.to_json(),
        'catalog': catalog,
        'source': source.to_json(),
        'file': file.to_json(),
        'upload_id': upload_id
    }
    return SQSFifoMessage(body, group_id=key)
240+
200241
def local_reindex(self, catalog: CatalogName, prefix: str) -> int:
201242
notifications = [
202243
self.notification(bundle_fqid)
@@ -475,6 +516,10 @@ def group_key(fqid: SourcedBundleFQID):
475516
def index_service(self) -> IndexService:
476517
return IndexService()
477518

519+
@cached_property
def mirror_service(self) -> MirrorService:
    """
    The mirror service used by this client. A new :class:`MirrorService`
    is constructed with no arguments; the ``cached_property`` decorator
    governs how often that happens.
    """
    return MirrorService()
522+
478523
def delete_all_indices(self, catalog: CatalogName):
479524
self.index_service.delete_indices(catalog)
480525

@@ -632,6 +677,26 @@ def message(file: File) -> SQSMessage:
632677
messages = map(message, plugin.list_files(source, prefix))
633678
self.queue_mirror_messages(messages)
634679

680+
def mirror_file(self,
                catalog: CatalogName,
                source_json: JSON,
                file_json: JSON,
                part_size: int
                ) -> None:
    """
    Begin mirroring the given file and queue one `mirror_part` message for
    every part of that file.

    :param catalog: the name of the catalog the file belongs to

    :param source_json: the JSON form of the source containing the file;
                        deserialized using the `source_ref_cls` of the
                        catalog's repository plugin

    :param file_json: the JSON form of the file; deserialized via
                      :meth:`load_file`

    :param part_size: passed to `File.partition` to split the file into
                      the part ranges that are queued
    """
    source_cls = self.repository_plugin(catalog).source_ref_cls
    source = source_cls.from_json(source_json)
    file = self.load_file(catalog, file_json)
    upload_id = self.mirror_service.begin_mirroring_file(file)

    def message(part: File.PartRange) -> SQSFifoMessage:
        log.info('Mirroring part %r of file %r in source %r from catalog %r',
                 part.part_number, file.uuid, str(source.spec), catalog)
        return self.mirror_part_message(catalog, source, file, part, upload_id)

    # `map` is lazy, so each part is logged only when its message is
    # consumed by `queue_mirror_messages`.
    #
    # NOTE(review): only part messages are queued here. Presumably the
    # `finalize_file` message for this upload is queued elsewhere; confirm.
    messages = map(message, file.partition(part_size))
    self.queue_mirror_messages(messages)
696+
697+
def load_file(self, catalog: CatalogName, file: JSON) -> File:
    """
    Deserialize a file from its JSON form, using the file class of the
    given catalog's metadata plugin.

    :param catalog: the name of the catalog whose metadata plugin supplies
                    the file class

    :param file: the JSON form of the file
    """
    file_cls = self.metadata_plugin(catalog).file_class()
    return file_cls.from_json(file)
699+
635700

636701
class AzulClientError(RuntimeError):
    # NOTE(review): presumably raised for failures of AzulClient operations;
    # no raise site is visible in this excerpt, so confirm against callers.
    pass

src/azul/indexer/mirror_controller.py

Lines changed: 25 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,12 @@
1919
from azul.indexer.action_controller import (
2020
ActionController,
2121
)
22+
from azul.indexer.mirror_service import (
23+
MirrorService,
24+
)
25+
from azul.plugins import (
26+
File,
27+
)
2228

2329
log = logging.getLogger(__name__)
2430

@@ -29,6 +35,10 @@ class MirrorController(ActionController[MirrorAction]):
2935
def client(self) -> AzulClient:
3036
return AzulClient()
3137

38+
@property
def service(self) -> MirrorService:
    """
    The mirror service of this controller's client.
    """
    return self.client.mirror_service
41+
3242
def mirror(self, event: Iterable[SQSRecord]):
3343
for record in event:
3444
message = json.loads(record.body)
@@ -43,11 +53,22 @@ def mirror(self, event: Iterable[SQSRecord]):
4353
message['source'],
4454
message['prefix'])
4555
elif action is MirrorAction.mirror_file:
46-
log.info('Would mirror parts of file %r', message['file']['uuid'])
47-
time.sleep(1)
56+
self.client.mirror_file(message['catalog'],
57+
message['source'],
58+
message['file'],
59+
part_size=...)
60+
elif action is MirrorAction.mirror_part:
61+
catalog = message['catalog']
62+
file = self.client.load_file(catalog, message['file'])
63+
part = File.PartRange.from_json(message['part'])
64+
self.service.mirror_file_part(catalog, file, part, message['upload_id'])
65+
elif action is MirrorAction.finalize_file:
66+
file = self.client.load_file(message['catalog'], message['file'])
67+
# We rely on the FIFO mechanics of the mirror queue to
68+
# ensure that all parts have been successfully uploaded
69+
# before the finalization message is handled
70+
self.service.finish_mirroring_file(file, message['upload_id'], etags=None)
4871
else:
49-
# FIXME: Implement mirror_file, mirror_part & finalize_file
50-
# https://github.com/DataBiosphere/azul/issues/6862
5172
assert False, action
5273
except BaseException:
5374
log.warning(f'Worker failed to handle message {message}.', exc_info=True)

src/azul/indexer/mirror_service.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,9 @@ def get_mirror_url(self, file: File) -> str:
9696
def mirror_object_key(self, file: File) -> str:
    """
    Return the key that `_file_key` derives for the given file under the
    'file' prefix.
    """
    return self._file_key('file', file)
9898

99+
def info_object_key(self, file: File) -> str:
    """
    Return the key that `_file_key` derives for the given file under the
    'info' prefix.
    """
    return self._file_key('info', file)
101+
99102
def _file_key(self, prefix: str, file: File) -> str:
100103
digest, digest_type = file.digest()
101104
assert all(c in string.hexdigits for c in digest), R(

0 commit comments

Comments
 (0)