Skip to content

Commit d7596a5

Browse files
committed
Refactor source partitioning
1 parent 9851290 commit d7596a5

File tree

4 files changed

+22
-13
lines changed

4 files changed

+22
-13
lines changed

scripts/post_deploy_tdr.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def verify_source(self,
101101
require(source_spec.prefix.common == '', source_spec)
102102
self.tdr.check_bigquery_access(source_spec)
103103
else:
104-
ref = plugin.partition_source(catalog, ref)
104+
ref = plugin.partition_source_for_indexing(catalog, ref)
105105
subgraph_count = plugin.count_bundles(ref.spec)
106106
require(subgraph_count > 0, 'Common prefix is too long', ref.spec)
107107
require(subgraph_count <= 512, 'Common prefix is too short', ref.spec)

src/azul/azulclient.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ def remote_reindex(self,
325325
plugin = self.repository_plugin(catalog)
326326
for source_spec in sources:
327327
source_ref = plugin.resolve_source(source_spec)
328-
source_ref = plugin.partition_source(catalog, source_ref)
328+
source_ref = plugin.partition_source_for_indexing(catalog, source_ref)
329329

330330
def message(partition_prefix: str) -> SQSMessage:
331331
log.info('Remotely reindexing prefix %r of source_ref %r into catalog %r',
@@ -607,7 +607,7 @@ def message(source: SourceRef):
607607
def mirror_source(self, catalog: CatalogName, source_json: JSON):
608608
plugin = self.repository_plugin(catalog)
609609
source = plugin.source_ref_cls.from_json(source_json)
610-
source = plugin.partition_source(catalog, source)
610+
source = plugin.partition_source_for_indexing(catalog, source)
611611

612612
def message(prefix: str) -> SQSMessage:
613613
log.info('Mirroring files in partition %r of source %r from catalog %r',

src/azul/plugins/__init__.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
)
1212
from typing import (
1313
AbstractSet,
14+
Callable,
1415
ClassVar,
1516
Iterable,
1617
Literal,
@@ -671,27 +672,35 @@ def count_bundles(self, source: SOURCE_SPEC) -> int:
671672
"""
672673
raise NotImplementedError
673674

674-
def partition_source(self,
675-
catalog: CatalogName,
676-
source: SOURCE_REF
677-
) -> SOURCE_REF:
675+
def partition_source_for_indexing(self,
676+
catalog: CatalogName,
677+
source: SOURCE_REF
678+
) -> SOURCE_REF:
678679
"""
679680
If the source already has a prefix, return the source. Otherwise, return
680681
an updated copy of the source with a heuristically computed prefix that
681682
should be appropriate for indexing in the given catalog.
682683
"""
684+
return self._partition_source(catalog, source, self.count_bundles)
685+
686+
def _partition_source(self,
687+
catalog: CatalogName,
688+
source: SOURCE_REF,
689+
counter: Callable[[SOURCE_SPEC], int]
690+
) -> SOURCE_REF:
683691
if source.spec.prefix is None:
684-
count = self.count_bundles(source.spec)
692+
count = counter(source.spec)
685693
is_main = config.deployment.is_main
686694
is_it = catalog in config.integration_test_catalogs
687-
# We use the "lesser" heuristic during IT to avoid indexing an
688-
# excessive number of bundles
695+
# We use the "lesser" heuristic during IT to keep the cost and
696+
# performance of the tests within reasonable limits
689697
if is_main and not is_it:
690698
prefix = Prefix.for_main_deployment(count)
691699
else:
692700
prefix = Prefix.for_lesser_deployment(count)
693-
source = source.with_prefix(prefix)
694-
return source
701+
return source.with_prefix(prefix)
702+
else:
703+
return source
695704

696705
@abstractmethod
697706
def list_bundles(self,

test/integration_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1277,7 +1277,7 @@ def _prepare_notifications(self,
12771277
bundle_fqids = set()
12781278
notifications = []
12791279
for source in sources:
1280-
source = plugin.partition_source(catalog, source)
1280+
source = plugin.partition_source_for_indexing(catalog, source)
12811281
# Some partitions may be empty, but we include them anyway to
12821282
# ensure test coverage for handling multiple partitions per source
12831283
for partition_prefix in source.spec.prefix.partition_prefixes():

0 commit comments

Comments
 (0)