 from vllm.v1.engine.parallel_sampling import ParentRequest
 from vllm.v1.metrics.stats import (IterationStats, LoRARequestStates,
                                    RequestStateStats)
+from vllm.config import ObservabilityConfig
+from vllm.tracing import (SpanAttributes, SpanKind, extract_trace_context,
+                          init_tracer)
 
 
 class RequestOutputCollector:
@@ -64,28 +67,27 @@ def get_nowait(self) -> Optional[RequestOutput]:
 
 @dataclass
 class OutputProcessorOutput:
-
     request_outputs: list[RequestOutput]
     reqs_to_abort: list[str]
 
 
 class RequestState:
 
     def __init__(
-        self,
-        request_id: str,
-        parent_req: Optional[ParentRequest],
-        request_index: int,
-        lora_name: Optional[str],
-        output_kind: RequestOutputKind,
-        prompt: Optional[str],
-        prompt_token_ids: list[int],
-        logprobs_processor: LogprobsProcessor,
-        detokenizer: IncrementalDetokenizer,
-        max_tokens_param: Optional[int],
-        arrival_time: float,
-        queue: Optional[RequestOutputCollector],
-        log_stats: bool,
+        self,
+        request_id: str,
+        parent_req: Optional[ParentRequest],
+        request_index: int,
+        lora_name: Optional[str],
+        output_kind: RequestOutputKind,
+        prompt: Optional[str],
+        prompt_token_ids: list[int],
+        logprobs_processor: LogprobsProcessor,
+        detokenizer: IncrementalDetokenizer,
+        max_tokens_param: Optional[int],
+        arrival_time: float,
+        queue: Optional[RequestOutputCollector],
+        log_stats: bool,
     ):
         self.request_id = request_id
         self.parent_req = parent_req
@@ -106,14 +108,14 @@ def __init__(
 
     @classmethod
     def from_new_request(
-        cls,
-        tokenizer: AnyTokenizer,
-        request: EngineCoreRequest,
-        prompt: Optional[str],
-        parent_req: Optional[ParentRequest],
-        request_index: int,
-        queue: Optional[RequestOutputCollector],
-        log_stats: bool,
+        cls,
+        tokenizer: AnyTokenizer,
+        request: EngineCoreRequest,
+        prompt: Optional[str],
+        parent_req: Optional[ParentRequest],
+        request_index: int,
+        queue: Optional[RequestOutputCollector],
+        log_stats: bool,
     ) -> "RequestState":
         if not request.sampling_params.detokenize:
             tokenizer = None
@@ -142,10 +144,10 @@ def from_new_request(
         )
 
     def make_request_output(
-        self,
-        new_token_ids: list[int],
-        finish_reason: Optional[FinishReason],
-        stop_reason: Union[int, str, None],
+        self,
+        new_token_ids: list[int],
+        finish_reason: Optional[FinishReason],
+        stop_reason: Union[int, str, None],
     ) -> Optional[RequestOutput]:
 
         finished = finish_reason is not None
@@ -170,10 +172,10 @@ def make_request_output(
         return self._new_request_output(request_id, outputs, finished)
 
     def _new_request_output(
-        self,
-        request_id: str,
-        outputs: list[CompletionOutput],
-        finished: bool,
+        self,
+        request_id: str,
+        outputs: list[CompletionOutput],
+        finished: bool,
     ) -> RequestOutput:
 
         if self.output_kind == RequestOutputKind.DELTA:
@@ -192,10 +194,10 @@ def _new_request_output(
         )
 
     def _new_completion_output(
-        self,
-        token_ids: list[int],
-        finish_reason: Optional[FinishReason],
-        stop_reason: Union[int, str, None],
+        self,
+        token_ids: list[int],
+        finish_reason: Optional[FinishReason],
+        stop_reason: Union[int, str, None],
     ) -> CompletionOutput:
 
         finished = finish_reason is not None
@@ -225,15 +227,26 @@ class OutputProcessor:
     """Process EngineCoreOutputs into RequestOutputs."""
 
     def __init__(
-        self,
-        tokenizer: TokenizerGroup,
-        log_stats: bool,
+        self,
+        tokenizer: TokenizerGroup,
+        log_stats: bool,
+        observability_config: Optional[ObservabilityConfig] = None
     ):
         self.log_stats = log_stats
         self.tokenizer = tokenizer
         self.request_states: dict[str, RequestState] = {}
         self.parent_requests: dict[str, ParentRequest] = {}
         self.lora_states = LoRARequestStates()
+        self.observability_config = observability_config
+
+        self.tracer = None
+        if self.observability_config is not None and self.observability_config.otlp_traces_endpoint:
+            self.tracer = init_tracer(
+                "vllm.llm_engine",
+                self.observability_config.otlp_traces_endpoint)
+
+    def is_tracing_enabled(self) -> bool:
+        return self.tracer is not None
 
     def get_num_unfinished_requests(self):
         return len(self.request_states)
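
Note on the constructor change above: tracing stays opt-in. The tracer is created only when an ObservabilityConfig with otlp_traces_endpoint set is passed in; otherwise self.tracer stays None and is_tracing_enabled() returns False. A minimal sketch of the wiring, assuming a TokenizerGroup built elsewhere and an OTLP collector on localhost:4317 (both are illustrative assumptions, not part of this diff):

    # Hypothetical caller-side setup; the endpoint value and `tokenizer` are assumptions.
    from vllm.config import ObservabilityConfig
    from vllm.v1.engine.output_processor import OutputProcessor

    observability_config = ObservabilityConfig(
        otlp_traces_endpoint="http://localhost:4317")

    output_processor = OutputProcessor(
        tokenizer,  # a TokenizerGroup constructed by the engine
        log_stats=True,
        observability_config=observability_config,
    )

    # With the endpoint set, init_tracer() has run and spans will be emitted;
    # without it, do_tracing() is a no-op.
    assert output_processor.is_tracing_enabled()
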
@@ -249,8 +262,8 @@ def propagate_error(self, e: Exception):
             state.queue.put(e)
 
     def abort_requests(
-        self,
-        request_ids: Iterable[str],
+        self,
+        request_ids: Iterable[str],
     ) -> list[str]:
         request_ids_to_abort = []
         for request_id in request_ids:
@@ -266,12 +279,12 @@ def abort_requests(
         return request_ids_to_abort
 
     def add_request(
-        self,
-        request: EngineCoreRequest,
-        prompt: Optional[str],
-        parent_req: Optional[ParentRequest] = None,
-        request_index: int = 0,
-        queue: Optional[RequestOutputCollector] = None,
+        self,
+        request: EngineCoreRequest,
+        prompt: Optional[str],
+        parent_req: Optional[ParentRequest] = None,
+        request_index: int = 0,
+        queue: Optional[RequestOutputCollector] = None,
     ) -> None:
         request_id = request.request_id
         if request_id in self.request_states:
@@ -291,10 +304,10 @@ def add_request(
             self.parent_requests[parent_req.request_id] = parent_req
 
     def process_outputs(
-        self,
-        engine_core_outputs: list[EngineCoreOutput],
-        engine_core_timestamp: Optional[float] = None,
-        iteration_stats: Optional[IterationStats] = None,
+        self,
+        engine_core_outputs: list[EngineCoreOutput],
+        engine_core_timestamp: Optional[float] = None,
+        iteration_stats: Optional[IterationStats] = None,
     ) -> OutputProcessorOutput:
         """
         Process the EngineCoreOutputs:
@@ -373,14 +386,68 @@ def process_outputs(
                     # Track per-request stats
                     self._update_stats_from_finished(req_state, finish_reason,
                                                      iteration_stats)
-
+                    self.do_tracing(engine_core_output, req_state, iteration_stats)
         self.lora_states.update_iteration_stats(iteration_stats)
 
         return OutputProcessorOutput(
             request_outputs=request_outputs,
             reqs_to_abort=reqs_to_abort,
         )
 
+    def do_tracing(self, engine_core_output: EngineCoreOutput,
+                   req_state: RequestState,
+                   iteration_stats: Optional[IterationStats]):
+        if (engine_core_output.finish_reason is None or iteration_stats is None
+                or self.tracer is None):
+            return
+        arrival_time_nano_seconds = int(req_state.stats.arrival_time * 1e9)
+
+        trace_context = extract_trace_context(engine_core_output.trace_headers)
+        with self.tracer.start_as_current_span(
+                "llm_request",
+                kind=SpanKind.SERVER,
+                context=trace_context,
+                start_time=arrival_time_nano_seconds) as span:
+            metrics = req_state.stats
+            ttft = metrics.first_token_ts - metrics.arrival_time
+            e2e_time = time.time() - metrics.arrival_time
+            # Queued interval is from first QUEUED event to first SCHEDULED
+            queued_time = metrics.scheduled_ts - metrics.queued_ts
+
+            # Prefill interval is from first SCHEDULED to first NEW_TOKEN
+            # Any preemptions during prefill are included in the interval
+            prefill_time = metrics.first_token_ts - metrics.scheduled_ts
+
+            # Decode interval is from first NEW_TOKEN to last NEW_TOKEN
+            # Any preemptions during decode are included
+            decode_time = metrics.last_token_ts - metrics.first_token_ts
+
+            # Inference interval is from first SCHEDULED to last NEW_TOKEN
+            # Any preemptions during prefill or decode are included
+            inference_time = metrics.last_token_ts - metrics.scheduled_ts
+            span.set_attribute(SpanAttributes.GEN_AI_RESPONSE_MODEL,
+                               self.tokenizer.tokenizer_id)
+            span.set_attribute(SpanAttributes.GEN_AI_REQUEST_ID,
+                               req_state.request_id)
+            span.set_attribute(SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS,
+                               req_state.max_tokens_param)
+            span.set_attribute(SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS,
+                               len(req_state.prompt_token_ids))
+            span.set_attribute(SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS,
+                               metrics.num_generation_tokens)
+            span.set_attribute(SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN,
+                               ttft)
+            span.set_attribute(SpanAttributes.GEN_AI_LATENCY_E2E,
+                               e2e_time)
+            span.set_attribute(SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE,
+                               queued_time)
+            span.set_attribute(SpanAttributes.GEN_AI_LATENCY_TIME_IN_MODEL_PREFILL,
+                               prefill_time)
+            span.set_attribute(SpanAttributes.GEN_AI_LATENCY_TIME_IN_MODEL_DECODE,
+                               decode_time)
+            span.set_attribute(SpanAttributes.GEN_AI_LATENCY_TIME_IN_MODEL_INFERENCE,
+                               inference_time)
+
     def _update_stats_from_output(self, req_state: RequestState,
                                   engine_core_output: EngineCoreOutput,
                                   engine_core_timestamp: Optional[float],
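
For reference, all of the latency attributes recorded in do_tracing() are simple differences of the per-request timestamps kept in RequestStateStats (arrival, queued, scheduled, first token, last token). A worked example of that interval arithmetic, with made-up timestamps purely for illustration:

    # Hypothetical event timestamps for one request, in seconds.
    arrival_time = 100.0    # request received by the frontend
    queued_ts = 100.1       # first QUEUED event
    scheduled_ts = 100.4    # first SCHEDULED event
    first_token_ts = 100.9  # first NEW_TOKEN event
    last_token_ts = 102.5   # last NEW_TOKEN event

    ttft = first_token_ts - arrival_time           # 0.9 s time to first token
    queued_time = scheduled_ts - queued_ts         # 0.3 s waiting in the queue
    prefill_time = first_token_ts - scheduled_ts   # 0.5 s prefill (incl. preemptions)
    decode_time = last_token_ts - first_token_ts   # 1.6 s decode (incl. preemptions)
    inference_time = last_token_ts - scheduled_ts  # 2.1 s prefill + decode
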