Create a GeneratorStep from a dataset using a helper function #812

Merged
merged 29 commits into from
Jul 29, 2024
Changes from 16 commits
Commits
29 commits
1a71823
Add helper function to create generator step from dataset
plaguss Jul 23, 2024
911c9e2
Add integration tests for make_generator_step
plaguss Jul 23, 2024
2928845
Redirect import
plaguss Jul 23, 2024
484c568
Update LoadDataFromHub to not call load if a dataset is already defined
plaguss Jul 23, 2024
fcab8b6
Update docs
plaguss Jul 23, 2024
7b4a15a
Add unit tests for the new helper function
plaguss Jul 23, 2024
3c046e5
Update filename to utils
plaguss Jul 23, 2024
1eb7e9c
Add helper method to insert a root step
plaguss Jul 24, 2024
608da33
Add logic to create a generator step internally from a dataset
plaguss Jul 24, 2024
2d4aa49
Pass the dataset variable from all the pipeline implementations
plaguss Jul 24, 2024
50cbac5
Add type for the input datasets
plaguss Jul 24, 2024
b734fe8
Avoid circular imports
plaguss Jul 24, 2024
0c32758
Add test for pipelines with generator step and dataset
plaguss Jul 24, 2024
941d8bb
Add integration tests for dataset passed via run method
plaguss Jul 24, 2024
47906cc
Fix error evaluation dataframe
plaguss Jul 24, 2024
4fe0b34
Add example on quickstart and entry on how to guide
plaguss Jul 24, 2024
4c414fc
Update docs/sections/getting_started/quickstart.md
plaguss Jul 24, 2024
28435b2
Update docs/sections/getting_started/quickstart.md
plaguss Jul 24, 2024
0c32127
Update src/distilabel/pipeline/base.py
plaguss Jul 24, 2024
1acc0e1
Update src/distilabel/pipeline/ray.py
plaguss Jul 24, 2024
dd6d427
Update src/distilabel/steps/generators/utils.py
plaguss Jul 24, 2024
da72207
Update src/distilabel/steps/generators/utils.py
plaguss Jul 24, 2024
5a23778
Update src/distilabel/pipeline/local.py
plaguss Jul 24, 2024
da3deed
Respect import order
plaguss Jul 24, 2024
dab2662
Move functionality to a proper internal method
plaguss Jul 24, 2024
d7b4768
Run linter
plaguss Jul 24, 2024
ae85770
Merge branch 'develop' into pipeline-with-dataset
davidberenstein1957 Jul 25, 2024
11c1412
Merge branch 'develop' into pipeline-with-dataset
gabrielmbmb Jul 29, 2024
64e4ff2
Fix format
gabrielmbmb Jul 29, 2024
2 changes: 2 additions & 0 deletions docs/api/step/generator_step.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ This section contains the API reference for the [`GeneratorStep`][distilabel.ste
For more information and examples on how to use existing generator steps or create custom ones, please refer to [Tutorial - Step - GeneratorStep](../../sections/how_to_guides/basic/step/generator_step.md).

::: distilabel.steps.base.GeneratorStep

::: distilabel.steps.generators.utils.make_generator_step
28 changes: 28 additions & 0 deletions docs/sections/getting_started/quickstart.md
I think that we could leave here the new example that you've added @plaguss as it's the quicker way to get started. WDYT @dvsrepo @davidberenstein1957 ?

Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,31 @@ if __name__ == "__main__":
7. We run the pipeline with the parameters for the `load_dataset` and `text_generation` steps. The `load_dataset` step will use the repository `distilabel-internal-testing/instruction-dataset-mini` and the `test` split, and the `text_generation` task will use the `generation_kwargs` with the `temperature` set to `0.7` and the `max_new_tokens` set to `512`.

8. Optionally, we can push the generated [`Distiset`][distilabel.distiset.Distiset] to the Hugging Face Hub repository `distilabel-example`. This will allow you to share the generated dataset with others and use it in other pipelines.

## Minimal example

`Distilabel` gives you a lot of flexibility to create your pipelines, but to get started right away you can omit most of the details and rely on the default values:

```python
from distilabel.llms import OpenAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps.tasks import TextGeneration
from datasets import load_dataset


dataset = load_dataset("distilabel-internal-testing/instruction-dataset-mini", split="test")

with Pipeline() as pipeline:  # (1)
    TextGeneration(llm=OpenAILLM(model="gpt-3.5-turbo"))  # (2)


if __name__ == "__main__":
    distiset = pipeline.run(dataset=dataset)  # (3)
    distiset.push_to_hub(repo_id="distilabel-example")
```

1. The [`Pipeline`][distilabel.pipeline.Pipeline] can take no arguments and generate a default name on its own that will be tracked internally.

2. Just as with the [`Pipeline`][distilabel.pipeline.Pipeline], the [`Step`][distilabel.steps.base.Step]s don't explicitly need a name.

3. You can load the dataset as you normally would with Hugging Face `datasets` and pass it to the `run` method.
98 changes: 98 additions & 0 deletions docs/sections/how_to_guides/basic/pipeline/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,56 @@ with Pipeline("pipe-name", description="My first pipe") as pipeline:
...
```

!!! Tip "Easily load your datasets"

    If you are already used to working with Hugging Face's `Dataset` (via `load_dataset`) or with a `pd.DataFrame`, you can create the `GeneratorStep` directly from the dataset (or dataframe) with the help of [`make_generator_step`][distilabel.steps.generators.utils.make_generator_step]:

    === "From a list of dicts"

        ```python
        from distilabel.pipeline import Pipeline
        from distilabel.steps import make_generator_step

        dataset = [{"instruction": "Tell me a joke."}]

        with Pipeline("pipe-name", description="My first pipe") as pipeline:
            loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
            ...
        ```

    === "From `datasets.Dataset`"

        ```python
        from datasets import load_dataset
        from distilabel.pipeline import Pipeline
        from distilabel.steps import make_generator_step

        dataset = load_dataset(
            "DIBT/10k_prompts_ranked",
            split="train"
        ).filter(
            lambda r: r["avg_rating"]>=4 and r["num_responses"]>=2
        ).select(range(500))

        with Pipeline("pipe-name", description="My first pipe") as pipeline:
            loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
            ...
        ```

    === "From `pd.DataFrame`"

        ```python
        import pandas as pd
        from distilabel.pipeline import Pipeline
        from distilabel.steps import make_generator_step

        dataset = pd.read_csv("path/to/dataset.csv")

        with Pipeline("pipe-name", description="My first pipe") as pipeline:
            loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
            ...
        ```

Next, we will use the `prompt` column from the dataset obtained through `LoadDataFromHub` and use several `LLM`s to execute a `TextGeneration` task. We will also use the `Task.connect()` method to connect the steps, so the output of one step is the input of the next one.
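
As a rough illustration of what `output_mappings={"prompt": "instruction"}` is meant to achieve in the snippets above, the sketch below renames a column in plain Python. `apply_output_mappings` is a hypothetical helper written only for this note and is not part of distilabel; it just mirrors the convention that keys are the original column names and values are the new ones.

```python
from typing import Dict, List


def apply_output_mappings(
    rows: List[Dict[str, str]], output_mappings: Dict[str, str]
) -> List[Dict[str, str]]:
    """Rename columns: keys are the original names, values are the new names."""
    return [
        # Columns without a mapping keep their original name.
        {output_mappings.get(column, column): value for column, value in row.items()}
        for row in rows
    ]


rows = [{"prompt": "Tell me a joke.", "topic": "humor"}]
renamed = apply_output_mappings(rows, {"prompt": "instruction"})
print(renamed)
```

With this mapping, a downstream task that expects an `instruction` column can consume a dataset whose column is called `prompt`.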

!!! NOTE
Expand Down Expand Up @@ -282,6 +332,54 @@ if __name__ == "__main__":
distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
```

#### Pipeline.run with a dataset

Note that in most cases, if you don't need the extra flexibility that [`GeneratorSteps`][distilabel.steps.base.GeneratorStep] bring, you can create a dataset as you normally would and pass it directly to the [Pipeline.run][distilabel.pipeline.base.BasePipeline.run] method. The highlighted lines show what changes:

```python hl_lines="12-15 34 39"
import random
from datasets import load_dataset
from distilabel.llms import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns
from distilabel.steps.tasks import TextGeneration

@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)

dataset = load_dataset(
    "distilabel-internal-testing/instruction-dataset-mini",
    split="test"
)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    sample_two_steps >> tasks >> combine_generations


if __name__ == "__main__":
    distiset = pipeline.run(
        dataset=dataset,
        parameters=...
    )
```


### Stopping the pipeline

In case you want to stop the pipeline while it's running, you can press ++ctrl+c++ or ++cmd+c++ depending on your OS (or send a `SIGINT` to the main process), and the outputs will be stored in the cache. Pressing an additional time will force the pipeline to stop its execution, but this can lead to losing the generated outputs for certain batches.
Expand Down
12 changes: 11 additions & 1 deletion src/distilabel/pipeline/_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

if TYPE_CHECKING:
from distilabel.mixins.runtime_parameters import RuntimeParametersNames
from distilabel.steps.base import Step, _Step
from distilabel.steps.base import GeneratorStep, Step, _Step


class DAG(_Serializable):
Expand Down Expand Up @@ -146,6 +146,16 @@ def add_edge(self, from_step: str, to_step: str) -> None:

self.G.add_edge(from_step, to_step)

def add_root_step(self, step: "GeneratorStep") -> None:
    """Adds a root step, helper method used when a pipeline receives a dataset in the run
    method.

    Args:
        step: The generator step that will be set as the new root.
    """
    self.add_step(step)
    self.add_edge(step.name, next(iter(self)))

@cached_property
def root_steps(self) -> Set[str]:
"""The steps that don't have any predecessors i.e. generator steps.
Expand Down
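
Review note: to make the behaviour of `add_root_step` easier to reason about, here is a minimal sketch using a plain dictionary instead of the real `DAG`. `TinyDAG` is a hypothetical stand-in, and it assumes that iterating the graph yields nodes in insertion order, so `next(iter(self))` returns the first step that was ever added.

```python
from typing import Dict, Iterator, List


class TinyDAG:
    """Hypothetical stand-in for the pipeline DAG; nodes keep insertion order."""

    def __init__(self) -> None:
        self.edges: Dict[str, List[str]] = {}

    def __iter__(self) -> Iterator[str]:
        return iter(self.edges)

    def add_step(self, name: str) -> None:
        self.edges.setdefault(name, [])

    def add_edge(self, from_step: str, to_step: str) -> None:
        self.edges[from_step].append(to_step)

    def add_root_step(self, name: str) -> None:
        # Same order of operations as in the diff: add the node first, then
        # link it to the first node yielded by iteration.
        self.add_step(name)
        self.add_edge(name, next(iter(self)))


dag = TinyDAG()
dag.add_step("text_generation")
dag.add_step("combine_generations")
dag.add_edge("text_generation", "combine_generations")
dag.add_root_step("load_data")
print(dag.edges["load_data"])
```

In this sketch the new root is connected only to the first step that was added, and calling `add_root_step` on an empty graph would link the step to itself; whether either case matters for the real `DAG` depends on how pipelines are built.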
24 changes: 23 additions & 1 deletion src/distilabel/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
STEP_ATTR_NAME,
)
from distilabel.pipeline.write_buffer import _WriteBuffer
from distilabel.steps.base import GeneratorStep
from distilabel.steps.generators.utils import make_generator_step
from distilabel.utils.logging import setup_logging, stop_logging
from distilabel.utils.serialization import (
TYPE_INFO_KEY,
Expand All @@ -65,7 +67,11 @@

from distilabel.distiset import Distiset
from distilabel.pipeline.routing_batch_function import RoutingBatchFunction
from distilabel.pipeline.typing import PipelineRuntimeParametersInfo, StepLoadStatus
from distilabel.pipeline.typing import (
InputDataset,
PipelineRuntimeParametersInfo,
StepLoadStatus,
)
from distilabel.steps.base import Step, _Step

class _CacheLocation(TypedDict):
Expand Down Expand Up @@ -289,6 +295,7 @@ def run(
use_cache: bool = True,
storage_parameters: Optional[Dict[str, Any]] = None,
use_fs_to_pass_data: bool = False,
dataset: Optional["InputDataset"] = None,
) -> "Distiset": # type: ignore
"""Run the pipeline. It will set the runtime parameters for the steps and validate
the pipeline.
Expand All @@ -312,6 +319,9 @@ def run(
the `_Batch`es between the steps. Even if this parameter is `False`, the
`Batch`es received by `GlobalStep`s will always use the file system to
pass the data. Defaults to `False`.
dataset: If given, it will be used to create a `GeneratorStep` that is added as the
root step. Convenient when you have already processed the dataset in your
script and just want to pass it to the pipeline.

Returns:
The `Distiset` created by the pipeline.
Expand All @@ -327,6 +337,18 @@ def run(
setup_logging(
log_queue=self._log_queue, filename=str(self._cache_location["log_file"])
)
if dataset is not None:
    # If a dataset was passed, create a generator step and add it as root.
    # It can be done only if there isn't already a GeneratorStep in the DAG.
    for step_name in self.dag:
        step = self.dag.get_step(step_name)[STEP_ATTR_NAME]
        if isinstance(step, GeneratorStep):
            raise ValueError(
                "There is already a `GeneratorStep` in the pipeline, you can either pass a `dataset` to the "
                f"run method, or create a `GeneratorStep` explicitly. `GeneratorStep`: {step}"
            )
    loader = make_generator_step(dataset)
    self.dag.add_root_step(loader)

# Validate the pipeline DAG to check that all the steps are chainable, there are
# no missing runtime parameters, batch sizes are correct, etc.
Expand Down
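
Review note: the guard in `run` can be exercised in isolation with a small sketch like the one below. The `Step` and `GeneratorStep` classes and the `check_no_generator_step` function are hypothetical stand-ins defined only for this example; the point is that the `isinstance` check has to look at the step object, since a step name is a plain `str`.

```python
from typing import Dict, Optional


class Step:
    def __init__(self, name: str) -> None:
        self.name = name


class GeneratorStep(Step):
    pass


def check_no_generator_step(steps: Dict[str, Step], dataset: Optional[list]) -> None:
    """Raise if a dataset is passed while a generator step already exists."""
    if dataset is None:
        return
    for step_name, step in steps.items():
        # Inspect the step object: `step_name` is a `str` and would never
        # be an instance of `GeneratorStep`.
        if isinstance(step, GeneratorStep):
            raise ValueError(
                f"There is already a `GeneratorStep` in the pipeline: {step_name}"
            )


steps = {"loader": GeneratorStep("loader"), "task": Step("task")}
try:
    check_no_generator_step(steps, [{"instruction": "Tell me a joke."}])
    raised = False
except ValueError:
    raised = True
print(raised)
```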
12 changes: 11 additions & 1 deletion src/distilabel/pipeline/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from queue import Queue

from distilabel.distiset import Distiset
from distilabel.pipeline.typing import InputDataset
from distilabel.steps.base import _Step


Expand Down Expand Up @@ -90,6 +91,7 @@ def run(
use_cache: bool = True,
storage_parameters: Optional[Dict[str, Any]] = None,
use_fs_to_pass_data: bool = False,
dataset: Optional["InputDataset"] = None,
) -> "Distiset":
"""Runs the pipeline.

Expand All @@ -109,6 +111,9 @@ def run(
the `_Batch`es between the steps. Even if this parameter is `False`, the
`Batch`es received by `GlobalStep`s will always use the file system to
pass the data. Defaults to `False`.
dataset: If given, it will be used to create a `GeneratorStep` that is added as the
root step. Convenient when you have already processed the dataset in your
script and just want to pass it to the pipeline.

Returns:
The `Distiset` created by the pipeline.
Expand All @@ -123,12 +128,17 @@ def run(
use_cache=use_cache,
storage_parameters=storage_parameters,
use_fs_to_pass_data=use_fs_to_pass_data,
dataset=dataset,
)

self._log_queue = cast("Queue[Any]", mp.Queue())

if distiset := super().run(
parameters, use_cache, storage_parameters, use_fs_to_pass_data
parameters,
use_cache,
storage_parameters,
use_fs_to_pass_data,
dataset=dataset,
):
return distiset

Expand Down
11 changes: 10 additions & 1 deletion src/distilabel/pipeline/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from queue import Queue

from distilabel.distiset import Distiset
from distilabel.pipeline.typing import InputDataset
from distilabel.steps.base import _Step


Expand Down Expand Up @@ -75,6 +76,7 @@ def run(
use_cache: bool = True,
storage_parameters: Optional[Dict[str, Any]] = None,
use_fs_to_pass_data: bool = False,
dataset: Optional["InputDataset"] = None,
) -> "Distiset":
"""Runs the pipeline in the Ray cluster.

Expand All @@ -94,6 +96,9 @@ def run(
the `_Batch`es between the steps. Even if this parameter is `False`, the
`Batch`es received by `GlobalStep`s will always use the file system to
pass the data. Defaults to `False`.
dataset: If given, it will be used to create a `GeneratorStep` that is added as the
root step. Convenient when you have already processed the dataset in your
script and just want to pass it to the pipeline.

Returns:
The `Distiset` created by the pipeline.
Expand All @@ -108,7 +113,11 @@ def run(
)

if distiset := super().run(
parameters, use_cache, storage_parameters, use_fs_to_pass_data
parameters,
use_cache,
storage_parameters,
use_fs_to_pass_data,
dataset=dataset,
):
return distiset

Expand Down
6 changes: 6 additions & 0 deletions src/distilabel/pipeline/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from typing import TYPE_CHECKING, Dict, List, Literal, TypedDict, TypeVar, Union

if TYPE_CHECKING:
import pandas as pd
from datasets import Dataset

from distilabel.mixins.runtime_parameters import RuntimeParameterInfo
from distilabel.steps.base import GeneratorStep, GlobalStep, Step

Expand Down Expand Up @@ -47,3 +50,6 @@ class StepLoadStatus(TypedDict):
str, Union[List["RuntimeParameterInfo"], Dict[str, "RuntimeParameterInfo"]]
]
"""Alias for the information of the runtime parameters of a `Pipeline`."""

InputDataset = Union["Dataset", "pd.DataFrame", List[Dict[str, str]]]
"""Alias for the types we can process as input dataset."""
2 changes: 2 additions & 0 deletions src/distilabel/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
LoadDataFromFileSystem,
LoadDataFromHub,
)
from distilabel.steps.generators.utils import make_generator_step
from distilabel.steps.globals.huggingface import PushToHub
from distilabel.steps.typing import GeneratorStepOutput, StepOutput

Expand Down Expand Up @@ -72,4 +73,5 @@
"GeneratorStepOutput",
"StepOutput",
"step",
"make_generator_step",
]
3 changes: 3 additions & 0 deletions src/distilabel/steps/generators/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ class LoadDataFromHub(GeneratorStep):
def load(self) -> None:
"""Load the dataset from the Hugging Face Hub"""
super().load()
if self._dataset is not None:
    # Here to simplify the functionality of distilabel.steps.generators.utils.make_generator_step
    return

self._dataset = load_dataset(
self.repo_id, # type: ignore
Expand Down
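
Review note: the early return in `LoadDataFromHub.load` follows a common "skip loading if data was injected" pattern. The sketch below shows the idea with a hypothetical `LazyLoader` class and a fake fetch function; it assumes, as the comment in the diff suggests, that the helper sets `_dataset` before `load` is called.

```python
from typing import Callable, Dict, List, Optional

Rows = List[Dict[str, str]]


class LazyLoader:
    """Hypothetical loader that only fetches data when none was injected."""

    def __init__(self, fetch: Callable[[], Rows]) -> None:
        self._fetch = fetch
        self._dataset: Optional[Rows] = None

    def load(self) -> None:
        if self._dataset is not None:
            # Data was set beforehand, so skip the (possibly remote) fetch.
            return
        self._dataset = self._fetch()


calls: List[str] = []


def fake_fetch() -> Rows:
    # Stands in for a remote download; records that it was invoked.
    calls.append("fetch")
    return [{"instruction": "from the hub"}]


preloaded = LazyLoader(fake_fetch)
preloaded._dataset = [{"instruction": "in memory"}]
preloaded.load()

fresh = LazyLoader(fake_fetch)
fresh.load()
print(len(calls))
```

Only the loader without injected data triggers a fetch, which is the behaviour the early return is there to provide.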