From 8ea56254cd02a4e77b9e2e64ac103ee8208e7bfd Mon Sep 17 00:00:00 2001 From: bbimber Date: Fri, 19 Jan 2024 20:54:51 -0600 Subject: [PATCH] Add new pipeline option to perform more frequent deletion of intermediate files (#262) --- .../pipeline/TaskFileManager.java | 2 + .../SequenceAnalysisController.java | 9 +++ .../pipeline/ProcessVariantsHandler.java | 16 ++++++ .../pipeline/SequenceAlignmentTask.java | 56 ++++++++++++++++++- .../pipeline/TaskFileManagerImpl.java | 22 ++++++-- 5 files changed, 100 insertions(+), 5 deletions(-) diff --git a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/TaskFileManager.java b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/TaskFileManager.java index 6263c7994..beecdcb51 100644 --- a/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/TaskFileManager.java +++ b/SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/TaskFileManager.java @@ -56,6 +56,8 @@ public interface TaskFileManager extends PipelineOutputTracker boolean isDeleteIntermediateFiles(); + public boolean performCleanupAfterEachStep(); + boolean isCopyInputsLocally(); void addPicardMetricsFiles(List files) throws PipelineJobException; diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisController.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisController.java index 11d319e7f..096f16ff9 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisController.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisController.java @@ -3298,6 +3298,15 @@ public ApiResponse execute(CheckFileStatusForm form, BindException errors) toolArr.put(intermediateFiles); + JSONObject performCleanupAfterEachStep = new JSONObject(); + performCleanupAfterEachStep.put("name", "performCleanupAfterEachStep"); + performCleanupAfterEachStep.put("defaultValue", true); + performCleanupAfterEachStep.put("label", "Perform Cleanup After Each Step"); + performCleanupAfterEachStep.put("description", "Is selected, intermediate files from this job will be deleted after each step, instead of once at the end of the job. This can reduce the working directory size. Note: this will only apply if deleteIntermediateFiles is selected, and this is not supported across every possible pipeline type."); + performCleanupAfterEachStep.put("fieldXtype", "checkbox"); + + toolArr.put(performCleanupAfterEachStep); + ret.put("toolParameters", toolArr); ret.put("description", handler.getDescription()); diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java index d1efdc9a2..ac4624b6d 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java @@ -483,6 +483,12 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res action.setEndTime(end); ctx.getJob().getLogger().info(stepCtx.getProvider().getLabel() + " Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true)); + if (ctx.getFileManager().performCleanupAfterEachStep()) + { + List toRetain = Arrays.asList(currentVCF, new File(currentVCF.getPath() + ".tbi")); + getTaskFileManagerImpl(ctx).deleteIntermediateFiles(toRetain); + } + resumer.setStepComplete(stepIdx, input.getPath(), action, currentVCF); } @@ -886,4 +892,14 @@ public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFil } } } + + private static TaskFileManagerImpl getTaskFileManagerImpl(JobContext ctx) throws PipelineJobException + { + if (!(ctx.getFileManager() instanceof TaskFileManagerImpl tfm)) + { + throw new PipelineJobException("Expected fileManager to be a TaskFileManagerImpl"); + } + + return tfm; + } } diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java index aba4524bf..f0b4c4aae 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java @@ -96,6 +96,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -347,11 +348,27 @@ private Map> performFastqPreprocessing(SequenceReadse toAlign.put(d, pair); } + if (getHelper().getFileManager().performCleanupAfterEachStep()) + { + List toRetain = toAlign.values().stream().map(x -> Arrays.asList(x.first, x.second)).flatMap(List::stream).filter(Objects::nonNull).toList(); + getTaskFileManagerImpl().deleteIntermediateFiles(toRetain); + } + _resumer.setFastqPreprocessingDone(toAlign, preprocessingActions, copiedInputs); return toAlign; } + private TaskFileManagerImpl getTaskFileManagerImpl() throws PipelineJobException + { + if (!(getHelper().getFileManager() instanceof TaskFileManagerImpl tfm)) + { + throw new PipelineJobException("Expected fileManager to be a TaskFileManagerImpl"); + } + + return tfm; + } + private SequenceAlignmentJob getPipelineJob() { return (SequenceAlignmentJob)getJob(); @@ -667,6 +684,12 @@ private void alignSet(Readset rs, String basename, Map alignActions = new ArrayList<>(); bam = doAlignment(referenceGenome, rs, files, alignActions); + if (getHelper().getFileManager().performCleanupAfterEachStep()) + { + List toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam)); + getTaskFileManagerImpl().deleteIntermediateFiles(toRetain); + } + _resumer.setInitialAlignmentDone(bam, alignActions); } @@ -742,6 +765,12 @@ else if (step.expectToCreateNewBam()) action.setEndTime(end); getJob().getLogger().info(stepCtx.getProvider().getLabel() + " Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true)); postProcessActions.add(action); + + if (getHelper().getFileManager().performCleanupAfterEachStep()) + { + List toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam)); + getTaskFileManagerImpl().deleteIntermediateFiles(toRetain); + } } } @@ -791,6 +820,12 @@ else if (step.expectToCreateNewBam()) } } + if (getHelper().getFileManager().performCleanupAfterEachStep()) + { + List toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam)); + getTaskFileManagerImpl().deleteIntermediateFiles(toRetain); + } + _resumer.setBamSortDone(bam, sortAction); } @@ -841,6 +876,12 @@ else if (step.expectToCreateNewBam()) renameAction.setEndTime(end); getJob().getLogger().info("Rename Bam Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true)); + if (getHelper().getFileManager().performCleanupAfterEachStep()) + { + List toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam)); + getTaskFileManagerImpl().deleteIntermediateFiles(toRetain); + } + _resumer.setBamRenameDone(renamedBam, List.of(renameAction)); } @@ -888,6 +929,12 @@ else if (step.expectToCreateNewBam()) indexAction.setEndTime(end); getJob().getLogger().info("IndexBam Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true)); + if (getHelper().getFileManager().performCleanupAfterEachStep()) + { + List toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam)); + getTaskFileManagerImpl().deleteIntermediateFiles(toRetain); + } + _resumer.setIndexBamDone(true, indexAction); } } @@ -1045,8 +1092,15 @@ else if (step.expectToCreateNewBam()) } analysisActions.add(action); - _resumer.setBamAnalysisComplete(analysisActions); + + if (getHelper().getFileManager().performCleanupAfterEachStep()) + { + List toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam)); + getTaskFileManagerImpl().deleteIntermediateFiles(toRetain); + } } + + _resumer.setBamAnalysisComplete(analysisActions); } } diff --git a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/TaskFileManagerImpl.java b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/TaskFileManagerImpl.java index 670534465..622c2ed74 100644 --- a/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/TaskFileManagerImpl.java +++ b/SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/TaskFileManagerImpl.java @@ -7,6 +7,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.labkey.api.data.Table; import org.labkey.api.data.TableInfo; @@ -707,6 +708,12 @@ public boolean isDeleteIntermediateFiles() return "true".equals(_job.getParameters().get("deleteIntermediateFiles")); } + @Override + public boolean performCleanupAfterEachStep() + { + return "true".equals(_job.getParameters().get("performCleanupAfterEachStep")); + } + @Override public boolean isCopyInputsLocally() { @@ -726,19 +733,26 @@ private Set getInputPaths() @Override public void deleteIntermediateFiles() throws PipelineJobException { - _job.getLogger().info("Cleaning up intermediate files"); + deleteIntermediateFiles(Collections.emptySet()); + } - Set inputs = new HashSet<>(); - inputs.addAll(getSupport().getInputFiles()); + public void deleteIntermediateFiles(@NotNull Collection filesToRetain) throws PipelineJobException + { + _job.getLogger().info("Cleaning up intermediate files"); Set inputPaths = getInputPaths(); - if (isDeleteIntermediateFiles()) { _job.getLogger().debug("Intermediate files will be removed, total: " + _intermediateFiles.size()); for (File f : _intermediateFiles) { + if (filesToRetain.contains(f)) + { + _job.getLogger().debug("\tFile marked for deletion, but was part of filesToRetain and will not be deleted: " + f.getPath()); + continue; + } + _job.getLogger().debug("\tDeleting intermediate file: " + f.getPath()); if (inputPaths.contains(f.getPath()))