Skip to content

Commit 8ea5625

Browse files
authored
Add new pipeline option to perform more frequent deletion of intermediate files (#262)
1 parent cd85ff2 commit 8ea5625

File tree

5 files changed

+100
-5
lines changed

5 files changed

+100
-5
lines changed

SequenceAnalysis/api-src/org/labkey/api/sequenceanalysis/pipeline/TaskFileManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public interface TaskFileManager extends PipelineOutputTracker
5656

5757
boolean isDeleteIntermediateFiles();
5858

59+
public boolean performCleanupAfterEachStep();
60+
5961
boolean isCopyInputsLocally();
6062

6163
void addPicardMetricsFiles(List<PipelineStepOutput.PicardMetricsOutput> files) throws PipelineJobException;

SequenceAnalysis/src/org/labkey/sequenceanalysis/SequenceAnalysisController.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3298,6 +3298,15 @@ public ApiResponse execute(CheckFileStatusForm form, BindException errors)
32983298

32993299
toolArr.put(intermediateFiles);
33003300

3301+
JSONObject performCleanupAfterEachStep = new JSONObject();
3302+
performCleanupAfterEachStep.put("name", "performCleanupAfterEachStep");
3303+
performCleanupAfterEachStep.put("defaultValue", true);
3304+
performCleanupAfterEachStep.put("label", "Perform Cleanup After Each Step");
3305+
performCleanupAfterEachStep.put("description", "Is selected, intermediate files from this job will be deleted after each step, instead of once at the end of the job. This can reduce the working directory size. Note: this will only apply if deleteIntermediateFiles is selected, and this is not supported across every possible pipeline type.");
3306+
performCleanupAfterEachStep.put("fieldXtype", "checkbox");
3307+
3308+
toolArr.put(performCleanupAfterEachStep);
3309+
33013310
ret.put("toolParameters", toolArr);
33023311

33033312
ret.put("description", handler.getDescription());

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/ProcessVariantsHandler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,12 @@ public static File processVCF(File input, Integer libraryId, JobContext ctx, Res
483483
action.setEndTime(end);
484484
ctx.getJob().getLogger().info(stepCtx.getProvider().getLabel() + " Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));
485485

486+
if (ctx.getFileManager().performCleanupAfterEachStep())
487+
{
488+
List<File> toRetain = Arrays.asList(currentVCF, new File(currentVCF.getPath() + ".tbi"));
489+
getTaskFileManagerImpl(ctx).deleteIntermediateFiles(toRetain);
490+
}
491+
486492
resumer.setStepComplete(stepIdx, input.getPath(), action, currentVCF);
487493
}
488494

@@ -886,4 +892,14 @@ public void performAdditionalMergeTasks(JobContext ctx, PipelineJob job, TaskFil
886892
}
887893
}
888894
}
895+
896+
private static TaskFileManagerImpl getTaskFileManagerImpl(JobContext ctx) throws PipelineJobException
897+
{
898+
if (!(ctx.getFileManager() instanceof TaskFileManagerImpl tfm))
899+
{
900+
throw new PipelineJobException("Expected fileManager to be a TaskFileManagerImpl");
901+
}
902+
903+
return tfm;
904+
}
889905
}

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/SequenceAlignmentTask.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import java.util.LinkedHashSet;
9797
import java.util.List;
9898
import java.util.Map;
99+
import java.util.Objects;
99100
import java.util.Optional;
100101
import java.util.Set;
101102
import java.util.stream.Collectors;
@@ -347,11 +348,27 @@ private Map<ReadData, Pair<File, File>> performFastqPreprocessing(SequenceReadse
347348
toAlign.put(d, pair);
348349
}
349350

351+
if (getHelper().getFileManager().performCleanupAfterEachStep())
352+
{
353+
List<File> toRetain = toAlign.values().stream().map(x -> Arrays.asList(x.first, x.second)).flatMap(List::stream).filter(Objects::nonNull).toList();
354+
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
355+
}
356+
350357
_resumer.setFastqPreprocessingDone(toAlign, preprocessingActions, copiedInputs);
351358

352359
return toAlign;
353360
}
354361

362+
private TaskFileManagerImpl getTaskFileManagerImpl() throws PipelineJobException
363+
{
364+
if (!(getHelper().getFileManager() instanceof TaskFileManagerImpl tfm))
365+
{
366+
throw new PipelineJobException("Expected fileManager to be a TaskFileManagerImpl");
367+
}
368+
369+
return tfm;
370+
}
371+
355372
private SequenceAlignmentJob getPipelineJob()
356373
{
357374
return (SequenceAlignmentJob)getJob();
@@ -667,6 +684,12 @@ private void alignSet(Readset rs, String basename, Map<ReadData, Pair<File, File
667684
List<RecordedAction> alignActions = new ArrayList<>();
668685
bam = doAlignment(referenceGenome, rs, files, alignActions);
669686

687+
if (getHelper().getFileManager().performCleanupAfterEachStep())
688+
{
689+
List<File> toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam));
690+
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
691+
}
692+
670693
_resumer.setInitialAlignmentDone(bam, alignActions);
671694
}
672695

@@ -742,6 +765,12 @@ else if (step.expectToCreateNewBam())
742765
action.setEndTime(end);
743766
getJob().getLogger().info(stepCtx.getProvider().getLabel() + " Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));
744767
postProcessActions.add(action);
768+
769+
if (getHelper().getFileManager().performCleanupAfterEachStep())
770+
{
771+
List<File> toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam));
772+
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
773+
}
745774
}
746775
}
747776

@@ -791,6 +820,12 @@ else if (step.expectToCreateNewBam())
791820
}
792821
}
793822

823+
if (getHelper().getFileManager().performCleanupAfterEachStep())
824+
{
825+
List<File> toRetain = Arrays.asList(bam, SequenceUtil.getExpectedIndex(bam));
826+
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
827+
}
828+
794829
_resumer.setBamSortDone(bam, sortAction);
795830
}
796831

@@ -841,6 +876,12 @@ else if (step.expectToCreateNewBam())
841876
renameAction.setEndTime(end);
842877
getJob().getLogger().info("Rename Bam Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));
843878

879+
if (getHelper().getFileManager().performCleanupAfterEachStep())
880+
{
881+
List<File> toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam));
882+
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
883+
}
884+
844885
_resumer.setBamRenameDone(renamedBam, List.of(renameAction));
845886
}
846887

@@ -888,6 +929,12 @@ else if (step.expectToCreateNewBam())
888929
indexAction.setEndTime(end);
889930
getJob().getLogger().info("IndexBam Duration: " + DurationFormatUtils.formatDurationWords(end.getTime() - start.getTime(), true, true));
890931

932+
if (getHelper().getFileManager().performCleanupAfterEachStep())
933+
{
934+
List<File> toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam));
935+
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
936+
}
937+
891938
_resumer.setIndexBamDone(true, indexAction);
892939
}
893940
}
@@ -1045,8 +1092,15 @@ else if (step.expectToCreateNewBam())
10451092
}
10461093

10471094
analysisActions.add(action);
1048-
_resumer.setBamAnalysisComplete(analysisActions);
1095+
1096+
if (getHelper().getFileManager().performCleanupAfterEachStep())
1097+
{
1098+
List<File> toRetain = Arrays.asList(renamedBam, SequenceUtil.getExpectedIndex(renamedBam));
1099+
getTaskFileManagerImpl().deleteIntermediateFiles(toRetain);
1100+
}
10491101
}
1102+
1103+
_resumer.setBamAnalysisComplete(analysisActions);
10501104
}
10511105
}
10521106

SequenceAnalysis/src/org/labkey/sequenceanalysis/pipeline/TaskFileManagerImpl.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.apache.commons.lang3.StringUtils;
88
import org.apache.commons.lang3.time.DurationFormatUtils;
99
import org.apache.logging.log4j.Logger;
10+
import org.jetbrains.annotations.NotNull;
1011
import org.jetbrains.annotations.Nullable;
1112
import org.labkey.api.data.Table;
1213
import org.labkey.api.data.TableInfo;
@@ -707,6 +708,12 @@ public boolean isDeleteIntermediateFiles()
707708
return "true".equals(_job.getParameters().get("deleteIntermediateFiles"));
708709
}
709710

711+
@Override
712+
public boolean performCleanupAfterEachStep()
713+
{
714+
return "true".equals(_job.getParameters().get("performCleanupAfterEachStep"));
715+
}
716+
710717
@Override
711718
public boolean isCopyInputsLocally()
712719
{
@@ -726,19 +733,26 @@ private Set<String> getInputPaths()
726733
@Override
727734
public void deleteIntermediateFiles() throws PipelineJobException
728735
{
729-
_job.getLogger().info("Cleaning up intermediate files");
736+
deleteIntermediateFiles(Collections.emptySet());
737+
}
730738

731-
Set<File> inputs = new HashSet<>();
732-
inputs.addAll(getSupport().getInputFiles());
739+
public void deleteIntermediateFiles(@NotNull Collection<File> filesToRetain) throws PipelineJobException
740+
{
741+
_job.getLogger().info("Cleaning up intermediate files");
733742

734743
Set<String> inputPaths = getInputPaths();
735-
736744
if (isDeleteIntermediateFiles())
737745
{
738746
_job.getLogger().debug("Intermediate files will be removed, total: " + _intermediateFiles.size());
739747

740748
for (File f : _intermediateFiles)
741749
{
750+
if (filesToRetain.contains(f))
751+
{
752+
_job.getLogger().debug("\tFile marked for deletion, but was part of filesToRetain and will not be deleted: " + f.getPath());
753+
continue;
754+
}
755+
742756
_job.getLogger().debug("\tDeleting intermediate file: " + f.getPath());
743757

744758
if (inputPaths.contains(f.getPath()))

0 commit comments

Comments
 (0)