From ca485ee7ff927a0c24574ed56b7eacc3f2748f72 Mon Sep 17 00:00:00 2001 From: thomas Date: Tue, 6 Feb 2024 09:59:42 +0000 Subject: [PATCH] update --- src/lightning/data/streaming/functions.py | 36 +++++++++++++++++++---- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/lightning/data/streaming/functions.py b/src/lightning/data/streaming/functions.py index bbdcdbc2ca764..7bc865ccb8d85 100644 --- a/src/lightning/data/streaming/functions.py +++ b/src/lightning/data/streaming/functions.py @@ -23,7 +23,7 @@ import torch from lightning.data.processing.readers import BaseReader -from lightning.data.streaming.constants import _TORCH_GREATER_EQUAL_2_1_0 +from lightning.data.streaming.constants import _IS_IN_STUDIO, _TORCH_GREATER_EQUAL_2_1_0 from lightning.data.streaming.data_processor import DataChunkRecipe, DataProcessor, DataTransformRecipe from lightning.data.streaming.resolver import ( Dir, @@ -169,8 +169,8 @@ def map( output_dir: The folder where the processed data should be stored. num_workers: The number of workers to use during processing fast_dev_run: Whether to use process only a sub part of the inputs - num_nodes: When doing remote execution, the number of nodes to use. - machine: When doing remote execution, the machine to use. + num_nodes: When doing remote execution, the number of nodes to use. Only supported on https://lightning.ai/. + machine: When doing remote execution, the machine to use. Only supported on https://lightning.ai/. num_downloaders: The number of downloaders per worker. reorder_files: By default, reorders the files by file size to distribute work equally among all workers. Set this to ``False`` if the order in which samples are processed should be preserved. @@ -183,6 +183,17 @@ def map( if len(inputs) == 0: raise ValueError(f"The provided inputs should be non empty. 
Found {inputs}.") + if not _IS_IN_STUDIO and (machine is not None or num_nodes is not None): + raise ValueError( + "Only https://lightning.ai/ supports multiple nodes or selecting a machine." + " Create an account to try it out.") + + if not _IS_IN_STUDIO: + print( + "Create an account on https://lightning.ai/ to transform your data faster using " + "multiple nodes and large machines." + ) + if num_nodes is None or int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) > 0: _output_dir: Dir = _resolve_dir(output_dir) @@ -242,8 +253,8 @@ def optimize( compression: The compression algorithm to use over the chunks. num_workers: The number of workers to use during processing fast_dev_run: Whether to use process only a sub part of the inputs - num_nodes: When doing remote execution, the number of nodes to use. - machine: When doing remote execution, the machine to use. + num_nodes: When doing remote execution, the number of nodes to use. Only supported on https://lightning.ai/. + machine: When doing remote execution, the machine to use. Only supported on https://lightning.ai/. num_downloaders: The number of downloaders per worker. reorder_files: By default, reorders the files by file size to distribute work equally among all workers. Set this to ``False`` if the order in which samples are processed should be preserved. @@ -258,6 +269,18 @@ def optimize( if chunk_size is None and chunk_bytes is None: raise ValueError("Either `chunk_size` or `chunk_bytes` needs to be defined.") + if not _IS_IN_STUDIO and (machine is not None or num_nodes is not None): + raise ValueError( + "Only https://lightning.ai/ supports multiple nodes or selecting a machine." + " Create an account to try it out." + ) + + if not _IS_IN_STUDIO: + print( + "Create an account on https://lightning.ai/ to optimize your data faster " + "using multiple nodes and large machines." 
+ ) + if num_nodes is None or int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) > 0: _output_dir: Dir = _resolve_dir(output_dir) @@ -312,6 +335,9 @@ def __init__(self, folder: str, max_workers: Optional[int] = os.cpu_count()) -> self.max_workers = max_workers or 1 self.futures: List[concurrent.futures.Future] = [] + if not _IS_IN_STUDIO: + print("This method is optimized to run on https://lightning.ai/. Don't use it otherwise.") + def __iter__(self) -> Any: """This function queues the folders to perform listdir across multiple workers.""" with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: