Skip to content

Commit aa9f3d5

Browse files
committed
Implement mirror_partition (#6861)
1 parent 12aa267 commit aa9f3d5

File tree

8 files changed

+121
-5
lines changed

8 files changed

+121
-5
lines changed

src/azul/azulclient.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
Serializable,
6969
)
7070
from azul.plugins import (
71+
File,
7172
RepositoryPlugin,
7273
)
7374
from azul.queues import (
@@ -112,6 +113,7 @@ class IndexAction(Action):
112113
class MirrorAction(Action):
    """
    The kinds of work that a message on the mirror queue can request.
    """
    # Fan a source out into one message per partition prefix
    mirror_source = auto()
    # Fan a partition out into one message per file listed for it
    mirror_partition = auto()
    # Mirror one individual file
    mirror_file = auto()
115117

116118

117119
@attrs.frozen(kw_only=True)
@@ -185,6 +187,18 @@ def mirror_partition_message(self,
185187
'prefix': prefix
186188
})
187189

190+
def mirror_file_message(self,
                        catalog: CatalogName,
                        source: SourceRef,
                        file: File,
                        ) -> SQSFifoMessage:
    """
    Compose the queue message that requests the mirroring of one file.

    :param catalog: The name of the catalog to record in the message

    :param source: A reference to the repository source, serialized to
                   JSON under the `source` key

    :param file: The file to mirror, serialized to JSON under the `file`
                 key

    :return: A FIFO message carrying the `mirror_file` action along with
             the catalog, source and file
    """
    body = {
        'action': MirrorAction.mirror_file.to_json(),
        'catalog': catalog,
        'source': cast(JSON, source.to_json()),
        'file': file.to_json()
    }
    return SQSFifoMessage(body)
201+
188202
def local_reindex(self, catalog: CatalogName, prefix: str) -> int:
189203
notifications = [
190204
self.notification(bundle_fqid)
@@ -607,6 +621,18 @@ def message(prefix: str) -> SQSMessage:
607621
messages = map(message, source.spec.prefix.partition_prefixes())
608622
self.queue_mirror_messages(messages)
609623

624+
def mirror_partition(self, catalog: CatalogName, source, prefix: str):
    """
    Queue one `mirror_file` message for every file that the repository
    plugin of the given catalog lists for a partition of a source.

    :param catalog: The name of the catalog whose repository plugin is
                    used to list the files

    :param source: The JSON form of a source reference; it is
                   deserialized with the plugin's source reference class

    :param prefix: The partition prefix handed to the plugin's
                   `list_files` method
    """
    plugin = self.repository_plugin(catalog)
    source_ref = plugin.source_ref_cls.from_json(source)
    # The listing is requested up front, before any message is queued
    files = plugin.list_files(source_ref, prefix)

    def file_messages():
        # Each message is logged and built lazily, as the queue consumes it
        for file in files:
            log.info('Mirroring file %r in source %r from catalog %r',
                     file.uuid, str(source_ref.spec), catalog)
            yield self.mirror_file_message(catalog, source_ref, file)

    self.queue_mirror_messages(file_messages())
635+
610636

611637
class AzulClientError(RuntimeError):
612638
pass

src/azul/indexer/mirror_controller.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ def mirror(self, event: Iterable[SQSRecord]):
3939
if action is MirrorAction.mirror_source:
4040
self.client.mirror_source(message['catalog'], message['source'])
4141
elif action is MirrorAction.mirror_partition:
42-
# FIXME: Implement mirror_partition
43-
# https://github.com/DataBiosphere/azul/issues/6861
44-
log.info('Would mirror files in partition %r of source %r',
45-
message['prefix'], message['source'])
46-
time.sleep(10)
42+
self.client.mirror_partition(message['catalog'],
43+
message['source'],
44+
message['prefix'])
45+
elif action is MirrorAction.mirror_file:
46+
log.info('Would mirror parts of file %r', message['file']['uuid'])
47+
time.sleep(1)
4748
else:
4849
# FIXME: Implement mirror_file, mirror_part & finalize_file
4950
# https://github.com/DataBiosphere/azul/issues/6862

src/azul/plugins/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,21 @@ def fetch_bundle(self, bundle_fqid: BUNDLE_FQID) -> BUNDLE:
719719
"""
720720
raise NotImplementedError
721721

722+
@abstractmethod
def list_files(self, source: SOURCE_REF, prefix: str) -> list['File']:
    """
    Return the files in the given source whose UUIDs begin with the given
    prefix. Concrete repository plugins must override this method.

    :param source: A reference to the repository source that contains the
                   files to list

    :param prefix: A string of at most eight lower-case hexadecimal
                   characters

    :return: The matching files
    """
    raise NotImplementedError
736+
722737
@abstractmethod
723738
def drs_client(self,
724739
authentication: Authentication | None = None

src/azul/plugins/repository/canned/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@
5050
RepositoryFileDownload,
5151
RepositoryPlugin,
5252
)
53+
from azul.plugins.metadata.hca import (
54+
HCAFile,
55+
)
5356
from azul.plugins.metadata.hca.bundle import (
5457
HCABundle,
5558
)
@@ -199,6 +202,18 @@ def fetch_bundle(self, bundle_fqid: CannedBundleFQID) -> CannedBundle:
199202
time.time() - now, bundle.uuid, bundle.version)
200203
return bundle
201204

205+
def list_files(self, source: CannedSourceRef, prefix: str) -> list[HCAFile]:
    """
    List the files in the canned staging area of the given source whose
    UUID starts with the given prefix.

    :param source: A reference to the canned source; its spec name
                   selects the staging area

    :param prefix: The partition prefix that the UUID of every returned
                   file starts with. An empty prefix matches all files.

    :return: One file per matching descriptor in the staging area, with
             the name taken from the descriptor's `file_name` property
             and no DRS URI
    """
    self._assert_source(source)
    self._assert_partition(source, prefix)
    staging_area = self.staging_area(source.spec.name)
    return [
        HCAFile.from_descriptor(descriptor.content,
                                uuid=file_uuid,
                                name=descriptor.content['file_name'],
                                drs_uri=None)
        for file_uuid, descriptor in staging_area.descriptors.items()
        # The staging area exposes the descriptors of the entire source,
        # so the partition has to be applied here. Without this filter,
        # every partition would yield every file.
        if file_uuid.startswith(prefix)
    ]
216+
202217
def _construct_file_url(self, url: furl, file_name: str) -> furl:
203218
"""
204219
>>> plugin = Plugin(_sources=set())

src/azul/plugins/repository/dss/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ def list_bundles(self,
136136
def fetch_bundle(self, bundle_fqid: DSSBundleFQID) -> NoReturn:
137137
assert False, 'DSS is EOL'
138138

139+
def list_files(self, source: DSSSourceRef, prefix: str) -> NoReturn:
    """
    Unconditionally fail an assertion, since DSS is EOL. Neither argument
    is used.
    """
    assert False, 'DSS is EOL'
141+
139142
def dss_subscription_query(self, prefix: str) -> JSON:
140143
return {
141144
"query": {

src/azul/plugins/repository/tdr_anvil/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
EntityReference,
4343
EntityType,
4444
)
45+
from azul.plugins.metadata.anvil import (
46+
AnvilFile,
47+
)
4548
from azul.plugins.metadata.anvil.bundle import (
4649
AnvilBundle,
4750
EntityLink,
@@ -314,6 +317,19 @@ def list_bundles(self,
314317
batch_prefix=batch_prefix))
315318
return bundles
316319

320+
def list_files(self, source: TDRSourceRef, prefix: str) -> list[AnvilFile]:
    """
    List the files from the `anvil_file` table of the given source for
    the given partition prefix.

    :param source: A reference to the TDR source to list files from

    :param prefix: The partition prefix passed on when fetching the batch
                   of `anvil_file` rows

    :return: One file per fetched row, identified by the entity ID of the
             row's reference and stamped with this plugin's version
    """
    self._assert_source(source)
    self._assert_partition(source, prefix)
    files = []
    for ref, row in self._get_batch(source.spec, 'anvil_file', prefix):
        file = AnvilFile(uuid=ref.entity_id,
                         name=row['file_name'],
                         version=self._version,
                         size=row['file_size'],
                         md5=row['file_md5sum'],
                         drs_uri=row['file_ref'])
        files.append(file)
    return files
332+
317333
def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
318334
if bundle_fqid.table_name == BundleType.primary.value:
319335
log.info('Bundle %r is a primary bundle', bundle_fqid.uuid)

src/azul/plugins/repository/tdr_hca/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,20 @@ def list_bundles(self,
259259
for row in current_bundles
260260
]
261261

262+
def list_files(self, source: TDRSourceRef, prefix: str) -> list[HCAFile]:
    """
    List the files in the given source whose entity ID starts with the
    given prefix, across all entity types whose name ends in `_file`.

    :param source: A reference to the TDR source whose tables are queried

    :param prefix: The partition prefix that the ID column of each
                   returned row starts with

    :return: One file per row of the combined query result
    """
    self._assert_source(source)
    self._assert_partition(source, prefix)
    # The selected columns are the same for every table, so join them once
    columns = ', '.join(TDRHCABundle.data_columns)
    # Only the names of the entity types are needed, not their classes
    file_types = [
        entity_type
        for entity_type in api.entity_types
        if entity_type.endswith('_file')
    ]
    # NOTE(review): the prefix is embedded as a quoted literal via repr();
    # presumably _assert_partition restricts it to hexadecimal characters,
    # which would make this safe. Confirm against that method.
    query = ' UNION ALL '.join(
        f'''
            SELECT {columns}
            FROM {backtick(self._full_table_name(source.spec, entity_type))}
            WHERE STARTS_WITH({entity_type}_id, {prefix!r})
        '''
        for entity_type in file_types
    )
    rows = self._run_sql(query)
    return list(map(TDRHCABundle.file_from_row, rows))
275+
262276
def _query_unique_sorted(self,
263277
query: str,
264278
group_by: str

test/indexer/test_mirror_controller.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@
1717
from azul.indexer.mirror_controller import (
1818
MirrorController,
1919
)
20+
from azul.json import (
21+
copy_json,
22+
)
2023
from azul.logging import (
2124
configure_test_logging,
2225
get_test_logger,
2326
)
27+
from azul.plugins.metadata.hca import (
28+
HCAFile,
29+
)
2430
from azul_test_case import (
2531
DCP2TestCase,
2632
)
@@ -69,6 +75,7 @@ def test_mirroring(self):
6975
controller = MirrorController(app=MagicMock())
7076
controller.mirror(event)
7177
partition_messages = self._read_queue(self.client.mirror_queue())
78+
partition_message = copy_json(partition_messages[0])
7279
partitions = []
7380
for message in partition_messages:
7481
partitions.append(message.pop('prefix'))
@@ -77,3 +84,22 @@ def test_mirroring(self):
7784
source=self.source.to_json()),
7885
message)
7986
self.assertEqual(list(self.source.spec.prefix.partition_prefixes()), partitions)
87+
88+
with self.subTest('mirror_partition'):
89+
event = [self._mock_sqs_record(partition_message)]
90+
file = HCAFile(uuid='405852c9-a0cc-4cd8-b9ff-7c6296223661',
91+
name='foo.txt',
92+
version=None,
93+
drs_uri=None,
94+
size=0,
95+
content_type='text/plain',
96+
sha256='123')
97+
plugin_cls = type(self.client.repository_plugin(self.catalog))
98+
with patch.object(plugin_cls, 'list_files', return_value=[file]):
99+
controller.mirror(event)
100+
file_message = one(self._read_queue(self.client.mirror_queue()))
101+
expected_message = dict(action='mirror_file',
102+
catalog=self.catalog,
103+
source=self.source.to_json(),
104+
file=file.to_json())
105+
self.assertEqual(expected_message, file_message)

0 commit comments

Comments
 (0)