"""
    This file contains the code used to process and create the
    FineWeb dataset (https://huggingface.co/datasets/HuggingFaceFW/fineweb)
"""
from datatrove.executor.slurm import SlurmPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupCluster, MinhashDedupFilter, MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import MinhashConfig, MinhashDedupBuckets
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import (
    C4QualityFilter,
    FineWebQualityFilter,
    GopherQualityFilter,
    GopherRepetitionFilter,
    LanguageFilter,
    URLFilter,
)
from datatrove.pipeline.formatters import PIIFormatter
from datatrove.pipeline.readers import JsonlReader, WarcReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter


"""
    we first ran the following pipeline for each dump
"""
DUMP_TO_PROCESS = "CC-MAIN-2023-50"  # example

MAIN_OUTPUT_PATH = "s3://some_s3_bucket"
FILTERING_OUTPUT_PATH = f"{MAIN_OUTPUT_PATH}/base_processing"

main_processing_executor = SlurmPipelineExecutor(
    job_name=f"cc_{DUMP_TO_PROCESS}",
    pipeline=[
        WarcReader(
            f"s3://commoncrawl/crawl-data/{DUMP_TO_PROCESS}/segments/",
            glob_pattern="*/warc/*",  # we want the warc files
            default_metadata={"dump": DUMP_TO_PROCESS},
        ),
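        # most of the filters below write the documents they remove to their own folder via
        # exclusion_writer, so the rejected data can be inspected later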
        URLFilter(exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/1_url/{DUMP_TO_PROCESS}")),
        Trafilatura(favour_precision=True),
        LanguageFilter(
            exclusion_writer=JsonlWriter(
                f"{FILTERING_OUTPUT_PATH}/2_non_english/",
                output_filename="${language}/" + DUMP_TO_PROCESS + "/${rank}.jsonl.gz",
                # folder structure: language/dump/file
            )
        ),
        GopherRepetitionFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/3_gopher_rep/{DUMP_TO_PROCESS}")
        ),
        GopherQualityFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/4_gopher_qual/{DUMP_TO_PROCESS}")
        ),
        C4QualityFilter(
            filter_no_terminal_punct=False,
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/5_c4/{DUMP_TO_PROCESS}"),
        ),
        FineWebQualityFilter(
            exclusion_writer=JsonlWriter(f"{FILTERING_OUTPUT_PATH}/removed/6_fineweb_qual/{DUMP_TO_PROCESS}")
        ),
        JsonlWriter(f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"),
    ],
    tasks=8000,
    time="10:00:00",
    logging_dir=f"{MAIN_OUTPUT_PATH}/logs/base_processing/{DUMP_TO_PROCESS}",
    slurm_logs_folder=f"logs/base_processing/{DUMP_TO_PROCESS}/slurm_logs",  # must be local
    randomize_start=True,  # don't hit the bucket all at once with the list requests
    mem_per_cpu_gb=2,
    partition="hopper-cpu",
)
main_processing_executor.run()

"""
    we then applied minhash deduplication to each individual dump
"""

# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
    use_64bit_hashes=True,  # better precision -> fewer false positives (collisions)
    num_buckets=14,
    hashes_per_bucket=8,
    n_grams=5,
)
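# 14 buckets x 8 hashes = 112 hashes per document; with this configuration, document pairs with
# a Jaccard similarity above roughly 0.7-0.75 are likely to be flagged as duplicates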

S3_MINHASH_BASE_PATH = f"{MAIN_OUTPUT_PATH}/minhash"

S3_LOGS_FOLDER = f"{MAIN_OUTPUT_PATH}/logs/minhash"
LOCAL_LOGS_FOLDER = "logs/minhash"

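# number of tasks to split the signature computation (stage 1) and the final filtering (stage 4) into;
# each task processes a subset of the input files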
TOTAL_TASKS = 1000

# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(
    f"{FILTERING_OUTPUT_PATH}/output/{DUMP_TO_PROCESS}"
)  # this is the output from the first part

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = SlurmPipelineExecutor(
    job_name=f"mh1_{DUMP_TO_PROCESS}",
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures", config=minhash_config
        ),
    ],
    tasks=TOTAL_TASKS,
    time="5:00:00",
    partition="hopper-cpu",
    logging_dir=f"{S3_LOGS_FOLDER}/signatures",
    slurm_logs_folder=f"{LOCAL_LOGS_FOLDER}/signatures/slurm_logs",
    randomize_start=True,
    depends=main_processing_executor,  # only start after the first one completes
)

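# stage 2 finds matches between signatures in each bucket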
stage2 = SlurmPipelineExecutor(
    job_name=f"mh2_{DUMP_TO_PROCESS}",
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/signatures",
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
            config=MinhashConfig(use_64bit_hashes=True),
        ),
    ],
    tasks=minhash_config.num_buckets * 50,  # the code supports parallelizing each bucket. here we run 50
    # workers per bucket
    randomize_start=True,
    logging_dir=f"{S3_LOGS_FOLDER}/buckets",
    partition="hopper-cpu",
    time="02:00:00",
    mem_per_cpu_gb=4,
    cpus_per_task=3,  # you can run more (smaller) tasks if you do not have a lot of memory
    depends=stage1,
)


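# stage 3 creates clusters of duplicates using the results from all buckets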
stage3 = SlurmPipelineExecutor(
    job_name=f"mh3_{DUMP_TO_PROCESS}",
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/buckets",
            output_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids",
            config=minhash_config,
        ),
    ],
    tasks=1,  # this step runs on a single task
    logging_dir=f"{S3_LOGS_FOLDER}/clustering",
    partition="hopper-cpu",
    time="30:00:00",  # and can also be quite slow. Usually not this slow though
    mem_per_cpu_gb=25,
    cpus_per_task=8,  # if you dedup a full dump, you do need a lot of memory for this one
    depends=stage2,
)


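# stage 4 reads the original input data and removes all but one document per duplicate cluster
# (the data read here must match exactly what was used in stage 1)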
stage4 = SlurmPipelineExecutor(
    job_name=f"mh4_{DUMP_TO_PROCESS}",
    pipeline=[
        INPUT_READER,
        TokensCounter(),  # you can remove this one, it's just a nice way to know how many tokens we have
        # before and after dedup
        MinhashDedupFilter(input_folder=f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/remove_ids"),
        # run the PII removal
        PIIFormatter(),
        JsonlWriter(f"{S3_MINHASH_BASE_PATH}/{DUMP_TO_PROCESS}/deduped_output"),
    ],
    tasks=TOTAL_TASKS,
    logging_dir=f"{S3_LOGS_FOLDER}/filtering",
    partition="hopper-cpu",
    time="5:00:00",
    mem_per_cpu_gb=4,
    depends=stage3,
)

# launch dedup pipelines
stage4.run()