From 0f27d4f16fc87569ef34b02ce42e3b85a4dead20 Mon Sep 17 00:00:00 2001
From: Mark McLoughlin
Date: Mon, 27 Jan 2025 10:13:12 -0500
Subject: [PATCH] [V1][Metrics] Hook up IterationStats

Follow on from #12416

Pass IterationStats to the stat logger, and log them in both the
logging and prometheus loggers.

For the logging stat logger, we need to calculate the throughput
based on the number of tokens in the particular logging interval.

In the prometheus logger, we just need to record the prompt and
generation tokens in a counter.

Note, v0 had a vllm:tokens_total counter registered that apparently
was never logged to, so I've omitted it in v1.

Signed-off-by: Mark McLoughlin
---
 tests/entrypoints/openai/test_metrics.py |  7 ++-
 vllm/v1/engine/async_llm.py              |  3 +-
 vllm/v1/metrics/loggers.py               | 68 ++++++++++++++++++++----
 3 files changed, 66 insertions(+), 12 deletions(-)

diff --git a/tests/entrypoints/openai/test_metrics.py b/tests/entrypoints/openai/test_metrics.py
index 469a5fb039f..64deaedf0f2 100644
--- a/tests/entrypoints/openai/test_metrics.py
+++ b/tests/entrypoints/openai/test_metrics.py
@@ -105,8 +105,6 @@ async def client(server):
 @pytest.mark.asyncio
 async def test_metrics_counts(server: RemoteOpenAIServer,
                               client: openai.AsyncClient, use_v1: bool):
-    if use_v1:
-        pytest.skip("Skipping test on vllm V1")
     for _ in range(_NUM_REQUESTS):
         # sending a request triggers the metrics to be logged.
         await client.completions.create(
@@ -120,6 +118,9 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
 
     # Loop over all expected metric_families
     for metric_family, suffix_values_list in EXPECTED_VALUES.items():
+        if use_v1 and metric_family not in EXPECTED_METRICS_V1:
+            continue
+
         found_metric = False
 
         # Check to see if the metric_family is found in the prom endpoint.
@@ -199,6 +200,8 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
 EXPECTED_METRICS_V1 = [
     "vllm:num_requests_running",
     "vllm:num_requests_waiting",
+    "vllm:prompt_tokens_total",
+    "vllm:generation_tokens_total",
 ]
 
 
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index 917d52d3220..022b6d0668e 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -305,7 +305,8 @@ def _log_stats(
             return
 
         for logger in self.stat_loggers:
-            logger.log(scheduler_stats=scheduler_stats)
+            logger.log(scheduler_stats=scheduler_stats,
+                       iteration_stats=iteration_stats)
 
     def encode(
         self,
diff --git a/vllm/v1/metrics/loggers.py b/vllm/v1/metrics/loggers.py
index b84f03fa326..6a7bb423749 100644
--- a/vllm/v1/metrics/loggers.py
+++ b/vllm/v1/metrics/loggers.py
@@ -1,11 +1,12 @@
 import time
 from abc import ABC, abstractmethod
-from typing import Dict
+from typing import Dict, List
 
+import numpy as np
 import prometheus_client
 
 from vllm.logger import init_logger
-from vllm.v1.metrics.stats import SchedulerStats
+from vllm.v1.metrics.stats import IterationStats, SchedulerStats
 
 logger = init_logger(__name__)
 
@@ -15,27 +16,61 @@
 class StatLoggerBase(ABC):
 
     @abstractmethod
-    def log(self, scheduler_stats: SchedulerStats):
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
         ...
 
 
 class LoggingStatLogger(StatLoggerBase):
 
     def __init__(self):
-        self.last_log_time = time.monotonic()
+        self._reset(time.monotonic())
 
-    def log(self, scheduler_stats: SchedulerStats):
-        """Log Stats to standard output."""
+    def _reset(self, now):
+        self.last_log_time = now
+
+        # Tracked stats over current local logging interval.
+        self.num_prompt_tokens: List[int] = []
+        self.num_generation_tokens: List[int] = []
 
+    def _local_interval_elapsed(self, now: float) -> bool:
         # Log every _LOCAL_LOGGING_INTERVAL_SEC.
+        elapsed_time = now - self.last_log_time
+        return elapsed_time > _LOCAL_LOGGING_INTERVAL_SEC
+
+    def _track_iteration_stats(self, iteration_stats: IterationStats):
+        # Save tracked stats for token counters.
+        self.num_prompt_tokens.append(iteration_stats.num_prompt_tokens)
+        self.num_generation_tokens.append(
+            iteration_stats.num_generation_tokens)
+
+    def _get_throughput(self, tracked_stats: List[int], now: float) -> float:
+        # Compute summary metrics for tracked stats
+        return float(np.sum(tracked_stats) / (now - self.last_log_time))
+
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
+        """Log Stats to standard output."""
+
+        self._track_iteration_stats(iteration_stats)
+
         now = time.monotonic()
-        if now - self.last_log_time < _LOCAL_LOGGING_INTERVAL_SEC:
+        if not self._local_interval_elapsed(now):
             return
-        self.last_log_time = now
+
+        prompt_throughput = self._get_throughput(self.num_prompt_tokens, now)
+        generation_throughput = self._get_throughput(
+            self.num_generation_tokens, now)
+
+        self._reset(now)
 
         # Format and print output.
         logger.info(
+            "Avg prompt throughput: %.1f tokens/s, "
+            "Avg generation throughput: %.1f tokens/s, "
             "Running: %d reqs, Waiting: %d reqs ",
+            prompt_throughput,
+            generation_throughput,
             scheduler_stats.num_running_reqs,
             scheduler_stats.num_waiting_reqs,
         )
@@ -61,11 +96,26 @@ def __init__(self, labels: Dict[str, str]):
             documentation="Number of requests waiting to be processed.",
             labelnames=labelnames).labels(*labelvalues)
 
-    def log(self, scheduler_stats: SchedulerStats):
+        self.counter_prompt_tokens = prometheus_client.Counter(
+            name="vllm:prompt_tokens_total",
+            documentation="Number of prefill tokens processed.",
+            labelnames=labelnames).labels(*labelvalues)
+
+        self.counter_generation_tokens = prometheus_client.Counter(
+            name="vllm:generation_tokens_total",
+            documentation="Number of generation tokens processed.",
+            labelnames=labelnames).labels(*labelvalues)
+
+    def log(self, scheduler_stats: SchedulerStats,
+            iteration_stats: IterationStats):
         """Log to prometheus."""
         self.gauge_scheduler_running.set(scheduler_stats.num_running_reqs)
         self.gauge_scheduler_waiting.set(scheduler_stats.num_waiting_reqs)
 
+        self.counter_prompt_tokens.inc(iteration_stats.num_prompt_tokens)
+        self.counter_generation_tokens.inc(
+            iteration_stats.num_generation_tokens)
+
     @staticmethod
     def _unregister_vllm_metrics():
         # Unregister any existing vLLM collectors (for CI/CD
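
For illustration only, and not part of the patch: a minimal standalone sketch of the interval-throughput calculation that LoggingStatLogger performs above. The IntervalTracker class and its method names are hypothetical; the flow mirrors the _track_iteration_stats / _get_throughput / _reset steps, where per-iteration token counts are accumulated, summed over the elapsed local logging interval, and then reset.

# Illustrative sketch only; IntervalTracker is a hypothetical name, not vLLM API.
import time
from typing import List, Optional

import numpy as np

_LOCAL_LOGGING_INTERVAL_SEC = 5.0


class IntervalTracker:

    def __init__(self):
        self.last_log_time = time.monotonic()
        self.num_tokens: List[int] = []

    def track(self, num_tokens_this_iteration: int):
        # Called once per engine iteration (cf. _track_iteration_stats).
        self.num_tokens.append(num_tokens_this_iteration)

    def maybe_get_throughput(self) -> Optional[float]:
        # Returns avg tokens/s once the local logging interval has elapsed,
        # then resets the tracked stats (cf. _get_throughput and _reset).
        now = time.monotonic()
        elapsed = now - self.last_log_time
        if elapsed <= _LOCAL_LOGGING_INTERVAL_SEC:
            return None
        throughput = float(np.sum(self.num_tokens) / elapsed)
        self.num_tokens = []
        self.last_log_time = now
        return throughput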