diff --git a/docs/api/step/generator_step.md b/docs/api/step/generator_step.md
index 949202eefd..09042b36aa 100644
--- a/docs/api/step/generator_step.md
+++ b/docs/api/step/generator_step.md
@@ -5,3 +5,5 @@ This section contains the API reference for the [`GeneratorStep`][distilabel.ste
 For more information and examples on how to use existing generator steps or create custom ones, please refer to [Tutorial - Step - GeneratorStep](../../sections/how_to_guides/basic/step/generator_step.md).
 
 ::: distilabel.steps.base.GeneratorStep
+
+::: distilabel.steps.generators.utils.make_generator_step
diff --git a/docs/sections/getting_started/quickstart.md b/docs/sections/getting_started/quickstart.md
index f982ae319f..4fd7de607b 100644
--- a/docs/sections/getting_started/quickstart.md
+++ b/docs/sections/getting_started/quickstart.md
@@ -67,3 +67,31 @@ if __name__ == "__main__":
 7. We run the pipeline with the parameters for the `load_dataset` and `text_generation` steps. The `load_dataset` step will use the repository `distilabel-internal-testing/instruction-dataset-mini` and the `test` split, and the `text_generation` task will use the `generation_kwargs` with the `temperature` set to `0.7` and the `max_new_tokens` set to `512`.
 
 8. Optionally, we can push the generated [`Distiset`][distilabel.distiset.Distiset] to the Hugging Face Hub repository `distilabel-example`. This will allow you to share the generated dataset with others and use it in other pipelines.
+
+## Minimal example
+
+`distilabel` gives you a lot of flexibility when creating your pipelines, but to get started right away you can omit most of the details and rely on the default values:
+
+```python
+from distilabel.llms import InferenceEndpointsLLM
+from distilabel.pipeline import Pipeline
+from distilabel.steps.tasks import TextGeneration
+from datasets import load_dataset
+
+
+dataset = load_dataset("distilabel-internal-testing/instruction-dataset-mini", split="test")
+
+with Pipeline() as pipeline:  # (1)
+    TextGeneration(llm=InferenceEndpointsLLM(model_id="meta-llama/Meta-Llama-3.1-8B-Instruct"))  # (2)
+
+
+if __name__ == "__main__":
+    distiset = pipeline.run(dataset=dataset)  # (3)
+    distiset.push_to_hub(repo_id="distilabel-example")
+```
+
+1. The [`Pipeline`][distilabel.pipeline.Pipeline] can take no arguments and will generate a default name on its own, which will be tracked internally.
+
+2. Just as with the [`Pipeline`][distilabel.pipeline.Pipeline], the [`Step`][distilabel.steps.base.Step]s don't explicitly need a name either.
+
+3. You can create the dataset as you normally would with Hugging Face `datasets` and pass it directly to the `run` method.
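Worth noting alongside the quickstart change: per the `InputDataset` alias added in `src/distilabel/pipeline/typing.py` further down, `run(dataset=...)` should also accept a plain list of dicts. A minimal sketch of that variant (model id reused from the example above):

```python
from distilabel.llms import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps.tasks import TextGeneration

# A plain list of dicts is also a valid `dataset` argument, per the
# `InputDataset` alias (`Dataset | pd.DataFrame | List[Dict[str, str]]`).
dataset = [{"instruction": "Tell me a joke."}]

with Pipeline() as pipeline:
    TextGeneration(
        llm=InferenceEndpointsLLM(model_id="meta-llama/Meta-Llama-3.1-8B-Instruct")
    )

if __name__ == "__main__":
    distiset = pipeline.run(dataset=dataset)
```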
Tip "Easily load your datasets" + + If you are already used to work with Hugging Face's `Dataset` via `load_dataset` or `pd.DataFrame`, you can create the `GeneratorStep` directly from the dataset (or dataframe), and create the step with the help of [`make_generator_step`][distilabel.steps.generators.utils.make_generator_step]: + + === "From a list of dicts" + + ```python + from distilabel.pipeline import Pipeline + from distilabel.steps import make_generator_step + + dataset = [{"instruction": "Tell me a joke."}] + + with Pipeline("pipe-name", description="My first pipe") as pipeline: + loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"}) + ... + ``` + + === "From `datasets.Dataset`" + + ```python + from datasets import load_dataset + from distilabel.pipeline import Pipeline + from distilabel.steps import make_generator_step + + dataset = load_dataset( + "DIBT/10k_prompts_ranked", + split="train" + ).filter( + lambda r: r["avg_rating"]>=4 and r["num_responses"]>=2 + ).select(range(500)) + + with Pipeline("pipe-name", description="My first pipe") as pipeline: + loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"}) + ... + ``` + + === "From `pd.DataFrame`" + + ```python + import pandas as pd + from distilabel.pipeline import Pipeline + from distilabel.steps import make_generator_step + + dataset = pd.read_csv("path/to/dataset.csv") + + with Pipeline("pipe-name", description="My first pipe") as pipeline: + loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"}) + ... + ``` + Next, we will use `prompt` column from the dataset obtained through `LoadDataFromHub` and use several `LLM`s to execute a `TextGeneration` task. We will also use the `Task.connect()` method to connect the steps, so the output of one step is the input of the next one. !!! NOTE @@ -282,6 +332,54 @@ if __name__ == "__main__": distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations") ``` +#### Pipeline.run with a dataset + +Note that in most cases if you don't need the extra flexibility the [`GeneratorSteps`][distilabel.steps.base.GeneratorStep] bring you, you can create a dataset as you would normally do and pass it to the [Pipeline.run][distilabel.pipeline.base.BasePipeline.run] method directly. Look at the highlighted lines to see the updated lines: + +```python hl_lines="11-14 33 38" +import random +from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM +from distilabel.pipeline import Pipeline, routing_batch_function +from distilabel.steps import GroupColumns +from distilabel.steps.tasks import TextGeneration + +@routing_batch_function +def sample_two_steps(steps: list[str]) -> list[str]: + return random.sample(steps, 2) + +dataset = load_dataset( + "distilabel-internal-testing/instruction-dataset-mini", + split="test" +) + +with Pipeline("pipe-name", description="My first pipe") as pipeline: + tasks = [] + for llm in ( + OpenAILLM(model="gpt-4-0125-preview"), + MistralLLM(model="mistral-large-2402"), + VertexAILLM(model="gemini-1.0-pro"), + ): + tasks.append( + TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm) + ) + + combine_generations = GroupColumns( + name="combine_generations", + columns=["generation", "model_name"], + output_columns=["generations", "model_names"], + ) + + sample_two_steps >> tasks >> combine_generations + + +if __name__ == "__main__": + distiset = pipeline.run( + dataset=dataset, + parameters=... 
diff --git a/src/distilabel/pipeline/_dag.py b/src/distilabel/pipeline/_dag.py
index c2c28f6334..4cb6bd76ba 100644
--- a/src/distilabel/pipeline/_dag.py
+++ b/src/distilabel/pipeline/_dag.py
@@ -45,7 +45,7 @@
 if TYPE_CHECKING:
     from distilabel.mixins.runtime_parameters import RuntimeParametersNames
-    from distilabel.steps.base import Step, _Step
+    from distilabel.steps.base import GeneratorStep, Step, _Step
 
 
 class DAG(_Serializable):
@@ -146,6 +146,16 @@ def add_edge(self, from_step: str, to_step: str) -> None:
 
         self.G.add_edge(from_step, to_step)
 
+    def add_root_step(self, step: "GeneratorStep") -> None:
+        """Adds a root step. Helper method used when a pipeline receives a dataset
+        in the `run` method.
+
+        Args:
+            step: The generator step that will be set as the new root.
+        """
+        self.add_step(step)
+        self.add_edge(step.name, next(iter(self)))
+
     @cached_property
     def root_steps(self) -> Set[str]:
         """The steps that don't have any predecessors i.e. generator steps.
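A toy `networkx` sketch (not the `DAG` class itself) of what `add_root_step` does — note that it connects the new loader to `next(iter(self))`, i.e. the first node in insertion order, which for a freshly defined pipeline is the old root:

```python
import networkx as nx

# Two connected steps, no generator step yet.
G = nx.DiGraph()
G.add_edge("step_a", "step_b")

# Mirror `add_root_step`: add the loader and connect it to the first node.
G.add_node("loader")
G.add_edge("loader", next(iter(G)))  # next(iter(G)) -> "step_a"

# The loader is now the only node without predecessors, i.e. the root.
print([name for name, degree in G.in_degree() if degree == 0])  # ['loader']
```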
diff --git a/src/distilabel/pipeline/base.py b/src/distilabel/pipeline/base.py
index e854dce49e..0bf8463255 100644
--- a/src/distilabel/pipeline/base.py
+++ b/src/distilabel/pipeline/base.py
@@ -53,6 +53,8 @@
     STEP_ATTR_NAME,
 )
 from distilabel.pipeline.write_buffer import _WriteBuffer
+from distilabel.steps.base import GeneratorStep
+from distilabel.steps.generators.utils import make_generator_step
 from distilabel.utils.logging import setup_logging, stop_logging
 from distilabel.utils.serialization import (
     TYPE_INFO_KEY,
@@ -66,7 +68,11 @@
     from distilabel.distiset import Distiset
     from distilabel.pipeline.routing_batch_function import RoutingBatchFunction
-    from distilabel.pipeline.typing import PipelineRuntimeParametersInfo, StepLoadStatus
+    from distilabel.pipeline.typing import (
+        InputDataset,
+        PipelineRuntimeParametersInfo,
+        StepLoadStatus,
+    )
     from distilabel.steps.base import Step, _Step
 
 class _CacheLocation(TypedDict):
@@ -290,6 +296,7 @@ def run(
         use_cache: bool = True,
         storage_parameters: Optional[Dict[str, Any]] = None,
         use_fs_to_pass_data: bool = False,
+        dataset: Optional["InputDataset"] = None,
     ) -> "Distiset":  # type: ignore
         """Run the pipeline. It will set the runtime parameters for the steps and validate
         the pipeline.
@@ -313,6 +320,9 @@ def run(
                 the `_Batch`es between the steps. Even if this parameter is `False`, the
                 `Batch`es received by `GlobalStep`s will always use the file system to
                 pass the data. Defaults to `False`.
+            dataset: If given, it will be used to create a `GeneratorStep` and set it as
+                the root step. Convenient when you have already processed the dataset in
+                your script and just want to pass it to the pipeline directly. Defaults to `None`.
 
         Returns:
             The `Distiset` created by the pipeline.
@@ -329,6 +339,9 @@ def run(
             log_queue=self._log_queue, filename=str(self._cache_location["log_file"])
         )
 
+        if dataset is not None:
+            self._add_dataset_generator_step(dataset)
+
         # Validate the pipeline DAG to check that all the steps are chainable, there are
         # no missing runtime parameters, batch sizes are correct, etc.
         self.dag.validate()
@@ -412,6 +425,27 @@ def dry_run(
         self._dry_run = False
         return distiset
 
+    def _add_dataset_generator_step(self, dataset: "InputDataset") -> None:
+        """Create a root step to work as the `GeneratorStep` for the pipeline using a
+        dataset.
+
+        Args:
+            dataset: A dataset that will be used to create a `GeneratorStep` and
+                placed in the DAG as the root step.
+
+        Raises:
+            ValueError: If there's already a `GeneratorStep` in the pipeline.
+        """
+        for step_name in self.dag:
+            step = self.dag.get_step(step_name)[STEP_ATTR_NAME]
+            if isinstance(step, GeneratorStep):
+                raise ValueError(
+                    "There is already a `GeneratorStep` in the pipeline, you can either pass a "
+                    f"`dataset` to the run method, or create a `GeneratorStep` explicitly. `GeneratorStep`: {step}"
+                )
+        loader = make_generator_step(dataset)
+        self.dag.add_root_step(loader)
+
     def get_runtime_parameters_info(self) -> "PipelineRuntimeParametersInfo":
         """Get the runtime parameters for the steps in the pipeline.
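A sketch of the guard above from the caller's perspective, mirroring the unit test added at the end of this diff (`LoadDataFromDicts` stands in as the pre-existing generator step):

```python
import pytest
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts

data = [{"instruction": "Tell me a joke."}]

with Pipeline(name="already-has-a-generator") as pipeline:
    LoadDataFromDicts(data=data)

# Passing `dataset=` on top of an explicit `GeneratorStep` is ambiguous,
# so `_add_dataset_generator_step` raises before the pipeline starts.
with pytest.raises(ValueError, match="There is already a"):
    pipeline.run(dataset=data, use_cache=False)
```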
diff --git a/src/distilabel/pipeline/local.py b/src/distilabel/pipeline/local.py
index 2f39ac3182..1f9836838f 100644
--- a/src/distilabel/pipeline/local.py
+++ b/src/distilabel/pipeline/local.py
@@ -33,6 +33,7 @@
     from queue import Queue
 
     from distilabel.distiset import Distiset
+    from distilabel.pipeline.typing import InputDataset
     from distilabel.steps.base import _Step
 
 
@@ -125,6 +126,7 @@ def run(
         use_cache: bool = True,
         storage_parameters: Optional[Dict[str, Any]] = None,
         use_fs_to_pass_data: bool = False,
+        dataset: Optional["InputDataset"] = None,
     ) -> "Distiset":
         """Runs the pipeline.
 
@@ -144,6 +146,9 @@ def run(
                 the `_Batch`es between the steps. Even if this parameter is `False`, the
                 `Batch`es received by `GlobalStep`s will always use the file system to
                 pass the data. Defaults to `False`.
+            dataset: If given, it will be used to create a `GeneratorStep` and set it as
+                the root step. Convenient when you have already processed the dataset in
+                your script and just want to pass it to the pipeline directly. Defaults to `None`.
 
         Returns:
             The `Distiset` created by the pipeline.
@@ -158,12 +163,17 @@ def run(
             use_cache=use_cache,
             storage_parameters=storage_parameters,
             use_fs_to_pass_data=use_fs_to_pass_data,
+            dataset=dataset,
         )
 
         self._log_queue = cast("Queue[Any]", mp.Queue())
 
         if distiset := super().run(
-            parameters, use_cache, storage_parameters, use_fs_to_pass_data
+            parameters,
+            use_cache,
+            storage_parameters,
+            use_fs_to_pass_data,
+            dataset=dataset,
         ):
             return distiset
diff --git a/src/distilabel/pipeline/ray.py b/src/distilabel/pipeline/ray.py
index 7ec0dab9d3..06d51e90e5 100644
--- a/src/distilabel/pipeline/ray.py
+++ b/src/distilabel/pipeline/ray.py
@@ -27,6 +27,7 @@
     from queue import Queue
 
     from distilabel.distiset import Distiset
+    from distilabel.pipeline.typing import InputDataset
     from distilabel.steps.base import _Step
 
 
@@ -75,6 +76,7 @@ def run(
         use_cache: bool = True,
         storage_parameters: Optional[Dict[str, Any]] = None,
         use_fs_to_pass_data: bool = False,
+        dataset: Optional["InputDataset"] = None,
     ) -> "Distiset":
         """Runs the pipeline in the Ray cluster.
 
@@ -94,6 +96,9 @@ def run(
                 the `_Batch`es between the steps. Even if this parameter is `False`, the
                 `Batch`es received by `GlobalStep`s will always use the file system to
                 pass the data. Defaults to `False`.
+            dataset: If given, it will be used to create a `GeneratorStep` and set it as
+                the root step. Convenient when you have already processed the dataset in
+                your script and just want to pass it to the pipeline directly. Defaults to `None`.
 
         Returns:
             The `Distiset` created by the pipeline.
@@ -108,7 +113,11 @@ def run(
         )
 
         if distiset := super().run(
-            parameters, use_cache, storage_parameters, use_fs_to_pass_data
+            parameters,
+            use_cache,
+            storage_parameters,
+            use_fs_to_pass_data,
+            dataset=dataset,
         ):
             return distiset
diff --git a/src/distilabel/pipeline/typing.py b/src/distilabel/pipeline/typing.py
index e73d20e8ab..690acecaae 100644
--- a/src/distilabel/pipeline/typing.py
+++ b/src/distilabel/pipeline/typing.py
@@ -15,6 +15,9 @@
 from typing import TYPE_CHECKING, Dict, List, Literal, TypedDict, TypeVar, Union
 
 if TYPE_CHECKING:
+    import pandas as pd
+    from datasets import Dataset
+
     from distilabel.mixins.runtime_parameters import RuntimeParameterInfo
     from distilabel.steps.base import GeneratorStep, GlobalStep, Step
 
@@ -47,3 +50,6 @@ class StepLoadStatus(TypedDict):
     str, Union[List["RuntimeParameterInfo"], Dict[str, "RuntimeParameterInfo"]]
 ]
 """Alias for the information of the runtime parameters of a `Pipeline`."""
+
+InputDataset = Union["Dataset", "pd.DataFrame", List[Dict[str, str]]]
+"""Alias for the types that can be processed as an input dataset."""
diff --git a/src/distilabel/steps/__init__.py b/src/distilabel/steps/__init__.py
index 39951a4643..8ba07a4710 100644
--- a/src/distilabel/steps/__init__.py
+++ b/src/distilabel/steps/__init__.py
@@ -44,6 +44,7 @@
     LoadDataFromFileSystem,
     LoadDataFromHub,
 )
+from distilabel.steps.generators.utils import make_generator_step
 from distilabel.steps.globals.huggingface import PushToHub
 from distilabel.steps.typing import GeneratorStepOutput, StepOutput
 
@@ -70,6 +71,7 @@
     "LoadDataFromDisk",
     "LoadDataFromFileSystem",
     "LoadDataFromHub",
+    "make_generator_step",
    "PushToHub",
     "Step",
     "StepInput",
diff --git a/src/distilabel/steps/generators/huggingface.py b/src/distilabel/steps/generators/huggingface.py
index 4b8d046c9a..dad37ac2c4 100644
--- a/src/distilabel/steps/generators/huggingface.py
+++ b/src/distilabel/steps/generators/huggingface.py
@@ -130,6 +130,9 @@ class LoadDataFromHub(GeneratorStep):
     def load(self) -> None:
         """Load the dataset from the Hugging Face Hub"""
         super().load()
+        if self._dataset is not None:
+            # The dataset was already set, e.g. by `distilabel.steps.generators.utils.make_generator_step`.
+            return
         self._dataset = load_dataset(
             self.repo_id,  # type: ignore
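A small sketch of why that early return matters: a pre-populated `LoadDataFromHub` skips the Hub download entirely, which is exactly what `make_generator_step` (next file) relies on. The attribute assignments below mirror that helper and touch internal attributes (`_dataset`, `_dataset_info`), so treat this as an illustration, not a public API:

```python
from datasets import Dataset
from distilabel.steps import LoadDataFromHub

dataset = Dataset.from_list([{"instruction": "Tell me a joke."}] * 4)

# Mirror what `make_generator_step` does: the repo_id is a placeholder and the
# dataset is attached directly, so `load()` returns early without a network call.
loader = LoadDataFromHub(repo_id="placeholder_name", batch_size=2)
loader._dataset = dataset
loader.num_examples = len(dataset)
loader._dataset_info = {"default": dataset.info}

loader.load()
batch, last_batch = next(loader.process())
print(len(batch), last_batch)  # 2 False
```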
diff --git a/src/distilabel/steps/generators/utils.py b/src/distilabel/steps/generators/utils.py
new file mode 100644
index 0000000000..b9e111d9b9
--- /dev/null
+++ b/src/distilabel/steps/generators/utils.py
@@ -0,0 +1,81 @@
+# Copyright 2023-present, Argilla, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
+
+import pandas as pd
+from datasets import Dataset
+
+from distilabel.steps.base import StepResources
+
+if TYPE_CHECKING:
+    from distilabel.steps import GeneratorStep
+
+
+def make_generator_step(
+    dataset: Union[Dataset, pd.DataFrame, List[Dict[str, str]]],
+    batch_size: int = 50,
+    input_mappings: Optional[Dict[str, str]] = None,
+    output_mappings: Optional[Dict[str, str]] = None,
+    resources: StepResources = StepResources(),
+) -> "GeneratorStep":
+    """Helper method to create a `GeneratorStep` from a dataset, to simplify
+    passing an already processed dataset to a `Pipeline`.
+
+    Args:
+        dataset: The dataset to use in the `Pipeline`.
+        batch_size: The batch size, using the same default as any other `GeneratorStep`.
+            Defaults to `50`.
+        input_mappings: Applies the same as any other step. Defaults to `None`.
+        output_mappings: Applies the same as any other step. Defaults to `None`.
+        resources: Applies the same as any other step. Defaults to `StepResources()`.
+
+    Raises:
+        ValueError: If the format is different from the ones supported.
+
+    Returns:
+        A `LoadDataFromDicts` instance if the input is a list of dicts, or a
+        `LoadDataFromHub` instance if the input is a `pd.DataFrame` or a `Dataset`.
+    """
+    from distilabel.steps import LoadDataFromDicts, LoadDataFromHub
+
+    if isinstance(dataset, list):
+        return LoadDataFromDicts(
+            data=dataset,
+            batch_size=batch_size,
+            input_mappings=input_mappings or {},
+            output_mappings=output_mappings or {},
+            resources=resources,
+        )
+
+    if isinstance(dataset, pd.DataFrame):
+        dataset = Dataset.from_pandas(dataset, preserve_index=False)
+
+    if not isinstance(dataset, Dataset):
+        raise ValueError(
+            f"Dataset type not allowed: {type(dataset)}, must be one of: "
+            "`datasets.Dataset`, `pd.DataFrame`, `List[Dict[str, str]]`"
+        )
+
+    loader = LoadDataFromHub(
+        repo_id="placeholder_name",
+        batch_size=batch_size,
+        input_mappings=input_mappings or {},
+        output_mappings=output_mappings or {},
+        resources=resources,
+    )
+    loader._dataset = dataset
+    loader.num_examples = len(dataset)
+    loader._dataset_info = {"default": dataset.info}
+    return loader
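For a sense of how the helper behaves end to end, a usage sketch mirroring the unit test added at the end of this diff (the printed values are the expected ones):

```python
import pandas as pd
from distilabel.steps import make_generator_step

df = pd.DataFrame([{"instruction": "Tell me a joke."}] * 10)

loader = make_generator_step(df, batch_size=5, output_mappings={"instruction": "prompt"})
loader.load()

# GeneratorSteps yield `(batch, last_batch)` tuples.
batch, last_batch = next(loader.process())
print(len(batch), last_batch)  # 5 False
```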
diff --git a/tests/integration/test_dataset_without_step.py b/tests/integration/test_dataset_without_step.py
new file mode 100644
index 0000000000..b272752fe7
--- /dev/null
+++ b/tests/integration/test_dataset_without_step.py
@@ -0,0 +1,82 @@
+# Copyright 2023-present, Argilla, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, List, Union
+
+import pandas as pd
+import pytest
+from datasets import Dataset
+from distilabel.pipeline import Pipeline
+from distilabel.steps import make_generator_step
+from distilabel.steps.base import Step, StepInput
+from distilabel.steps.typing import StepOutput
+
+
+class DummyStep(Step):
+    @property
+    def inputs(self) -> List[str]:
+        return ["instruction"]
+
+    @property
+    def outputs(self) -> List[str]:
+        return ["response"]
+
+    def process(self, inputs: StepInput) -> StepOutput:  # type: ignore
+        for input in inputs:
+            input["response"] = "unit test"
+        yield inputs
+
+
+data = [{"instruction": "Tell me a joke."}] * 10
+
+
+@pytest.mark.parametrize("dataset", (data, Dataset.from_list(data), pd.DataFrame(data)))
+def test_pipeline_with_dataset_from_function(
+    dataset: Union[Dataset, pd.DataFrame, List[Dict[str, str]]],
+) -> None:
+    with Pipeline(name="pipe-nothing") as pipeline:
+        load_dataset = make_generator_step(dataset)
+        if isinstance(dataset, (pd.DataFrame, Dataset)):
+            assert isinstance(load_dataset._dataset, Dataset)
+
+        dummy = DummyStep()
+        load_dataset >> dummy
+
+    distiset = pipeline.run(use_cache=False)
+    assert len(distiset["default"]["train"]) == 10
+
+
+@pytest.mark.parametrize("dataset", (data, Dataset.from_list(data), pd.DataFrame(data)))
+def test_pipeline_run_without_generator_step(
+    dataset: Union[Dataset, pd.DataFrame, List[Dict[str, str]]],
+) -> None:
+    with Pipeline(name="pipe-nothing") as pipeline:
+        DummyStep()
+        assert len(pipeline.dag) == 1
+
+    distiset = pipeline.run(use_cache=False, dataset=dataset)
+    assert len(distiset["default"]["train"]) == 10
+    assert len(pipeline.dag) == 2
+
+
+if __name__ == "__main__":
+    with Pipeline(name="pipe-nothing") as pipeline:
+        data = [{"instruction": "Tell me a joke."}] * 10
+        load_dataset = make_generator_step(Dataset.from_list(data))
+
+        dummy = DummyStep()
+        load_dataset >> dummy
+
+    distiset = pipeline.run(use_cache=False)
+    print(distiset)
diff --git a/tests/unit/pipeline/test_base.py b/tests/unit/pipeline/test_base.py
index d0e954b586..cec1e54ee5 100644
--- a/tests/unit/pipeline/test_base.py
+++ b/tests/unit/pipeline/test_base.py
@@ -1150,6 +1150,20 @@ def process(self, inputs: StepInput) -> StepOutput:  # type: ignore
         gen_step >> step1_0 >> step2
 
         pipeline.run()
 
+    def test_pipeline_with_dataset_and_generator_step(self):
+        with pytest.raises(ValueError) as exc_info:
+            with DummyPipeline(name="unit-test-pipeline") as pipeline:
+                gen_step = DummyGeneratorStep()
+                step1_0 = DummyStep1()
+                gen_step >> step1_0
+
+            pipeline.run(
+                use_cache=False, dataset=[{"instruction": "Tell me a joke."}] * 10
+            )
+        assert exc_info.value.args[0].startswith(
+            "There is already a `GeneratorStep` in the pipeline"
+        )
+
     def test_optional_name(self):
         import random
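One more sketch of the DAG bookkeeping these tests assert on — after `make_generator_step`, the loader should be the pipeline's only root step (`DAG.root_steps`, see `_dag.py` above, returns the steps without predecessors):

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step

data = [{"instruction": "Tell me a joke."}]

with Pipeline(name="root-check") as pipeline:
    loader = make_generator_step(data)

print(pipeline.dag.root_steps)  # {loader.name}
```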
diff --git a/tests/unit/steps/generators/test_utils.py b/tests/unit/steps/generators/test_utils.py
new file mode 100644
index 0000000000..2ec2b59b82
--- /dev/null
+++ b/tests/unit/steps/generators/test_utils.py
@@ -0,0 +1,41 @@
+# Copyright 2023-present, Argilla, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, List, Union
+
+import pandas as pd
+import pytest
+from datasets import Dataset
+from distilabel.steps import make_generator_step
+
+data = [{"instruction": "Tell me a joke."}] * 10
+
+
+@pytest.mark.parametrize("dataset", (data, Dataset.from_list(data), pd.DataFrame(data)))
+def test_make_generator_step(
+    dataset: Union[Dataset, pd.DataFrame, List[Dict[str, str]]],
+):
+    batch_size = 5
+    load_dataset = make_generator_step(
+        dataset, batch_size=batch_size, output_mappings={"instruction": "other"}
+    )
+    load_dataset.load()
+    result = next(load_dataset.process())
+    assert len(result[0]) == batch_size
+    if isinstance(dataset, (pd.DataFrame, Dataset)):
+        assert isinstance(load_dataset._dataset, Dataset)
+    else:
+        assert isinstance(load_dataset.data, list)
+
+    assert load_dataset.output_mappings == {"instruction": "other"}