Commit efd333a

[dashboard][train] Add dynolog for on-demand GPU profiling for Torch training (#53191)
Add an on-demand GPU profiling endpoint to the dashboard reporter API at `/worker/gpu_profile`. The following query parameters are available:

```
/worker/gpu_profile?node_ip=xxx.x.x.x&pid=xxxxx&num_iterations=x
```

## Design

* Functionality depends on the dynolog binaries (`dynolog`, `dyno`) already being installed on the image.
* A singleton dynolog daemon monitoring process is launched on every node during dashboard agent startup, listening on port `65406`.
* The `/worker/gpu_profile` request gets propagated to the ReporterAgent on the correct node, which calls `dyno gputrace --pids=<train_worker_pid> ...` and then waits for the trace file to be dumped.
* The request then redirects to the streaming log download API to download the trace on the client (browser).

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
1 parent 27a1dfc commit efd333a
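For illustration, here is a minimal client-side sketch of triggering the new endpoint from Python. It is not part of the commit itself; the dashboard address, node IP, pid, and iteration count are placeholders, and the default Ray dashboard HTTP port (8265) is assumed.

```python
# Hypothetical client call to the on-demand GPU profiling endpoint.
# The dashboard URL and query parameter values below are placeholders.
import urllib.request

DASHBOARD_URL = "http://127.0.0.1:8265"  # assumed default Ray dashboard address
params = "node_ip=10.0.0.5&pid=12345&num_iterations=3"
url = f"{DASHBOARD_URL}/worker/gpu_profile?{params}"

# The request redirects to the streaming log download API, so the response
# body is the dumped trace file itself.
with urllib.request.urlopen(url, timeout=600) as resp:
    with open("gputrace.json", "wb") as f:
        f.write(resp.read())
```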

File tree

5 files changed: +784 -26 lines
python/ray/dashboard/modules/reporter/gpu_profile_manager.py

Lines changed: 319 additions & 0 deletions

@@ -0,0 +1,319 @@
import asyncio
import functools
import logging
import os
from datetime import datetime
from pathlib import Path
import shutil
import socket
import subprocess
from typing import Optional, Tuple

import psutil

from ray.dashboard.modules.reporter.profile_manager import (
    _format_failed_profiler_command,
)

logger = logging.getLogger(__name__)


class GpuProfilingManager:
    """GPU profiling manager for Ray Dashboard.

    NOTE: The current implementation is based on the `dynolog` OSS project,
    but these are mostly implementation details that can be changed in the future.
    `dynolog` needs to be installed on the nodes where profiling is being done.

    This only supports Torch training scripts with KINETO_USE_DAEMON=1 set.
    It is not supported for other frameworks.
    """

    # Port for the monitoring daemon.
    # This port was chosen arbitrarily to avoid conflicts.
    _DYNOLOG_PORT = 65406

    # Default timeout for the profiling operation.
    _DEFAULT_TIMEOUT_S = 5 * 60

    _NO_PROCESSES_MATCHED_ERROR_MESSAGE_PREFIX = "No processes were matched"

    _DISABLED_ERROR_MESSAGE = (
        "GPU profiling is not enabled on node {ip_address}. "
        "This is the case if no GPUs are detected on the node or if "
        "the profiling dependency `dynolog` is not installed on the node.\n"
        "Please ensure that GPUs are available on the node and that "
        "`dynolog` is installed."
    )
    _NO_PROCESSES_MATCHED_ERROR_MESSAGE = (
        "The profiling command failed for pid={pid} on node {ip_address}. "
        "There are a few potential reasons for this:\n"
        "1. The `KINETO_USE_DAEMON=1 KINETO_DAEMON_INIT_DELAY_S=5` environment variables "
        "are not set for the training worker processes.\n"
        "2. The process requested for profiling is not running a "
        "PyTorch training script. GPU profiling is only supported for "
        "PyTorch training scripts, typically launched via "
        "`ray.train.torch.TorchTrainer`."
    )
    _DEAD_PROCESS_ERROR_MESSAGE = (
        "The requested process to profile with pid={pid} on node "
        "{ip_address} is no longer running. "
        "GPU profiling is not available for this process."
    )

    def __init__(self, profile_dir_path: str):
        # Dump trace files to: /tmp/ray/session_latest/logs/profiles/
        self._root_log_dir = Path(profile_dir_path)
        self._profile_dir_path = self._root_log_dir / "profiles"
        self._daemon_log_file_path = (
            self._profile_dir_path / f"dynolog_daemon_{os.getpid()}.log"
        )

        hostname = socket.gethostname()
        self._ip_address = socket.gethostbyname(hostname)

        self._dynolog_bin = shutil.which("dynolog")
        self._dyno_bin = shutil.which("dyno")

        self._dynolog_daemon_process: Optional[subprocess.Popen] = None

        if not self.node_has_gpus():
            logger.warning(
                "[GpuProfilingManager] No GPUs found on this node, GPU profiling will not be setup."
            )
        if not self._dynolog_bin or not self._dyno_bin:
            logger.warning(
                "[GpuProfilingManager] `dynolog` is not installed, GPU profiling will not be available."
            )

        self._profile_dir_path.mkdir(parents=True, exist_ok=True)

    @property
    def enabled(self) -> bool:
        return (
            self.node_has_gpus()
            and self._dynolog_bin is not None
            and self._dyno_bin is not None
        )

    @property
    def is_monitoring_daemon_running(self) -> bool:
        return (
            self._dynolog_daemon_process is not None
            and self._dynolog_daemon_process.poll() is None
        )

    @classmethod
    @functools.cache
    def node_has_gpus(cls) -> bool:
        try:
            subprocess.check_output(["nvidia-smi"], stderr=subprocess.DEVNULL)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False

    @classmethod
    def is_pid_alive(cls, pid: int) -> bool:
        try:
            return psutil.pid_exists(pid) and psutil.Process(pid).is_running()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            return False

    def start_monitoring_daemon(self):
        """Start the GPU profiling monitoring daemon if it's possible.

        This must be called before profiling.
        """

        if not self.enabled:
            logger.warning(
                "[GpuProfilingManager] GPU profiling is disabled, skipping daemon setup."
            )
            return

        if self.is_monitoring_daemon_running:
            logger.warning(
                "[GpuProfilingManager] GPU profiling monitoring daemon is already running."
            )
            return

        try:
            with open(self._daemon_log_file_path, "ab") as log_file:
                daemon = subprocess.Popen(
                    [
                        self._dynolog_bin,
                        "--enable_ipc_monitor",
                        "--port",
                        str(self._DYNOLOG_PORT),
                    ],
                    stdout=log_file,
                    stderr=log_file,
                    stdin=subprocess.DEVNULL,
                    start_new_session=True,
                )
        except (FileNotFoundError, PermissionError, OSError) as e:
            logger.error(
                f"[GpuProfilingManager] Failed to launch GPU profiling monitoring daemon: {e}\n"
                f"Check error log for more details: {self._daemon_log_file_path}"
            )
            return

        logger.info(
            "[GpuProfilingManager] Launched GPU profiling monitoring daemon "
            f"(pid={daemon.pid}, port={self._DYNOLOG_PORT})\n"
            f"Redirecting logs to: {self._daemon_log_file_path}"
        )
        self._dynolog_daemon_process = daemon

    def _get_trace_filename(self) -> str:
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        return f"gputrace_{self._ip_address}_{timestamp}.json"

    async def gpu_profile(
        self, pid: int, num_iterations: int, _timeout_s: int = _DEFAULT_TIMEOUT_S
    ) -> Tuple[bool, str]:
        """
        Perform GPU profiling on a specified process.

        Args:
            pid: The process ID (PID) of the target process to be profiled.
            num_iterations: The number of iterations to profile.
            _timeout_s: Maximum time in seconds to wait for profiling to complete.
                This is an advanced parameter that catches edge cases where the
                profiling request never completes and hangs indefinitely.

        Returns:
            Tuple[bool, str]: A tuple containing a boolean indicating the success
                of the profiling operation and a string with the
                filepath of the trace file relative to the root log directory,
                or an error message.
        """
        if not self.enabled:
            return False, self._DISABLED_ERROR_MESSAGE.format(
                ip_address=self._ip_address
            )

        if not self._dynolog_daemon_process:
            raise RuntimeError("Must call `start_monitoring_daemon` before profiling.")

        if not self.is_monitoring_daemon_running:
            error_msg = (
                f"GPU monitoring daemon (pid={self._dynolog_daemon_process.pid}) "
                f"is not running on node {self._ip_address}. "
                f"See log for more details: {self._daemon_log_file_path}"
            )
            logger.error(f"[GpuProfilingManager] {error_msg}")
            return False, error_msg

        if not self.is_pid_alive(pid):
            error_msg = self._DEAD_PROCESS_ERROR_MESSAGE.format(
                pid=pid, ip_address=self._ip_address
            )
            logger.error(f"[GpuProfilingManager] {error_msg}")
            return False, error_msg

        trace_file_name = self._get_trace_filename()
        trace_file_path = self._profile_dir_path / trace_file_name

        cmd = [
            self._dyno_bin,
            "--port",
            str(self._DYNOLOG_PORT),
            "gputrace",
            "--pids",
            str(pid),
            "--log-file",
            str(trace_file_path),
            "--process-limit",
            str(1),
            "--iterations",
            str(num_iterations),
        ]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            return False, _format_failed_profiler_command(cmd, "dyno", stdout, stderr)

        stdout_str = stdout.decode("utf-8")
        logger.info(f"[GpuProfilingManager] Launched profiling: {stdout_str}")

        # The initial launch command returns immediately,
        # so wait for the profiling to actually finish before returning.
        # The indicator of the profiling finishing is the creation of the trace file,
        # when the completed trace is moved from <prefix>.tmp.json -> <prefix>.json
        # If the profiling request is invalid (e.g. "No processes were matched"),
        # the trace file will not be created and this will hang indefinitely,
        # up until the timeout is reached.

        # TODO(ml-team): This logic is brittle, we should find a better way to do this.
        if self._NO_PROCESSES_MATCHED_ERROR_MESSAGE_PREFIX in stdout_str:
            error_msg = self._NO_PROCESSES_MATCHED_ERROR_MESSAGE.format(
                pid=pid, ip_address=self._ip_address
            )
            logger.error(f"[GpuProfilingManager] {error_msg}")
            return False, error_msg

        # The actual trace file gets dumped with a suffix of `_{pid}.json`.
        trace_file_name_pattern = trace_file_name.replace(".json", "*.json")

        return await self._wait_for_trace_file(pid, trace_file_name_pattern, _timeout_s)

    async def _wait_for_trace_file(
        self,
        pid: int,
        trace_file_name_pattern: str,
        timeout_s: int,
        sleep_interval_s: float = 0.25,
    ) -> Tuple[bool, str]:
        """Wait for the trace file to be created.

        Args:
            pid: The target process to be profiled.
            trace_file_name_pattern: The pattern of the trace file to be created
                within the `<log_dir>/profiles` directory.
            timeout_s: Maximum time in seconds to wait for profiling to complete.
            sleep_interval_s: Time in seconds to sleep between checking for the trace file.

        Returns:
            Tuple[bool, str]: (success, trace file path relative to the *root* log directory)
        """
        remaining_timeout_s = timeout_s

        logger.info(
            "[GpuProfilingManager] Waiting for trace file to be created "
            f"with the pattern: {trace_file_name_pattern}"
        )

        while True:
            dumped_trace_file_path = next(
                self._profile_dir_path.glob(trace_file_name_pattern), None
            )
            if dumped_trace_file_path is not None:
                break

            await asyncio.sleep(sleep_interval_s)

            remaining_timeout_s -= sleep_interval_s
            if remaining_timeout_s <= 0:
                return (
                    False,
                    f"GPU profiling timed out after {timeout_s} seconds, please try again.",
                )

            # If the process has already exited, return an error.
            if not self.is_pid_alive(pid):
                return (
                    False,
                    self._DEAD_PROCESS_ERROR_MESSAGE.format(
                        pid=pid, ip_address=self._ip_address
                    ),
                )

        logger.info(
            f"[GpuProfilingManager] GPU profiling finished, trace file: {dumped_trace_file_path}"
        )
        return True, str(dumped_trace_file_path.relative_to(self._root_log_dir))
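
As the class docstring and error messages above note, profiling only attaches to PyTorch training workers launched with `KINETO_USE_DAEMON=1` (and `KINETO_DAEMON_INIT_DELAY_S=5`). Below is a hedged sketch of one way to propagate those environment variables to Ray Train workers via a runtime environment; the training function and scaling settings are placeholders, not part of this change.

```python
# Sketch: set the Kineto daemon env vars so dynolog can attach to the
# training worker processes. train_func is a placeholder training loop.
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

ray.init(
    runtime_env={
        "env_vars": {
            "KINETO_USE_DAEMON": "1",
            "KINETO_DAEMON_INIT_DELAY_S": "5",
        }
    }
)


def train_func():
    ...  # PyTorch training loop goes here


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
)
trainer.fit()
```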

python/ray/dashboard/modules/reporter/reporter_agent.py

Lines changed: 17 additions & 3 deletions
@@ -36,6 +36,7 @@
     CpuProfilingManager,
     MemoryProfilingManager,
 )
+from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager

 import psutil

@@ -418,6 +419,9 @@ def __init__(self, dashboard_agent):
             thread_name_prefix="reporter_agent_executor",
         )

+        self._gpu_profiling_manager = GpuProfilingManager(self._log_dir)
+        self._gpu_profiling_manager.start_monitoring_daemon()
+
     async def GetTraceback(self, request, context):
         pid = request.pid
         native = request.native
@@ -436,6 +440,14 @@ async def CpuProfiling(self, request, context):
         )
         return reporter_pb2.CpuProfilingReply(output=output, success=success)

+    async def GpuProfiling(self, request, context):
+        pid = request.pid
+        num_iterations = request.num_iterations
+        success, output = await self._gpu_profiling_manager.gpu_profile(
+            pid=pid, num_iterations=num_iterations
+        )
+        return reporter_pb2.GpuProfilingReply(success=success, output=output)
+
     async def MemoryProfiling(self, request, context):
         pid = request.pid
         format = request.format
@@ -535,9 +547,11 @@ def decode(b: Union[str, bytes]) -> str:
         processes_pids = [
             ProcessGPUInfo(
                 pid=int(nv_process.pid),
-                gpu_memory_usage=int(nv_process.usedGpuMemory) // MB
-                if nv_process.usedGpuMemory
-                else 0,
+                gpu_memory_usage=(
+                    int(nv_process.usedGpuMemory) // MB
+                    if nv_process.usedGpuMemory
+                    else 0
+                ),
             )
             for nv_process in (nv_comp_processes + nv_graphics_processes)
         ]
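
To show how these pieces fit together, here is a hedged, standalone sketch that drives `GpuProfilingManager` directly, mirroring what `ReporterAgent.GpuProfiling` does above. The log directory and pid are placeholders; in practice the dashboard agent performs these steps.

```python
# Standalone sketch mirroring ReporterAgent.GpuProfiling.
# The log directory and pid below are placeholders.
import asyncio

from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager


async def main():
    manager = GpuProfilingManager("/tmp/ray/session_latest/logs")
    manager.start_monitoring_daemon()  # launches the singleton dynolog daemon
    success, output = await manager.gpu_profile(pid=12345, num_iterations=3)
    # On success, `output` is the trace file path relative to the root log dir;
    # on failure, it is one of the error messages defined on the class.
    print(success, output)


if __name__ == "__main__":
    asyncio.run(main())
```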
