Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

map & operator: Add lightning.ai account creation info #19418

Merged
merged 3 commits into from
Feb 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions src/lightning/data/streaming/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import torch

from lightning.data.processing.readers import BaseReader
from lightning.data.streaming.constants import _TORCH_GREATER_EQUAL_2_1_0
from lightning.data.streaming.constants import _IS_IN_STUDIO, _TORCH_GREATER_EQUAL_2_1_0
from lightning.data.streaming.data_processor import DataChunkRecipe, DataProcessor, DataTransformRecipe
from lightning.data.streaming.resolver import (
Dir,
Expand Down Expand Up @@ -169,8 +169,8 @@ def map(
output_dir: The folder where the processed data should be stored.
num_workers: The number of workers to use during processing
fast_dev_run: Whether to process only a small subset of the inputs
num_nodes: When doing remote execution, the number of nodes to use.
machine: When doing remote execution, the machine to use.
num_nodes: When doing remote execution, the number of nodes to use. Only supported on https://lightning.ai/.
machine: When doing remote execution, the machine to use. Only supported on https://lightning.ai/.
num_downloaders: The number of downloaders per worker.
reorder_files: By default, reorders the files by file size to distribute work equally among all workers.
Set this to ``False`` if the order in which samples are processed should be preserved.
Expand All @@ -183,6 +183,17 @@ def map(
if len(inputs) == 0:
raise ValueError(f"The provided inputs should be non empty. Found {inputs}.")

if not _IS_IN_STUDIO and (machine is not None or num_nodes is not None):
raise ValueError(
"Only https://lightning.ai/ supports multiple nodes or selecting a machine."
" Create an account to try it out.")

if not _IS_IN_STUDIO:
print(
"Create an account on https://lightning.ai/ to transform your data faster using "
"multiple nodes and large machines."
)

if num_nodes is None or int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) > 0:
_output_dir: Dir = _resolve_dir(output_dir)

Expand Down Expand Up @@ -242,8 +253,8 @@ def optimize(
compression: The compression algorithm to use over the chunks.
num_workers: The number of workers to use during processing
fast_dev_run: Whether to process only a small subset of the inputs
num_nodes: When doing remote execution, the number of nodes to use.
machine: When doing remote execution, the machine to use.
num_nodes: When doing remote execution, the number of nodes to use. Only supported on https://lightning.ai/.
machine: When doing remote execution, the machine to use. Only supported on https://lightning.ai/.
num_downloaders: The number of downloaders per worker.
reorder_files: By default, reorders the files by file size to distribute work equally among all workers.
Set this to ``False`` if the order in which samples are processed should be preserved.
Expand All @@ -258,6 +269,18 @@ def optimize(
if chunk_size is None and chunk_bytes is None:
raise ValueError("Either `chunk_size` or `chunk_bytes` needs to be defined.")

if not _IS_IN_STUDIO and (machine is not None or num_nodes is not None):
raise ValueError(
"Only https://lightning.ai/ supports multiple nodes or selecting a machine."
"Create an account to try it out."
)

if not _IS_IN_STUDIO:
print(
"Create an account on https://lightning.ai/ to optimize your data faster "
"using multiple nodes and large machines."
)

if num_nodes is None or int(os.getenv("DATA_OPTIMIZER_NUM_NODES", 0)) > 0:
_output_dir: Dir = _resolve_dir(output_dir)

Expand Down Expand Up @@ -312,6 +335,9 @@ def __init__(self, folder: str, max_workers: Optional[int] = os.cpu_count()) ->
self.max_workers = max_workers or 1
self.futures: List[concurrent.futures.Future] = []

if not _IS_IN_STUDIO:
print("This method is optimized to run on https://lightning.ai/. Don't use it otherwise.")

def __iter__(self) -> Any:
"""This function queues the folders to perform listdir across multiple workers."""
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
Expand Down
Loading