@@ -16,6 +16,7 @@
 from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
 
 from distilabel.distiset import create_distiset
+from distilabel.llms.vllm import vLLM
 from distilabel.pipeline.base import BasePipeline
 from distilabel.pipeline.constants import INPUT_QUEUE_ATTR_NAME
 from distilabel.pipeline.step_wrapper import _StepWrapper
@@ -26,6 +27,8 @@
 from os import PathLike
 from queue import Queue
 
+from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
+
 from distilabel.distiset import Distiset
 from distilabel.pipeline.typing import InputDataset
 from distilabel.steps.base import _Step
@@ -69,6 +72,7 @@ def __init__(
 
         self._ray_head_node_url = ray_head_node_url
         self._ray_init_kwargs = ray_init_kwargs or {}
+        self._ray_node_ids = {}
 
     def run(
         self,
@@ -171,6 +175,8 @@ def _init_ray(self) -> None:
         else:
            ray.init(**self._ray_init_kwargs)
 
+        self._ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}
+
     @property
     def QueueClass(self) -> Callable:
         from ray.util.queue import Queue
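For reference, a minimal sketch (not part of the diff) of what the added node-tracking dict holds: `ray.nodes()` returns one metadata dict per cluster node, each carrying a "NodeID" key, so the comprehension marks every node as not yet reserved for a `vLLM` step. The standalone names below are illustrative only and assume a local or already-running Ray cluster.

import ray

ray.init()  # assumes a local or already-running Ray cluster

# Same shape as the `self._ray_node_ids` attribute above: {node_id: already_used}
ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}

# Later, a free node can be picked and marked as used, mirroring
# `_create_vllm_placement_group` further down in this diff:
free_node_id = next(node_id for node_id, used in ray_node_ids.items() if not used)
ray_node_ids[free_node_id] = True
print(ray_node_ids)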
@@ -218,17 +224,20 @@ def run(self) -> str:
             "name": f"distilabel-{self.name}-{step.name}-{replica}"
         }
 
-        if step.resources.cpus is not None:
-            resources["num_cpus"] = step.resources.cpus
+        if hasattr(step, "llm") and isinstance(step.llm, vLLM):  # type: ignore
+            resources["scheduling_strategy"] = self._create_vllm_placement_group(step)
+        else:
+            if step.resources.cpus is not None:
+                resources["num_cpus"] = step.resources.cpus
 
-        if step.resources.gpus is not None:
-            resources["num_gpus"] = step.resources.gpus
+            if step.resources.gpus is not None:
+                resources["num_gpus"] = step.resources.gpus
 
-        if step.resources.memory is not None:
-            resources["memory"] = step.resources.memory
+            if step.resources.memory is not None:
+                resources["memory"] = step.resources.memory
 
-        if step.resources.resources is not None:
-            resources["resources"] = step.resources.resources
+            if step.resources.resources is not None:
+                resources["resources"] = step.resources.resources
 
         _StepWrapperRay = _StepWrapperRay.options(**resources)  # type: ignore
 
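As a side note, a hedged sketch of what the `resources` dict built above becomes: keyword arguments for Ray's `.options()` on the actor class. The `Dummy` actor, the actor name, and the literal resource values are made up for illustration.

import ray


@ray.remote
class Dummy:  # stand-in for the `_StepWrapperRay` actor class
    def ping(self) -> str:
        return "pong"


ray.init()

resources = {
    "name": "distilabel-pipeline-step-0",  # hypothetical actor name
    "num_cpus": 1,
    "num_gpus": 0,
    "memory": 512 * 1024 * 1024,  # bytes; reserved for scheduling, not enforced
}

actor = Dummy.options(**resources).remote()
print(ray.get(actor.ping.remote()))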
@@ -255,6 +264,57 @@ def run(self) -> str:
         )
         step_wrapper.run.remote()
 
+    def _create_vllm_placement_group(
+        self, step: "_Step"
+    ) -> "PlacementGroupSchedulingStrategy":
+        """Creates a Ray placement group with as many GPU bundles as the `tensor_parallel_size`
+        specified in the `vLLM` initialisation. The created placement group uses the
+        `STRICT_PACK` strategy if `pipeline_parallel_size` is less than or equal to 1,
+        and `SPREAD` otherwise (a placement group with GPU bundles spread across several
+        nodes). In addition, the placement group is targeted at a specific node. This
+        avoids `vLLM` raising the exception `Ray does not allocate any GPUs on the driver
+        node...`, as it ensures that the driver `_StepWrapperRay` actor resides on the
+        same node as the Ray actors created by `vLLM` for distributed inference.
+
+        Args:
+            step: the step which uses `vLLM`.
+
+        Returns:
+            A `PlacementGroupSchedulingStrategy` using the created `PlacementGroup`.
+        """
+        import ray
+
+        llm = step.llm  # type: ignore
+        tensor_parallel_size = llm.extra_kwargs.get("tensor_parallel_size", 1)  # type: ignore
+        pipeline_parallel_size = llm.extra_kwargs.get(  # type: ignore
+            "pipeline_parallel_size", 1
+        )
+
+        node_id = next(
+            node_id for node_id, used in self._ray_node_ids.items() if not used
+        )
+
+        self._ray_node_ids[node_id] = True
+
+        # Create a placement group
+        pg = ray.util.placement_group(
+            # Create `tensor_parallel_size` GPU bundles and at least one CPU bundle
+            # so the actors can be scheduled and executed (1 CPU bundle can have infinite actors):
+            # https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#schedule-tasks-and-actors-to-placement-groups-use-reserved-resources
+            bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
+            strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
+            _soft_target_node_id=node_id if pipeline_parallel_size is None else None,
+        )
+
+        self._logger.info(
+            f"Step '{step.name}' uses `vLLM`. Created a Ray placement group with bundle"
+            f" specs: {pg.bundle_specs}"
+        )
+
+        return ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy(  # type: ignore
+            placement_group=pg,
+        )
+
     def _teardown(self) -> None:
         """Clean/release/stop resources reserved to run the pipeline."""
         if self._write_buffer:
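To make the placement-group logic above concrete, here is a minimal, standalone sketch of the same pattern: one CPU bundle for the driver actor plus one GPU bundle per tensor-parallel rank, packed onto a single node unless pipeline parallelism spans nodes. The `Driver` actor and the `tensor_parallel_size`/`pipeline_parallel_size` values are hypothetical, and `ray.init(num_gpus=2)` fakes two logical GPUs so the snippet can run locally.

import ray
from ray.util import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_gpus=2)  # pretend there are 2 logical GPUs for local testing

tensor_parallel_size = 2    # hypothetical vLLM extra kwarg
pipeline_parallel_size = 1  # hypothetical vLLM extra kwarg

# One CPU bundle for the driver actor + one GPU bundle per tensor-parallel rank.
pg = placement_group(
    bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
    strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
)
ray.get(pg.ready())  # block until the bundles are reserved


@ray.remote(num_cpus=1)
class Driver:  # stand-in for the `_StepWrapperRay` driver actor
    def where(self) -> str:
        return ray.get_runtime_context().get_node_id()


driver = Driver.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
print(ray.get(driver.where.remote()))  # same node as the reserved GPU bundles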