Skip to content

Commit

Permalink
Rework MergeSeurat to handle large data better
Browse files Browse the repository at this point in the history
  • Loading branch information
bbimber committed Jan 31, 2025
1 parent ec32afa commit d8a8d9a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 46 deletions.
111 changes: 66 additions & 45 deletions singlecell/resources/chunks/MergeSeurat.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ if (!doDiet && length(seuratObjects) > 20 && !disableAutoDietSeurat) {
doDiet <- TRUE
}

mergeBatch <- function(dat) {
filesToDelete <- c()

mergeBatchInMemory <- function(datasetIdToFilePath, saveFile) {
toMerge <- list()
for (datasetId in names(dat)) {
for (datasetId in names(datasetIdToFilePath)) {
print(paste0('Loading: ', datasetId))
if (doDiet) {
toMerge[[datasetId]] <- Seurat::DietSeurat(readSeuratRDS(dat[[datasetId]]))
toMerge[[datasetId]] <- Seurat::DietSeurat(readSeuratRDS(datasetIdToFilePath[[datasetId]]))
gc()
} else {
toMerge[[datasetId]] <- readSeuratRDS(dat[[datasetId]])
toMerge[[datasetId]] <- readSeuratRDS(datasetIdToFilePath[[datasetId]])
}

if (ncol(toMerge[[datasetId]]) == 1) {
Expand All @@ -38,59 +40,78 @@ mergeBatch <- function(dat) {
}

seuratObj <- CellMembrane::MergeSeuratObjs(toMerge, projectName = projectName, doGC = doDiet, errorOnBarcodeSuffix = errorOnBarcodeSuffix)
saveRDS(seuratObj, file = saveFile)

# Record the intermediate file so the end-of-script cleanup can unlink it:
filesToDelete <<- c(filesToDelete, saveFile)

# Return the file path (not the object) so the caller stays memory-light.
# Bug fix: this previously returned 'fn', which is undefined in this scope.
return(saveFile)
}

if (length(seuratObjects) == 1) {
print('There is only one seurat object, no need to merge')
datasetId <- names(seuratObjects)[[1]]
saveData(readSeuratRDS(seuratObjects[[datasetId]]), datasetId)
} else {
batchSize <- 20
numBatches <- ceiling(length(seuratObjects) / batchSize)
# Recursively merge a named list of datasetId -> RDS file path, batching the
# inputs so no single in-memory merge exceeds maxBatchSize files or (roughly)
# maxInputFileSizeMb of on-disk data. Returns a list holding the path(s) of
# the merged intermediate file(s); recursion continues until one file remains.
mergeBatch <- function(seuratObjects, outerBatchIdx, maxBatchSize = 20, maxInputFileSizeMb = maxAllowableInputFileSizeMb) {
    logger::log_info(paste0('Beginning outer batch: ', outerBatchIdx, ' with total files: ', length(seuratObjects)))

    if (length(seuratObjects) == 1) {
        print('Single file, nothing to do')
        return(seuratObjects)
    }

    # Recursive calls pass an unnamed list of intermediate file paths, but the
    # batching loop below is keyed on names(); synthesize names when absent so
    # the loop does not silently run zero times.
    # NOTE(review): assumes downstream code treats these names as labels only - confirm.
    if (is.null(names(seuratObjects))) {
        names(seuratObjects) <- paste0('batch', outerBatchIdx, '.', seq_along(seuratObjects))
    }

    # Phase 1: group inputs into batches, capped by both file count and the
    # cumulative on-disk size, so each in-memory merge stays bounded.
    batchList <- list()
    activeBatch <- list()
    sizeOfBatch <- 0
    batchIdx <- 1
    for (datasetId in names(seuratObjects)) {
        # Preserve the datasetId -> filePath mapping; mergeBatchInMemory
        # iterates names() of each batch (c() with [[ would drop the name).
        activeBatch[[datasetId]] <- seuratObjects[[datasetId]]
        sizeInMb <- (file.size(seuratObjects[[datasetId]]) / 1024^2)
        sizeOfBatch <- sizeOfBatch + sizeInMb

        if (length(activeBatch) >= maxBatchSize || (sizeOfBatch >= maxInputFileSizeMb && length(activeBatch) > 1)) {
            logger::log_info(paste0('finalizing batch with ', length(activeBatch), ' files and ', sizeOfBatch, 'MB'))
            # Must use [[: single-bracket assignment of a multi-element value
            # into one list slot stores only the first element (with a warning).
            batchList[[batchIdx]] <- activeBatch
            activeBatch <- list()
            sizeOfBatch <- 0
            batchIdx <- batchIdx + 1
        }
    }

    # Account for the final, partially-filled batch:
    if (length(activeBatch) > 0) {
        logger::log_info(paste0('finalizing batch with ', length(activeBatch), ' files and ', sizeOfBatch, 'MB'))
        batchList[[batchIdx]] <- activeBatch
    }

    if (length(batchList) == 0) {
        stop('Error: zero length batchList')
    }

    # Phase 2: merge each batch in memory, saving each result to its own
    # intermediate RDS file so only one batch is resident at a time.
    mergedObjectFiles <- list()
    for (i in seq_along(batchList)) {
        activeBatch <- batchList[[i]]
        logger::log_info(paste0('Merging inner batch ', i, ' of ', length(batchList), ' with ', length(activeBatch), ' files'))

        saveFile <- paste0('merge.', outerBatchIdx, '.', i, '.rds')
        mergedObjectFiles[[i]] <- mergeBatchInMemory(activeBatch, saveFile = saveFile)

        logger::log_info(paste0('mem used: ', R.utils::hsize(as.numeric(pryr::mem_used()))))
        gc()
        logger::log_info(paste0('after gc: ', R.utils::hsize(as.numeric(pryr::mem_used()))))
    }
    logger::log_info('Done with inner batch')

    # Phase 3: if multiple intermediate files remain, recurse to merge them:
    if (length(mergedObjectFiles) > 1) {
        return(mergeBatch(mergedObjectFiles, outerBatchIdx = (outerBatchIdx + 1), maxInputFileSizeMb = maxInputFileSizeMb, maxBatchSize = maxBatchSize))
    }

    return(mergedObjectFiles)
}

logger::log_info(paste0('mem used: ', R.utils::hsize(as.numeric(pryr::mem_used()))))
logger::log_info(paste0('seurat object: ', R.utils::hsize(as.numeric(utils::object.size(seuratObj)))))
gc()
logger::log_info(paste0('after gc: ', R.utils::hsize(as.numeric(pryr::mem_used()))))
}
}
# Kick off the recursive batched merge; this returns a list containing the
# path(s) of the merged intermediate file(s).
mergedObjectFiles <- mergeBatch(seuratObjects, outerBatchIdx = 1)

print('Overall merge complete')
gc()

# mergeBatch recurses until exactly one file remains; load it and persist.
# NOTE(review): the read-back of the final merged file is not visible in the
# source - without it 'seuratObj' would be undefined at saveData(). Confirm
# readSeuratRDS is the correct reader for both original inputs and the
# saveRDS-written intermediates.
seuratObj <- readSeuratRDS(mergedObjectFiles[[1]])
saveData(seuratObj, projectName)

# Cleanup intermediate RDS files recorded by mergeBatchInMemory:
for (fn in filesToDelete) {
    unlink(fn)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public Provider()
put("height", 150);
put("delimiter", ",");
put("stripCharsRe", "/(^['\"]+)|(['\"]+$)/g");
}}, "RNA.orig").delimiter(",")
}}, "RNA.orig").delimiter(","),
SeuratToolParameter.create("maxAllowableInputFileSizeMb", "Max Allowable Batch Size (MB)", "The largest allowable amount of data (in MB), measured as the size of the RDS files, to be allowed in one unit of data to merge in memory.", "ldk-integerfield", null, 200, "maxAllowableInputFileSizeMb", true, false)
), null, null);
}

Expand Down

0 comments on commit d8a8d9a

Please sign in to comment.