@@ -372,7 +372,6 @@ def __init__(
372
372
self ._counter = 0
373
373
self ._last_time = time ()
374
374
self ._index_counter = 0
375
- self ._current_item : Any = None
376
375
377
376
def run (self ) -> None :
378
377
try :
@@ -477,6 +476,7 @@ def _try_upload(self, data: Optional[Union[str, Tuple[str, str]]]) -> None:
477
476
assert os .path .exists (data ), data
478
477
else :
479
478
assert os .path .exists (data [- 1 ]), data
479
+
480
480
self .to_upload_queues [self ._counter % self .num_uploaders ].put (data )
481
481
482
482
def _collect_paths (self ) -> None :
@@ -588,8 +588,8 @@ def _start_uploaders(self) -> None:
588
588
589
589
def _handle_data_chunk_recipe (self , index : int ) -> None :
590
590
try :
591
- self . _current_item = self .items [index ] if self .reader is None else self .reader .read (self .items [index ])
592
- item_data_or_generator = self .data_recipe .prepare_item (self . _current_item )
591
+ current_item = self .items [index ] if self .reader is None else self .reader .read (self .items [index ])
592
+ item_data_or_generator = self .data_recipe .prepare_item (current_item )
593
593
if isinstance (item_data_or_generator , types .GeneratorType ):
594
594
for item_data in item_data_or_generator :
595
595
if item_data is not None :
@@ -713,14 +713,19 @@ def _done(self, size: int, delete_cached_files: bool, output_dir: Dir) -> _Resul
713
713
size = sum ([c ["dim" ] if c ["dim" ] is not None else c ["chunk_size" ] for c in config ["chunks" ]])
714
714
num_bytes = sum ([c ["chunk_bytes" ] for c in config ["chunks" ]])
715
715
data_format = tree_unflatten (config ["config" ]["data_format" ], treespec_loads (config ["config" ]["data_spec" ]))
716
+ num_chunks = len (config ["chunks" ])
717
+
718
+ # The platform can't store more than 1024 entries.
719
+ # Note: This isn't really used right now, so it is fine to skip if too big.
720
+ num_bytes_per_chunk = [c ["chunk_size" ] for c in config ["chunks" ]] if num_chunks < 1024 else []
716
721
717
722
return _Result (
718
723
size = size ,
719
724
num_bytes = num_bytes ,
720
725
data_format = data_format ,
721
726
compression = config ["config" ]["compression" ],
722
727
num_chunks = len (config ["chunks" ]),
723
- num_bytes_per_chunk = [ c [ "chunk_size" ] for c in config [ "chunks" ]] ,
728
+ num_bytes_per_chunk = num_bytes_per_chunk ,
724
729
)
725
730
return _Result (
726
731
size = size ,
@@ -866,9 +871,9 @@ def run(self, data_recipe: DataRecipe) -> None:
866
871
raise ValueError ("The `prepare_structure` should return a list of item metadata." )
867
872
868
873
if self .reader :
869
- workers_user_items = self .reader .items_to_workers (user_items , self .num_workers )
874
+ user_items = self .reader .remap_items (user_items , self .num_workers )
870
875
871
- elif self .weights is not None :
876
+ if self .weights is not None :
872
877
if len (self .weights ) != len (user_items ):
873
878
raise ValueError ("The provided weights length should match the inputs' length." )
874
879
workers_user_items = _map_items_to_workers_weighted (
0 commit comments