@@ -98,7 +98,7 @@ def finished(self) -> bool:
         return self._finished

     async def generator(
-        self,
+        self
     ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]:
         try:
             while True:
@@ -114,9 +114,9 @@ async def generator(

     @staticmethod
     def _is_raisable(value: Any):
-        return isinstance(
-            value, BaseException) or (isinstance(value, type)
-                                      and issubclass(value, BaseException))
+        return isinstance(value, BaseException) or \
+                (isinstance(value, type) and \
+                 issubclass(value, BaseException))


 class RequestTracker:
@@ -126,7 +126,7 @@ def __init__(self) -> None:
         self._request_streams: Dict[str, AsyncStream] = {}
         self._aborted_requests: asyncio.Queue[str] = asyncio.Queue()
         self._new_requests: asyncio.Queue[Tuple[AsyncStream,
-                                                dict]] = (asyncio.Queue())
+                                                dict]] = asyncio.Queue()
         self.new_requests_event = asyncio.Event()

     def __contains__(self, item):
@@ -148,12 +148,11 @@ def propagate_exception(self,
         for rid in tuple(self._request_streams.keys()):
             self.abort_request(rid, exception=exc)

-    def process_request_output(
-        self,
-        request_output: Union[RequestOutput, EmbeddingRequestOutput],
-        *,
-        verbose: bool = False,
-    ) -> None:
+    def process_request_output(self,
+                               request_output: Union[RequestOutput,
+                                                     EmbeddingRequestOutput],
+                               *,
+                               verbose: bool = False) -> None:
         """Process a request output from the engine."""
         request_id = request_output.request_id
         finished = request_output.finished
@@ -172,25 +171,21 @@ def process_request_output(
         if verbose and finished:
             logger.info("Finished request %s.", request_id)

-    def process_exception(
-        self,
-        request_id: str,
-        exception: BaseException,
-        *,
-        verbose: bool = False,
-    ) -> None:
+    def process_exception(self,
+                          request_id: str,
+                          exception: BaseException,
+                          *,
+                          verbose: bool = False) -> None:
         """Propagate an exception from the engine."""
         if verbose:
             logger.info("Finished request %s.", request_id)
         self.abort_request(request_id, exception=exception)

-    def add_request(
-        self,
-        request_id: str,
-        *,
-        verbose: bool = False,
-        **engine_add_request_kwargs,
-    ) -> AsyncStream:
+    def add_request(self,
+                    request_id: str,
+                    *,
+                    verbose: bool = False,
+                    **engine_add_request_kwargs) -> AsyncStream:
         """Add a request to be sent to the engine on the next background
         loop iteration."""
         if request_id in self._request_streams:
@@ -210,13 +205,12 @@ def add_request(

         return stream

-    def abort_request(
-        self,
-        request_id: str,
-        *,
-        exception: Optional[Union[BaseException, Type[BaseException]]] = None,
-        verbose: bool = False,
-    ) -> None:
+    def abort_request(self,
+                      request_id: str,
+                      *,
+                      exception: Optional[Union[BaseException,
+                                                Type[BaseException]]] = None,
+                      verbose: bool = False) -> None:
         """Abort a request during next background loop iteration."""
         if verbose:
             logger.info("Aborted request %s.", request_id)
@@ -293,12 +287,11 @@ async def step_async(
         # This ensures that the scheduler is only called again when the current
         # batch has completed.
         if not self._has_remaining_steps(seq_group_metadata_list):
+
             # Schedule iteration
-            (
-                seq_group_metadata_list,
-                scheduler_outputs,
-                allow_async_output_proc,
-            ) = self.scheduler[virtual_engine].schedule()
+            (seq_group_metadata_list, scheduler_outputs,
+             allow_async_output_proc
+             ) = self.scheduler[virtual_engine].schedule()

             ctx.seq_group_metadata_list = seq_group_metadata_list
             ctx.scheduler_outputs = scheduler_outputs
@@ -312,11 +305,8 @@ async def step_async(
             # cache the scheduler outputs for the next iteration if we have
             # lookahead slots
             self._cache_scheduler_outputs_for_multi_step(
-                virtual_engine,
-                seq_group_metadata_list,
-                scheduler_outputs,
-                allow_async_output_proc,
-            )
+                virtual_engine, seq_group_metadata_list, scheduler_outputs,
+                allow_async_output_proc)

         assert seq_group_metadata_list is not None
         assert scheduler_outputs is not None
@@ -329,8 +319,8 @@ async def step_async(
             # For supporting PP this is probably the best way to pass the
             # sampled_token_ids, as a separate broadcast over all the PP stages
             # will cause one virtual engine's microbatch to block the pipeline.
-            last_sampled_token_ids = self._get_last_sampled_token_ids(
-                virtual_engine)
+            last_sampled_token_ids = \
+                self._get_last_sampled_token_ids(virtual_engine)

             execute_model_req = ExecuteModelRequest(
                 seq_group_metadata_list=seq_group_metadata_list,
@@ -343,8 +333,7 @@ async def step_async(
                 finished_requests_ids=finished_requests_ids,
                 # We use ExecuteModelRequest to pass the last sampled_token_ids
                 # to each of the non-last PP stages for in-place prepare_input.
-                last_sampled_token_ids=last_sampled_token_ids,
-            )
+                last_sampled_token_ids=last_sampled_token_ids)

             if allow_async_output_proc:
                 execute_model_req.async_callback = self.async_callbacks[
@@ -371,26 +360,22 @@ async def step_async(
         if not self._has_remaining_steps(seq_group_metadata_list):
             # Clear the cache if we have finished all the steps
             if self.scheduler_config.is_multi_step:
-                self.cached_scheduler_outputs[virtual_engine] = (
-                    SchedulerOutputState())
+                self.cached_scheduler_outputs[
+                    virtual_engine] = SchedulerOutputState()

-            ctx.append_output(
-                outputs=outputs,
-                seq_group_metadata_list=seq_group_metadata_list,
-                scheduler_outputs=scheduler_outputs,
-                is_async=allow_async_output_proc,
-                is_last_step=True,
-            )
+            ctx.append_output(outputs=outputs,
+                              seq_group_metadata_list=seq_group_metadata_list,
+                              scheduler_outputs=scheduler_outputs,
+                              is_async=allow_async_output_proc,
+                              is_last_step=True)

             if outputs and allow_async_output_proc:
-                assert (
-                    len(outputs) == 1
-                ), "Async postprocessor expects only a single output set"
+                assert len(
+                    outputs
+                ) == 1, "Async postprocessor expects only a single output set"
                 self._advance_to_next_step(
-                    outputs[0],
-                    seq_group_metadata_list,
-                    scheduler_outputs.scheduled_seq_groups,
-                )
+                    outputs[0], seq_group_metadata_list,
+                    scheduler_outputs.scheduled_seq_groups)

             if not allow_async_output_proc:
                 self._process_model_outputs(ctx=ctx)
@@ -432,11 +417,9 @@ async def add_request_async(
         if lora_request is not None and not self.lora_config:
             raise ValueError(f"Got lora_request {lora_request} but LoRA is "
                              "not enabled!")
-
         if priority > 0 and not self.scheduler_config.policy == "priority":
             raise ValueError(f"Got priority {priority} but "
                              "Priority scheduling is not enabled.")
-
         if arrival_time is None:
             arrival_time = time.time()

@@ -484,13 +467,11 @@ class AsyncLLMEngine:

     _engine_class: Type[_AsyncLLMEngine] = _AsyncLLMEngine

-    def __init__(
-        self,
-        *args,
-        log_requests: bool = True,
-        start_engine_loop: bool = True,
-        **kwargs,
-    ) -> None:
+    def __init__(self,
+                 *args,
+                 log_requests: bool = True,
+                 start_engine_loop: bool = True,
+                 **kwargs) -> None:
         self.log_requests = log_requests
         self.engine = self._engine_class(*args, **kwargs)

@@ -501,8 +482,8 @@ def __init__(
             self.engine.model_config.use_async_output_proc)

         if self.use_process_request_outputs_callback:
-            self.engine.process_request_outputs_callback = weak_bind(
-                self.process_request_outputs)
+            self.engine.process_request_outputs_callback = \
+                weak_bind(self.process_request_outputs)

         self.background_loop: Optional[asyncio.Future] = None
         # We need to keep a reference to unshielded
@@ -533,58 +514,47 @@ def _get_executor_cls(
             executor_class = distributed_executor_backend
         elif engine_config.device_config.device_type == "neuron":
             from vllm.executor.neuron_executor import NeuronExecutorAsync
-
             executor_class = NeuronExecutorAsync
         elif engine_config.device_config.device_type == "tpu":
             if distributed_executor_backend == "ray":
                 from vllm.executor.ray_tpu_executor import RayTPUExecutorAsync
-
                 executor_class = RayTPUExecutorAsync
             else:
                 assert distributed_executor_backend is None
                 from vllm.executor.tpu_executor import TPUExecutorAsync
-
                 executor_class = TPUExecutorAsync
         elif engine_config.device_config.device_type == "cpu":
             from vllm.executor.cpu_executor import CPUExecutorAsync
-
             executor_class = CPUExecutorAsync
         elif engine_config.device_config.device_type == "openvino":
             assert distributed_executor_backend is None, (
                 "Distributed execution is not supported with "
                 "the OpenVINO backend.")
             from vllm.executor.openvino_executor import OpenVINOExecutorAsync
-
             executor_class = OpenVINOExecutorAsync
         elif engine_config.device_config.device_type == "xpu":
             if distributed_executor_backend is None:
                 from vllm.executor.xpu_executor import XPUExecutorAsync
-
                 executor_class = XPUExecutorAsync
             elif distributed_executor_backend == "ray":
                 from vllm.executor.ray_xpu_executor import RayXPUExecutorAsync
-
                 executor_class = RayXPUExecutorAsync
             elif distributed_executor_backend == "mp":
                 from vllm.executor.multiproc_xpu_executor import (
                     MultiprocessingXPUExecutorAsync)
-
                 executor_class = MultiprocessingXPUExecutorAsync
             else:
                 raise RuntimeError(
                     "Not supported distributed execution model on XPU device.")
         elif distributed_executor_backend == "ray":
             from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
-
             executor_class = RayGPUExecutorAsync
         elif distributed_executor_backend == "mp":
             from vllm.executor.multiproc_gpu_executor import (
                 MultiprocessingGPUExecutorAsync)
-
             executor_class = MultiprocessingGPUExecutorAsync
         else:
             from vllm.executor.gpu_executor import GPUExecutorAsync
-
             executor_class = GPUExecutorAsync
         return executor_class

@@ -654,8 +624,8 @@ async def get_tokenizer(
         self,
         lora_request: Optional[LoRARequest] = None,
     ) -> AnyTokenizer:
-        return await self.engine.get_tokenizer_group(
-        ).get_lora_tokenizer_async(lora_request)
+        return await (self.engine.get_tokenizer_group().
+                      get_lora_tokenizer_async(lora_request))

     def start_background_loop(self) -> None:
         """Start the background loop."""
@@ -746,8 +716,8 @@ async def run_engine_loop(engine_ref: ReferenceType):
         if not engine:
             return

-        pipeline_parallel_size = (
-            engine.engine.parallel_config.pipeline_parallel_size)
+        pipeline_parallel_size = \
+                engine.engine.parallel_config.pipeline_parallel_size
         has_requests_in_progress = [False] * pipeline_parallel_size
         while True:
             if not any(has_requests_in_progress):
@@ -783,8 +753,7 @@ async def run_engine_loop(engine_ref: ReferenceType):
                 async with asyncio_timeout(ENGINE_ITERATION_TIMEOUT_S):
                     done, _ = await asyncio.wait(
                         requests_in_progress,
-                        return_when=asyncio.FIRST_COMPLETED,
-                    )
+                        return_when=asyncio.FIRST_COMPLETED)
                     for _ in range(pipeline_parallel_size):
                         await asyncio.sleep(0)
                     for task in done:
@@ -818,7 +787,7 @@ async def add_request(
         arrival_time: Optional[float] = None,
         lora_request: Optional[LoRARequest] = None,
         trace_headers: Optional[Mapping[str, str]] = None,
-        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
+        prompt_adapter_request: Optional[PromptAdapterRequest] = None
     ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]:
         if not self.is_running:
             if self.start_engine_loop:
@@ -838,8 +807,7 @@ async def add_request(
             arrival_time=arrival_time or time.time(),
             lora_request=lora_request,
             trace_headers=trace_headers,
-            prompt_adapter_request=prompt_adapter_request,
-        )
+            prompt_adapter_request=prompt_adapter_request)

         return stream.generator()

@@ -850,7 +818,7 @@ async def generate(
         request_id: str,
         lora_request: Optional[LoRARequest] = None,
         trace_headers: Optional[Mapping[str, str]] = None,
-        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
+        prompt_adapter_request: Optional[PromptAdapterRequest] = None
     ) -> AsyncGenerator[RequestOutput, None]:
         """Generate outputs for a request.

@@ -1030,11 +998,9 @@ def _abort(self, request_id: str) -> None:
         Args:
             request_id: The unique id of the request.
         """
-        self._request_tracker.abort_request(
-            request_id,
-            exception=asyncio.CancelledError,
-            verbose=self.log_requests,
-        )
+        self._request_tracker.abort_request(request_id,
+                                            exception=asyncio.CancelledError,
+                                            verbose=self.log_requests)

     async def get_model_config(self) -> ModelConfig:
         """Get the model configuration of the vLLM engine."""
@@ -1057,10 +1023,9 @@ async def get_lora_config(self) -> LoRAConfig:
         return self.engine.get_lora_config()

     async def do_log_stats(
-        self,
-        scheduler_outputs: Optional[SchedulerOutputs] = None,
-        model_output: Optional[List[SamplerOutput]] = None,
-    ) -> None:
+            self,
+            scheduler_outputs: Optional[SchedulerOutputs] = None,
+            model_output: Optional[List[SamplerOutput]] = None) -> None:
         self.engine.do_log_stats()

     async def check_health(self) -> None:
0 commit comments