Skip to content

Commit

Permalink
Add new pipeline option to perform more frequent deletion of intermed…
Browse files Browse the repository at this point in the history
…iate files
  • Loading branch information
bbimber committed Jan 20, 2024
1 parent cd85ff2 commit 64a311d
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public interface TaskFileManager extends PipelineOutputTracker

boolean isDeleteIntermediateFiles();

public boolean performCleanupAfterEachStep();

boolean isCopyInputsLocally();

void addPicardMetricsFiles(List<PipelineStepOutput.PicardMetricsOutput> files) throws PipelineJobException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3298,6 +3298,15 @@ public ApiResponse execute(CheckFileStatusForm form, BindException errors)

toolArr.put(intermediateFiles);

JSONObject performCleanupAfterEachStep = new JSONObject();
performCleanupAfterEachStep.put("name", "performCleanupAfterEachStep");
performCleanupAfterEachStep.put("defaultValue", true);
performCleanupAfterEachStep.put("label", "Perform Cleanup After Each Step");
performCleanupAfterEachStep.put("description", "Is selected, intermediate files from this job will be deleted after each step, instead of once at the end of the job. This can reduce the working directory size. Note: this will only apply if deleteIntermediateFiles is selected, and this is not supported across every possible pipeline type.");
performCleanupAfterEachStep.put("fieldXtype", "checkbox");

toolArr.put(performCleanupAfterEachStep);

ret.put("toolParameters", toolArr);

ret.put("description", handler.getDescription());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,12 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res
action.setEndTime(end);
ctx.getJob().getLogger().info(stepCtx.getProvider().getLabel() + " Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));

if (ctx.getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = Arrays.asList(currentVCF, new File(currentVCF.getPath() + ".tbi"));
getTaskFileManagerImpl(ctx).deleteIntermediateFiles(toRetain);
}

resumer.setStepComplete(stepIdx, input.getPath(), action, currentVCF);
}

Expand Down Expand Up @@ -886,4 +892,14 @@ public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFil
}
}
}

private static TaskFileManagerImpl getTaskFileManagerImpl(JobContext ctx) throws PipelineJobException
{
if (!(ctx.getFileManager() instanceof TaskFileManagerImpl tfm))
{
throw new PipelineJobException("Expected fileManager to be a TaskFileManagerImpl");
}

return tfm;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -347,11 +348,27 @@ private Map<ReadData, Pair<File, File>> performFastqPreprocessing(SequenceReadse
toAlign.put(d, pair);
}

if (getHelper().getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = toAlign.values().stream().map(x -> Arrays.asList(x.first, x.second)).flatMap(List::stream).filter(Objects::nonNull).toList();
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
}

_resumer.setFastqPreprocessingDone(toAlign, preprocessingActions, copiedInputs);

return toAlign;
}

private TaskFileManagerImpl getTaskFileManagerImpl() throws PipelineJobException
{
if (!(getHelper().getFileManager() instanceof TaskFileManagerImpl tfm))
{
throw new PipelineJobException("Expected fileManager to be a TaskFileManagerImpl");
}

return tfm;
}

private SequenceAlignmentJob getPipelineJob()
{
return (SequenceAlignmentJob)getJob();
Expand Down Expand Up @@ -667,6 +684,12 @@ private void alignSet(Readset rs, String basename, Map<ReadData, Pair<File, File
List<RecordedAction> alignActions = new ArrayList<>();
bam = doAlignment(referenceGenome, rs, files, alignActions);

if (getHelper().getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam));
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
}

_resumer.setInitialAlignmentDone(bam, alignActions);
}

Expand Down Expand Up @@ -742,6 +765,12 @@ else if (step.expectToCreateNewBam())
action.setEndTime(end);
getJob().getLogger().info(stepCtx.getProvider().getLabel() + " Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));
postProcessActions.add(action);

if (getHelper().getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam));
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
}
}
}

Expand Down Expand Up @@ -791,6 +820,12 @@ else if (step.expectToCreateNewBam())
}
}

if (getHelper().getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam));
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
}

_resumer.setBamSortDone(bam, sortAction);
}

Expand Down Expand Up @@ -841,6 +876,12 @@ else if (step.expectToCreateNewBam())
renameAction.setEndTime(end);
getJob().getLogger().info("Rename Bam Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));

if (getHelper().getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam));
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
}

_resumer.setBamRenameDone(renamedBam, List.of(renameAction));
}

Expand Down Expand Up @@ -888,6 +929,12 @@ else if (step.expectToCreateNewBam())
indexAction.setEndTime(end);
getJob().getLogger().info("IndexBam Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));

if (getHelper().getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam));
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
}

_resumer.setIndexBamDone(true, indexAction);
}
}
Expand Down Expand Up @@ -1045,8 +1092,15 @@ else if (step.expectToCreateNewBam())
}

analysisActions.add(action);
_resumer.setBamAnalysisComplete(analysisActions);

if (getHelper().getFileManager().performCleanupAfterEachStep())
{
List<File> toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam));
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
}
}

_resumer.setBamAnalysisComplete(analysisActions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.data.Table;
import org.labkey.api.data.TableInfo;
Expand Down Expand Up @@ -707,6 +708,12 @@ public boolean isDeleteIntermediateFiles()
return "true".equals(_job.getParameters().get("deleteIntermediateFiles"));
}

@Override
public boolean performCleanupAfterEachStep()
{
return "true".equals(_job.getParameters().get("performCleanupAfterEachStep"));
}

@Override
public boolean isCopyInputsLocally()
{
Expand All @@ -726,19 +733,26 @@ private Set<String> getInputPaths()
@Override
public void deleteIntermediateFiles() throws PipelineJobException
{
_job.getLogger().info("Cleaning up intermediate files");
deleteIntermediateFiles(Collections.emptySet());
}

Set<File> inputs = new HashSet<>();
inputs.addAll(getSupport().getInputFiles());
public void deleteIntermediateFiles(@NotNull Collection<File> filesToRetain) throws PipelineJobException
{
_job.getLogger().info("Cleaning up intermediate files");

Set<String> inputPaths = getInputPaths();

if (isDeleteIntermediateFiles())
{
_job.getLogger().debug("Intermediate files will be removed, total: " + _intermediateFiles.size());

for (File f : _intermediateFiles)
{
if (filesToRetain.contains(f))
{
_job.getLogger().debug("\tFile marked for deletion, but was part of filesToRetain and will not be deleted: " + f.getPath());
continue;
}

_job.getLogger().debug("\tDeleting intermediate file: " + f.getPath());

if (inputPaths.contains(f.getPath()))
Expand Down

0 comments on commit 64a311d

Please sign in to comment.