diff --git a/data_juicer/core/ray_executor.py b/data_juicer/core/ray_executor.py index 0f4f3fb2d..d95768f98 100644 --- a/data_juicer/core/ray_executor.py +++ b/data_juicer/core/ray_executor.py @@ -7,7 +7,7 @@ from data_juicer.config import init_configs from data_juicer.core.ray_data import RayDataset -from data_juicer.ops import load_ops +from data_juicer.ops import Aggregator, Grouper, Selector, load_ops from data_juicer.ops.op_fusion import fuse_operators from .adapter import Adapter @@ -53,7 +53,7 @@ def __init__(self, cfg=None): self.adapter = Adapter(self.cfg) # init ray - logger.info('Initing Ray ...') + logger.info('Initializing Ray ...') ray.init(self.cfg.ray_address) self.tmp_dir = os.path.join(self.work_dir, '.tmp', ray.get_runtime_context().get_job_id()) @@ -85,6 +85,13 @@ def run(self, load_data_np=None): logger.info('Preparing process operators...') ops = load_ops(self.cfg.process) + # check if the ops contain unsupported operators + # raise error before processing + for op in ops: + if isinstance(op, (Selector, Grouper, Aggregator)): + raise ValueError( + f'Operator {op} is not supported in RayExecutor') + if self.cfg.op_fusion: probe_res = None if self.cfg.fusion_strategy == 'probe':