From d8a8d9a8d579571bfc5658f51145353ecdbc560a Mon Sep 17 00:00:00 2001 From: bbimber Date: Fri, 31 Jan 2025 09:51:23 -0800 Subject: [PATCH] Rework MergeSeurat to handle large data better --- singlecell/resources/chunks/MergeSeurat.R | 111 +++++++++++------- .../pipeline/singlecell/MergeSeurat.java | 3 +- 2 files changed, 68 insertions(+), 46 deletions(-) diff --git a/singlecell/resources/chunks/MergeSeurat.R b/singlecell/resources/chunks/MergeSeurat.R index 8b0d72d7c..c4d353ae5 100644 --- a/singlecell/resources/chunks/MergeSeurat.R +++ b/singlecell/resources/chunks/MergeSeurat.R @@ -5,15 +5,17 @@ if (!doDiet && length(seuratObjects) > 20 && !disableAutoDietSeurat) { doDiet <- TRUE } -mergeBatch <- function(dat) { +filesToDelete <- c() + +mergeBatchInMemory <- function(datasetIdToFilePath, saveFile) { toMerge <- list() - for (datasetId in names(dat)) { + for (datasetId in names(datasetIdToFilePath)) { print(paste0('Loading: ', datasetId)) if (doDiet) { - toMerge[[datasetId]] <- Seurat::DietSeurat(readSeuratRDS(dat[[datasetId]])) + toMerge[[datasetId]] <- Seurat::DietSeurat(readSeuratRDS(datasetIdToFilePath[[datasetId]])) gc() } else { - toMerge[[datasetId]] <- readSeuratRDS(dat[[datasetId]]) + toMerge[[datasetId]] <- readSeuratRDS(datasetIdToFilePath[[datasetId]]) } if (ncol(toMerge[[datasetId]]) == 1) { @@ -38,59 +40,78 @@ mergeBatch <- function(dat) { } seuratObj <- CellMembrane::MergeSeuratObjs(toMerge, projectName = projectName, doGC = doDiet, errorOnBarcodeSuffix = errorOnBarcodeSuffix) - return(seuratObj) + saveRDS(seuratObj, file = saveFile) + filesToDelete <<- c(filesToDelete, saveFile) + + return(fn) } -if (length(seuratObjects) == 1) { - print('There is only one seurat object, no need to merge') - datasetId <- names(seuratObjects)[[1]] - saveData(readSeuratRDS(seuratObjects[[datasetId]]), datasetId) -} else { - batchSize <- 20 - numBatches <- ceiling(length(seuratObjects) / batchSize) +mergeBatch <- function(seuratObjects, outerBatchIdx, maxBatchSize = 20, maxInputFileSizeMb = maxAllowableInputFileSizeMb) { + logger::log_info(paste0('Beginning outer batch: ', outerBatchIdx, ' with total files: ', length(seuratObjects))) + + if (length(seuratObjects) == 1) { + print('Single file, nothing to do') + return(seuratObjects) + } + + # Phase 1: group into batches: + batchList <- list() + activeBatch <- c() + sizeOfBatch <- 0 + batchIdx <- 1 + for (datasetId in names(seuratObjects)) { + activeBatch <- c(activeBatch, seuratObjects[[datasetId]]) + sizeInMb <- (file.size(seuratObjects[[datasetId]]) / 1024^2) + sizeOfBatch <- sizeOfBatch + sizeInMb + + if (length(activeBatch) >= maxBatchSize || (sizeOfBatch >= maxInputFileSizeMb && length(activeBatch) > 1)) { + logger::log_info(paste0('adding to batch with ', length(activeBatch), ' files and ', sizeOfBatch, 'MB')) + batchList[batchIdx] <- activeBatch + activeBatch <- c() + sizeOfBatch <- 0 + batchIdx <- batchIdx + 1 + next + } + } + + # Account for final files: + if (length(activeBatch) > 0) { + logger::log_info(paste0('finalizing batch with ', length(activeBatch), ' files and ', sizeOfBatch, 'MB')) + batchList[batchIdx] <- activeBatch + } + + if (length(batchList) == 0){ + stop('Error: zero length batchList') + } + mergedObjectFiles <- list() - for (i in 1:numBatches) { - logger::log_info(paste0('Merging batch ', i, ' of ', numBatches)) - start <- 1 + (i-1)*batchSize - end <- min(start+batchSize-1, length(seuratObjects)) - logger::log_info(paste0('processing: ', start, ' to ', end, ' of ', length(seuratObjects))) + for (i in 1:length(batchList)) { + activeBatch <- batchList[[i]] + logger::log_info(paste0('Merging inner batch ', i, ' of ', length(batchList), ' with ', length(activeBatch), ' files')) - fn <- paste0('mergeBatch.', i, '.rds') - saveRDS(mergeBatch(seuratObjects[start:end]), file = fn) - mergedObjectFiles[[i]] <- fn + saveFile <- paste0('merge.', outerBatchIdx, '.', i, '.rds') + mergedObjectFiles[[i]] <- mergeBatchInMemory(activeBatch, saveFile = saveFile) logger::log_info(paste0('mem used: ', R.utils::hsize(as.numeric(pryr::mem_used())))) gc() logger::log_info(paste0('after gc: ', R.utils::hsize(as.numeric(pryr::mem_used())))) } + logger::log_info('Done with inner batch') - logger::log_info('Done with batches') - if (length(mergedObjectFiles) == 1) { - seuratObj <- readRDS(mergedObjectFiles[[1]]) - unlink(mergedObjectFiles[[1]]) - } else { - logger::log_info('performing final merge') - # TODO: check for single cell in object - seuratObj <- readRDS(mergedObjectFiles[[1]]) - unlink(mergedObjectFiles[[1]]) - - for (i in 2:length(mergedObjectFiles)) { - logger::log_info(paste0('Merging final file ', i, ' of ', length(mergedObjectFiles))) - seuratObj <- merge(x = seuratObj, y = readRDS(mergedObjectFiles[[i]]), project = seuratObj@project.name) - if (HasSplitLayers(seuratObj)) { - seuratObj <- MergeSplitLayers(seuratObj) - } + if (length(mergedObjectFiles) > 1) { + return(mergeBatch(mergedObjectFiles, outerBatchIdx = (outerBatchIdx + 1), maxInputFileSizeMb = maxInputFileSizeMb, maxBatchSize = maxBatchSize)) + } - unlink(mergedObjectFiles[[i]]) + return(mergedObjectFiles) +} - logger::log_info(paste0('mem used: ', R.utils::hsize(as.numeric(pryr::mem_used())))) - logger::log_info(paste0('seurat object: ', R.utils::hsize(as.numeric(utils::object.size(seuratObj))))) - gc() - logger::log_info(paste0('after gc: ', R.utils::hsize(as.numeric(pryr::mem_used())))) - } - } +mergedObjectFiles <- mergeBatch(seuratObjects, outerBatchIdx = 1) - gc() +print('Overall merge complete') +gc() +saveData(seuratObjMerged, projectName) - saveData(seuratObj, projectName) +# Cleanup: +for (fn in filesToDelete) { + unlink(fn) } \ No newline at end of file diff --git a/singlecell/src/org/labkey/singlecell/pipeline/singlecell/MergeSeurat.java b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/MergeSeurat.java index b27f985b9..993590b79 100644 --- a/singlecell/src/org/labkey/singlecell/pipeline/singlecell/MergeSeurat.java +++ b/singlecell/src/org/labkey/singlecell/pipeline/singlecell/MergeSeurat.java @@ -35,7 +35,8 @@ public Provider() put("height", 150); put("delimiter", ","); put("stripCharsRe", "/(^['\"]+)|(['\"]+$)/g"); - }}, "RNA.orig").delimiter(",") + }}, "RNA.orig").delimiter(","), + SeuratToolParameter.create("maxAllowableInputFileSizeMb", "Max Allowable Batch Size (MB)", "The largest allowable amount of data (in MB), measured as the size of the RDS files, to be allowed in one unit of data to merge in memory.", "ldk-integerfield", null, 200, "maxAllowableInputFileSizeMb", true, false) ), null, null); }