Skip to content

Commit

Permalink
[RayExecutor]Check unsupported op before processing
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyuhanalex authored Mar 5, 2025
1 parent 6014bcc commit ff4b787
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions data_juicer/core/ray_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from data_juicer.config import init_configs
from data_juicer.core.ray_data import RayDataset
from data_juicer.ops import load_ops
from data_juicer.ops import Aggregator, Grouper, Selector, load_ops
from data_juicer.ops.op_fusion import fuse_operators

from .adapter import Adapter
Expand Down Expand Up @@ -53,7 +53,7 @@ def __init__(self, cfg=None):
self.adapter = Adapter(self.cfg)

# init ray
logger.info('Initing Ray ...')
logger.info('Initializing Ray ...')
ray.init(self.cfg.ray_address)
self.tmp_dir = os.path.join(self.work_dir, '.tmp',
ray.get_runtime_context().get_job_id())
Expand Down Expand Up @@ -85,6 +85,13 @@ def run(self, load_data_np=None):
logger.info('Preparing process operators...')
ops = load_ops(self.cfg.process)

# check if the ops contain unsupported operators
# raise error before processing
for op in ops:
if isinstance(op, (Selector, Grouper, Aggregator)):
raise ValueError(
f'Operator {op} is not supported in RayExecutor')

if self.cfg.op_fusion:
probe_res = None
if self.cfg.fusion_strategy == 'probe':
Expand Down

0 comments on commit ff4b787

Please sign in to comment.