From 2dd01e7798ef5a70fed8d4cafe3d5c53fd31c4a7 Mon Sep 17 00:00:00 2001 From: Haibin Date: Thu, 27 Feb 2025 15:25:51 +0800 Subject: [PATCH 1/3] for executor function call --- data_juicer/core/executor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/data_juicer/core/executor.py b/data_juicer/core/executor.py index 7f0d93a66..67d734973 100644 --- a/data_juicer/core/executor.py +++ b/data_juicer/core/executor.py @@ -1,6 +1,6 @@ import os from time import time -from typing import Optional +from typing import Optional, Union from jsonargparse import Namespace from loguru import logger @@ -20,6 +20,7 @@ from ..ops.selector.topk_specified_field_selector import \ TopkSpecifiedFieldSelector from .adapter import Adapter +from .data import NestedDataset from .exporter import Exporter from .tracer import Tracer @@ -143,17 +144,21 @@ def sample_data(self, raise ValueError(f'Unsupported sample_algo: {sample_algo}') def run(self, + dataset: Union[Dataset, NestedDataset] = None, load_data_np: Optional[PositiveInt] = None, skip_return=False): """ Running the dataset process pipeline. + :param dataset: a Dataset object to be executed. :param load_data_np: number of workers when loading the dataset. :param skip_return: skip return for API called. :return: processed dataset. """ # 1. format data - if self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available: + if dataset is not None: + logger.info(f'Using existing dataset {dataset}') + elif self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available: logger.info('Loading dataset from checkpoint...') dataset = self.ckpt_manager.load_ckpt() else: From 658892c4757c433f7cab6d1e147eee2b2e9b003f Mon Sep 17 00:00:00 2001 From: Haibin Date: Fri, 28 Feb 2025 11:20:52 +0800 Subject: [PATCH 2/3] for op to stat --- data_juicer/utils/constant.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/data_juicer/utils/constant.py b/data_juicer/utils/constant.py index abeb6fc27..70ce1b7ad 100644 --- a/data_juicer/utils/constant.py +++ b/data_juicer/utils/constant.py @@ -129,9 +129,23 @@ def __getattr__(cls, attr): cls._accessed_by[caller_class].add(stat_key) return stat_key - def get_access_log(cls, dj_cfg=None): + def get_access_log(cls, dj_cfg=None, dataset=None): if cls._accessed_by: return cls._accessed_by + elif dj_cfg and dataset: + tmp_dj_cfg = copy.deepcopy(dj_cfg) + tmp_dj_cfg.use_cache = False + tmp_dj_cfg.use_checkpoint = False + + from data_juicer.core import Analyzer + tmp_analyzer = Analyzer(tmp_dj_cfg) + + from data_juicer.core.data import NestedDataset + dataset = NestedDataset.from_dict( + {k: [v] + for k, v in dataset[0].items()}) + # do not overwrite the true analysis results + tmp_analyzer.run(dataset=dataset, skip_export=True) elif dj_cfg: tmp_dj_cfg = copy.deepcopy(dj_cfg) # the access has been skipped due to the use of cache @@ -175,9 +189,6 @@ def get_access_log(cls, dj_cfg=None): tmp_dj_cfg.use_cache = False tmp_dj_cfg.use_checkpoint = False - from data_juicer.config import get_init_configs - tmp_dj_cfg = get_init_configs(tmp_dj_cfg) - from data_juicer.core import Analyzer tmp_analyzer = Analyzer(tmp_dj_cfg) # do not overwrite the true analysis results From da1789a39eab2ffc7b4e9eadcab72fe2d04b8ef1 Mon Sep 17 00:00:00 2001 From: Haibin Date: Fri, 28 Feb 2025 15:18:35 +0800 Subject: [PATCH 3/3] use dataset.take --- data_juicer/utils/constant.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/data_juicer/utils/constant.py b/data_juicer/utils/constant.py index 70ce1b7ad..5c6828b07 100644 --- a/data_juicer/utils/constant.py +++ b/data_juicer/utils/constant.py @@ -140,10 +140,7 @@ def get_access_log(cls, dj_cfg=None, dataset=None): from data_juicer.core import Analyzer tmp_analyzer = Analyzer(tmp_dj_cfg) - from data_juicer.core.data import NestedDataset - dataset = NestedDataset.from_dict( - {k: [v] - for k, v in dataset[0].items()}) + dataset = dataset.take(1) # do not overwrite the true analysis results tmp_analyzer.run(dataset=dataset, skip_export=True) elif dj_cfg: