Skip to content

Commit 2c1b283

Browse files
raulchen and lk-chen
authored
[data] disable on_exit hook (#53249)
## Why are these changes needed?

* The `on_exit` hook was introduced to allow users to perform cleanup.
* However, it triggers a race condition bug in fault tolerance: after `on_exit` is called and the UDF is deleted, but before the actor actually exits, another retry task may be submitted to the actor.
* This PR disables the hook by default. Eventually this should be fixed in Ray Core (#53169).

---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: lkchen <github@lkchen.net>
Co-authored-by: lkchen <github@lkchen.net>
1 parent 8381a78 commit 2c1b283

File tree

4 files changed

+31
-8
lines changed

4 files changed

+31
-8
lines changed

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ def __init__(
127127
gpu=self._ray_remote_args.get("num_gpus", 0),
128128
)
129129
self._actor_pool = _ActorPool(
130-
compute_strategy, self._start_actor, per_actor_resource_usage
130+
compute_strategy,
131+
self._start_actor,
132+
per_actor_resource_usage,
133+
self.data_context._enable_actor_pool_on_exit_hook,
131134
)
132135
# A queue of bundles awaiting dispatch to actors.
133136
self._bundle_queue = create_bundle_queue()
@@ -507,6 +510,7 @@ def __init__(
507510
compute_strategy: ActorPoolStrategy,
508511
create_actor_fn: Callable[[], Tuple[ActorHandle, ObjectRef[Any]]],
509512
per_actor_resource_usage: ExecutionResources,
513+
_enable_actor_pool_on_exit_hook: bool = False,
510514
):
511515
self._min_size: int = compute_strategy.min_size
512516
self._max_size: int = compute_strategy.max_size
@@ -530,6 +534,7 @@ def __init__(
530534
# Track locality matching stats.
531535
self._locality_hits: int = 0
532536
self._locality_misses: int = 0
537+
self._enable_actor_pool_on_exit_hook = _enable_actor_pool_on_exit_hook
533538

534539
# === Overriding methods of AutoscalingActorPool ===
535540

@@ -856,7 +861,9 @@ def _release_running_actors(self, force: bool):
856861

857862
# First release actors and collect their shutdown hook object-refs
858863
for actor in running:
859-
on_exit_refs.append(self._release_running_actor(actor))
864+
ref = self._release_running_actor(actor)
865+
if ref:
866+
on_exit_refs.append(ref)
860867

861868
# Wait for all actors to shutdown gracefully before killing them
862869
ray.wait(on_exit_refs, timeout=self._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S)
@@ -882,9 +889,12 @@ def _release_running_actor(
882889
if actor not in self._running_actors:
883890
return None
884891

885-
# Call `on_exit` to trigger `UDF.__del__` which may perform
886-
# cleanup operations.
887-
ref = actor.on_exit.remote()
892+
if self._enable_actor_pool_on_exit_hook:
893+
# Call `on_exit` to trigger `UDF.__del__` which may perform
894+
# cleanup operations.
895+
ref = actor.on_exit.remote()
896+
else:
897+
ref = None
888898
del self._running_actors[actor]
889899

890900
return ref

python/ray/data/context.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,12 @@ class DataContext:
438438
override_object_store_memory_limit_fraction: float = None
439439
memory_usage_poll_interval_s: Optional[float] = 1
440440
dataset_logger_id: Optional[str] = None
441+
# This is a temporary workaround to allow actors to perform cleanup
442+
# until https://github.com/ray-project/ray/issues/53169 is fixed.
443+
# This hook is known to have a race condition bug in fault tolerance.
444+
# I.e., after the hook is triggered and the UDF is deleted, another
445+
# retry task may still be scheduled to this actor and it will fail.
446+
_enable_actor_pool_on_exit_hook: bool = False
441447

442448
def __post_init__(self):
443449
# The additional ray remote args that should be added to

python/ray/data/tests/test_map.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1648,8 +1648,11 @@ def generate_data(n_per_block: int, n_blocks: int):
16481648
assert set(ds.to_pandas()["item"].to_list()) == set(expected.tolist())
16491649

16501650

1651-
def test_actor_udf_cleanup(ray_start_regular_shared, tmp_path):
1651+
def test_actor_udf_cleanup(ray_start_regular_shared, tmp_path, restore_data_context):
16521652
"""Test that for the actor map operator, the UDF object is deleted properly."""
1653+
ctx = DataContext.get_current()
1654+
ctx._enable_actor_pool_on_exit_hook = True
1655+
16531656
test_file = tmp_path / "test.txt"
16541657

16551658
# Simulate the case that the UDF depends on some external resources that

python/ray/llm/_internal/batch/processor/base.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,12 @@ def __init__(
135135
self.stages: OrderedDict[str, StatefulStage] = OrderedDict()
136136

137137
# FIXES: https://github.com/ray-project/ray/issues/53124
138-
# TODO (Kourosh): Remove this once the issue is fixed.
139-
ray.data.DataContext.get_current().wait_for_min_actors_s = 600
138+
# TODO (Kourosh): Remove this once the issue is fixed
139+
data_context = ray.data.DataContext.get_current()
140+
data_context.wait_for_min_actors_s = 600
141+
# TODO: Remove this when https://github.com/ray-project/ray/issues/53169
142+
# is fixed.
143+
data_context._enable_actor_pool_on_exit_hook = True
140144

141145
# NOTE (Kourosh): If pre/postprocess is not provided, use the identity function.
142146
# Wrapping is required even if they are identity functions, b/c data_column

0 commit comments

Comments
 (0)