Skip to content

Commit 2b87a91

Browse files
committed
Implement mirror_partition (#6861)
1 parent d7596a5 commit 2b87a91

File tree

8 files changed

+186
-6
lines changed

8 files changed

+186
-6
lines changed

src/azul/azulclient.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
Serializable,
6969
)
7070
from azul.plugins import (
71+
File,
7172
RepositoryPlugin,
7273
)
7374
from azul.queues import (
@@ -112,6 +113,7 @@ class IndexAction(Action):
112113
class MirrorAction(Action):
    # The kinds of work a mirror queue message can request. A member is
    # written into a message body's 'action' entry via its to_json() method.
    # NOTE(review): the values come from auto(), so (assuming Action is an
    # Enum) they follow declaration order -- append new members rather than
    # inserting them, and confirm nothing persists the raw values.
    mirror_source = auto()
    mirror_partition = auto()
    mirror_file = auto()
115117

116118

117119
@attrs.frozen(kw_only=True)
@@ -195,6 +197,21 @@ def mirror_partition_message(self,
195197
group_id=f'{source.id}:{prefix}'
196198
)
197199

200+
def mirror_file_message(self,
                        catalog: CatalogName,
                        source: SourceRef,
                        file: File,
                        ) -> SQSFifoMessage:
    """
    Build the FIFO queue message that requests the mirroring of one file.

    :param catalog: The name of the catalog the file is mirrored for

    :param source: A reference to the source containing the file; its JSON
                   form is embedded in the message body

    :param file: The file to mirror; its JSON form is embedded in the
                 message body

    :return: A message whose body carries the ``mirror_file`` action and
             whose group ID is made up of the source ID and the file UUID
    """
    body = dict(action=MirrorAction.mirror_file.to_json(),
                catalog=catalog,
                source=cast(JSON, source.to_json()),
                file=file.to_json())
    group_id = f'{source.id}:{file.uuid}'
    return SQSFifoMessage(body=body, group_id=group_id)
214+
198215
def local_reindex(self, catalog: CatalogName, prefix: str) -> int:
199216
notifications = [
200217
self.notification(bundle_fqid)
@@ -607,7 +624,7 @@ def message(source: SourceRef):
607624
def mirror_source(self, catalog: CatalogName, source_json: JSON):
608625
plugin = self.repository_plugin(catalog)
609626
source = plugin.source_ref_cls.from_json(source_json)
610-
source = plugin.partition_source_for_indexing(catalog, source)
627+
source = plugin.partition_source_for_mirroring(catalog, source)
611628

612629
def message(prefix: str) -> SQSMessage:
613630
log.info('Mirroring files in partition %r of source %r from catalog %r',
@@ -617,6 +634,18 @@ def message(prefix: str) -> SQSMessage:
617634
messages = map(message, source.spec.prefix.partition_prefixes())
618635
self.queue_mirror_messages(messages)
619636

637+
def mirror_partition(self, catalog: CatalogName, source_json: JSON, prefix: str):
    """
    Queue a ``mirror_file`` message for each file that the catalog's
    repository plugin lists in one partition of a source.

    :param catalog: The name of the catalog being mirrored

    :param source_json: The JSON form of the source reference, parsed with
                        the plugin's source reference class

    :param prefix: The partition prefix handed to the plugin's
                   ``list_files`` method
    """
    plugin = self.repository_plugin(catalog)
    source = plugin.source_ref_cls.from_json(source_json)
    files = plugin.list_files(source, prefix)

    def to_message(file: File) -> SQSMessage:
        log.info('Mirroring file %r in source %r from catalog %r',
                 file.uuid, str(source.spec), catalog)
        return self.mirror_file_message(catalog, source, file)

    # `map` is lazy, so each file is logged only at the moment its message
    # is pulled from the iterator by the queueing code.
    self.queue_mirror_messages(map(to_message, files))
648+
620649

621650
class AzulClientError(RuntimeError):
    # NOTE(review): the raise sites are outside this excerpt; presumably this
    # signals failures of AzulClient operations -- confirm against callers.
    pass

src/azul/indexer/mirror_controller.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from azul.types import (
2222
JSON,
23+
json_mapping,
2324
json_str,
2425
)
2526

@@ -40,11 +41,14 @@ def _mirror(self, message: JSON):
4041
if action is MirrorAction.mirror_source:
4142
self.client.mirror_source(message['catalog'], message['source'])
4243
elif action is MirrorAction.mirror_partition:
43-
# FIXME: Implement mirror_partition
44-
# https://github.com/DataBiosphere/azul/issues/6861
45-
log.info('Would mirror files in partition %r of source %r',
46-
message['prefix'], message['source'])
47-
time.sleep(10)
44+
self.client.mirror_partition(message['catalog'],
45+
message['source'],
46+
message['prefix'])
47+
elif action is MirrorAction.mirror_file:
48+
# FIXME: Implement mirror_file, mirror_part & finalize_file
49+
# https://github.com/DataBiosphere/azul/issues/6862
50+
log.info('Would mirror parts of file %r', json_mapping(message['file'])['uuid'])
51+
time.sleep(1)
4852
else:
4953
# FIXME: Implement mirror_file, mirror_part & finalize_file
5054
# https://github.com/DataBiosphere/azul/issues/6862

src/azul/plugins/__init__.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,14 @@ def count_bundles(self, source: SOURCE_SPEC) -> int:
672672
"""
673673
raise NotImplementedError
674674

675+
@abstractmethod
def count_files(self, source: SOURCE_SPEC) -> int:
    """
    Return how many files the given source contains.

    :param source: The specification of the source whose files are to be
                   counted. Its prefix may be None.

    :return: The total number of files in the source
    """
    raise NotImplementedError
682+
675683
def partition_source_for_indexing(self,
676684
catalog: CatalogName,
677685
source: SOURCE_REF
@@ -683,6 +691,17 @@ def partition_source_for_indexing(self,
683691
"""
684692
return self._partition_source(catalog, source, self.count_bundles)
685693

694+
def partition_source_for_mirroring(self,
                                   catalog: CatalogName,
                                   source: SOURCE_REF
                                   ) -> SOURCE_REF:
    """
    Choose a partition prefix for mirroring the given source.

    A source that already has a prefix is returned as is. Otherwise the
    result is an updated copy of the source carrying a heuristically
    computed prefix that should be appropriate for mirroring in the given
    catalog. The work is delegated to ``_partition_source``, with the
    source's file count as the measure of its size.
    """
    counter = self.count_files
    return self._partition_source(catalog, source, counter)
704+
686705
def _partition_source(self,
687706
catalog: CatalogName,
688707
source: SOURCE_REF,
@@ -730,6 +749,20 @@ def fetch_bundle(self, bundle_fqid: BUNDLE_FQID) -> BUNDLE:
730749
"""
731750
raise NotImplementedError
732751

752+
@abstractmethod
def list_files(self, source: SOURCE_REF, prefix: str) -> list['File']:
    """
    Return the files in a source that belong to one partition, i.e. those
    files whose digest value begins with the given prefix.

    :param source: A reference to the repository source that contains the
                   files to list

    :param prefix: A string of lower-case hexadecimal characters

    :return: The matching files
    """
    raise NotImplementedError
765+
733766
@abstractmethod
734767
def drs_client(self,
735768
authentication: Authentication | None = None

src/azul/plugins/repository/canned/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@
5050
RepositoryFileDownload,
5151
RepositoryPlugin,
5252
)
53+
from azul.plugins.metadata.hca import (
54+
HCAFile,
55+
)
5356
from azul.plugins.metadata.hca.bundle import (
5457
HCABundle,
5558
)
@@ -171,6 +174,15 @@ def count_bundles(self, source: CannedSourceRef) -> int:
171174
if links_id.lower().startswith(prefix)
172175
)
173176

177+
def count_files(self, source: SimpleSourceSpec) -> int:
    """
    Count the file descriptors in the source's staging area whose SHA-256
    digest starts with the source's common prefix.

    :param source: The specification of the source; its name selects the
                   staging area. A prefix of None matches every descriptor.

    :return: The number of matching descriptors
    """
    descriptors = self.staging_area(source.name).descriptors.values()
    if source.prefix is None:
        # Every string starts with the empty string
        prefix = ''
    else:
        prefix = source.prefix.common
    count = 0
    for descriptor in descriptors:
        if descriptor.content['sha256'].startswith(prefix):
            count += 1
    return count
185+
174186
def list_bundles(self,
175187
source: CannedSourceRef,
176188
prefix: str
@@ -205,6 +217,19 @@ def fetch_bundle(self, bundle_fqid: CannedBundleFQID) -> CannedBundle:
205217
time.time() - now, bundle.uuid, bundle.version)
206218
return bundle
207219

220+
def list_files(self, source: CannedSourceRef, prefix: str) -> list[HCAFile]:
    """
    List the files in the source's staging area whose SHA-256 digest starts
    with the given prefix.

    :param source: A reference to the canned source; it is checked with
                   ``_assert_source`` and its spec's name selects the
                   staging area

    :param prefix: The partition prefix; it is checked against the source
                   with ``_assert_partition``

    :return: One file per matching descriptor, with the descriptor key as
             the UUID, the descriptor's ``file_name`` as the name, and no
             DRS URI
    """
    self._assert_source(source)
    self._assert_partition(source, prefix)
    staging_area = self.staging_area(source.spec.name)
    files = []
    for file_uuid, descriptor in staging_area.descriptors.items():
        if not descriptor.content['sha256'].startswith(prefix):
            continue
        file = HCAFile.from_descriptor(descriptor.content,
                                       uuid=file_uuid,
                                       name=descriptor.content['file_name'],
                                       drs_uri=None)
        files.append(file)
    return files
232+
208233
def _construct_file_url(self, url: furl, file_name: str) -> furl:
209234
"""
210235
>>> plugin = Plugin(_sources=set())

src/azul/plugins/repository/dss/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ def _lookup_source_id(self, spec: SimpleSourceSpec) -> str:
119119
def count_bundles(self, source: SimpleSourceSpec) -> NoReturn:
    """
    Unsupported: always raises because DSS is EOL.

    :raises AssertionError: unconditionally
    """
    # Raised explicitly rather than via `assert False`: assert statements are
    # removed under `python -O`, which would make this NoReturn method fall
    # through and return None.
    raise AssertionError('DSS is EOL')
121121

122+
def count_files(self, source: SimpleSourceSpec) -> NoReturn:
    """
    Unsupported: always raises because DSS is EOL.

    :raises AssertionError: unconditionally
    """
    # Raised explicitly rather than via `assert False`: assert statements are
    # removed under `python -O`, which would make this NoReturn method fall
    # through and return None.
    raise AssertionError('DSS is EOL')
124+
122125
def list_sources(self,
123126
authentication: Authentication | None
124127
) -> list[DSSSourceRef]:
@@ -136,6 +139,9 @@ def list_bundles(self,
136139
def fetch_bundle(self, bundle_fqid: DSSBundleFQID) -> NoReturn:
    """
    Unsupported: always raises because DSS is EOL.

    :raises AssertionError: unconditionally
    """
    # Raised explicitly rather than via `assert False`: assert statements are
    # removed under `python -O`, which would make this NoReturn method fall
    # through and return None.
    raise AssertionError('DSS is EOL')
138141

142+
def list_files(self, source: DSSSourceRef, prefix: str) -> NoReturn:
    """
    Unsupported: always raises because DSS is EOL.

    :raises AssertionError: unconditionally
    """
    # Raised explicitly rather than via `assert False`: assert statements are
    # removed under `python -O`, which would make this NoReturn method fall
    # through and return None.
    raise AssertionError('DSS is EOL')
144+
139145
def dss_subscription_query(self, prefix: str) -> JSON:
140146
return {
141147
"query": {

src/azul/plugins/repository/tdr_anvil/__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
EntityReference,
4343
EntityType,
4444
)
45+
from azul.plugins.metadata.anvil import (
46+
AnvilFile,
47+
)
4548
from azul.plugins.metadata.anvil.bundle import (
4649
AnvilBundle,
4750
EntityLink,
@@ -241,6 +244,15 @@ def _batch_uuid(self,
241244
self.batch_uuid_version,
242245
self.bundle_uuid_version)
243246

247+
def count_files(self, source: TDRSourceSpec) -> int:
    """
    Count the rows of the source's ``anvil_file`` table, restricted to rows
    whose ``file_md5sum`` starts with the source's common prefix if the
    source has a prefix.

    :param source: The specification of the source to count files in. A
                   prefix of None counts every row.

    :return: The value of the ``count`` column in the single row the query
             produces
    """
    table_name = backtick(self._full_table_name(source, 'anvil_file'))
    if source.prefix is None:
        where_clause = ''
    else:
        where_clause = f'''
            WHERE starts_with(file_md5sum, {source.prefix.common!r})
        '''
    query = f'''
            SELECT COUNT(*) AS count
            FROM {table_name}
        ''' + where_clause
    rows = self._run_sql(query)
    return one(rows)['count']
255+
244256
def count_bundles(self, source: TDRSourceSpec) -> int:
245257
prefix = '' if source.prefix is None else source.prefix.common
246258
assert not any(map(str.isupper, prefix)), source
@@ -316,6 +328,23 @@ def list_bundles(self,
316328
batch_prefix=batch_prefix))
317329
return bundles
318330

331+
def list_files(self, source: TDRSourceRef, prefix: str) -> list[AnvilFile]:
    """
    List the files in one partition of a source, reading a batch of the
    ``anvil_file`` table keyed on the ``file_md5sum`` column.

    :param source: A reference to the source; it is checked with
                   ``_assert_source``

    :param prefix: The partition prefix; it is checked against the source
                   with ``_assert_partition`` and selects the batch

    :return: One file per entry in the batch
    """
    self._assert_source(source)
    self._assert_partition(source, prefix)

    def to_file(ref, row) -> AnvilFile:
        return AnvilFile(uuid=ref.entity_id,
                         name=row['file_name'],
                         version=self._version,
                         size=row['file_size'],
                         md5=row['file_md5sum'],
                         drs_uri=row['file_ref'])

    batch = self._get_batch(source.spec,
                            'anvil_file',
                            prefix,
                            id_column='file_md5sum')
    return [to_file(ref, row) for ref, row in batch]
347+
319348
def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
320349
if bundle_fqid.table_name == BundleType.primary.value:
321350
log.info('Bundle %r is a primary bundle', bundle_fqid.uuid)

src/azul/plugins/repository/tdr_hca/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,20 @@ def count_bundles(self, source: TDRSourceSpec) -> int:
242242
rows = self._run_sql(query)
243243
return one(rows)['count']
244244

245+
def count_files(self, source: TDRSourceSpec) -> int:
    """
    Count the files in a source across all file entity tables, i.e. the
    tables of the entity types whose name ends in ``_file``.

    :param source: The specification of the source to count files in. If
                   it has a prefix, only rows whose descriptor's SHA-256
                   digest starts with the common prefix are counted.

    :return: The sum of the per-table counts
    """
    # The filter is the same for every table, so it is built once
    if source.prefix is None:
        where_clause = ''
    else:
        where_clause = f'''
            WHERE STARTS_WITH(JSON_EXTRACT_SCALAR(descriptor, "$.sha256"), {source.prefix.common!r})
        '''
    # Only the entity type names are needed, so iterate the keys rather than
    # the items of the mapping
    query = ' UNION ALL '.join(
        f'''
            SELECT COUNT(*) AS count
            FROM {backtick(self._full_table_name(source, entity_type))}
        ''' + where_clause
        for entity_type in api.entity_types
        if entity_type.endswith('_file')
    )
    rows = self._run_sql(query)
    # Each SELECT in the union contributes one row holding one table's count
    return sum(row['count'] for row in rows)
258+
245259
def list_bundles(self,
246260
source: TDRSourceRef,
247261
prefix: str
@@ -261,6 +275,20 @@ def list_bundles(self,
261275
for row in current_bundles
262276
]
263277

278+
def list_files(self, source: TDRSourceRef, prefix: str) -> list[HCAFile]:
    """
    List the files in one partition of a source by querying all file entity
    tables, i.e. the tables of the entity types whose name ends in
    ``_file``.

    :param source: A reference to the source; it is checked with
                   ``_assert_source``

    :param prefix: The partition prefix; it is checked against the source
                   with ``_assert_partition``. Only rows whose descriptor's
                   SHA-256 digest starts with it are selected.

    :return: One file per selected row, converted with
             ``TDRHCABundle.file_from_row``
    """
    self._assert_source(source)
    self._assert_partition(source, prefix)
    # The column list is the same for every table, so it is joined once
    columns = ', '.join(TDRHCABundle.data_columns)
    # Only the entity type names are needed, so iterate the keys rather than
    # the items of the mapping
    query = ' UNION ALL '.join(
        f'''
            SELECT {columns}
            FROM {backtick(self._full_table_name(source.spec, entity_type))}
            WHERE STARTS_WITH(JSON_EXTRACT_SCALAR(descriptor, "$.sha256"), {prefix!r})
        '''
        for entity_type in api.entity_types
        if entity_type.endswith('_file')
    )
    rows = self._run_sql(query)
    return list(map(TDRHCABundle.file_from_row, rows))
291+
264292
def _query_unique_sorted(self,
265293
query: str,
266294
group_by: str

test/indexer/test_mirror_controller.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@
1717
from azul.indexer.mirror_controller import (
1818
MirrorController,
1919
)
20+
from azul.json import (
21+
copy_json,
22+
)
2023
from azul.logging import (
2124
configure_test_logging,
2225
get_test_logger,
2326
)
27+
from azul.plugins.metadata.hca import (
28+
HCAFile,
29+
)
2430
from azul_test_case import (
2531
DCP2TestCase,
2632
)
@@ -69,6 +75,7 @@ def test_mirroring(self):
6975
controller = MirrorController(app=MagicMock())
7076
controller.mirror(event)
7177
partition_messages = self._read_queue(self.client.mirror_queue())
78+
partition_message = copy_json(partition_messages[0])
7279
partitions = []
7380
for message in partition_messages:
7481
partitions.append(message.pop('prefix'))
@@ -77,3 +84,22 @@ def test_mirroring(self):
7784
source=self.source.to_json()),
7885
message)
7986
self.assertEqual(list(self.source.spec.prefix.partition_prefixes()), partitions)
87+
88+
with self.subTest('mirror_partition'):
89+
event = [self._mock_sqs_record(partition_message)]
90+
file = HCAFile(uuid='405852c9-a0cc-4cd8-b9ff-7c6296223661',
91+
name='foo.txt',
92+
version=None,
93+
drs_uri=None,
94+
size=0,
95+
content_type='text/plain',
96+
sha256='123')
97+
plugin_cls = type(self.client.repository_plugin(self.catalog))
98+
with patch.object(plugin_cls, 'list_files', return_value=[file]):
99+
controller.mirror(event)
100+
file_message = one(self._read_queue(self.client.mirror_queue()))
101+
expected_message = dict(action='mirror_file',
102+
catalog=self.catalog,
103+
source=self.source.to_json(),
104+
file=file.to_json())
105+
self.assertEqual(expected_message, file_message)

0 commit comments

Comments
 (0)