From a63d144a02d30120f2b5d713bda7b1b07a9e1ed0 Mon Sep 17 00:00:00 2001 From: jgilber2 Date: Mon, 29 Jul 2024 16:20:06 -0700 Subject: [PATCH 1/2] Named vector for col type conversion --- R/DataModel.R | 61 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/R/DataModel.R b/R/DataModel.R index c1c1685..262d283 100644 --- a/R/DataModel.R +++ b/R/DataModel.R @@ -49,7 +49,7 @@ checkAndFixColumnNames <- expectedNames <- tableSpecs %>% dplyr::select("columnName") %>% dplyr::anti_join(dplyr::filter(optionalNames, !.data$columnName %in% observeredNames), - by = "columnName" + by = "columnName" ) %>% dplyr::arrange("columnName") %>% dplyr::pull() @@ -181,7 +181,7 @@ checkAndFixDuplicateRows <- specifications) { primaryKeys <- specifications %>% dplyr::filter(.data$tableName == !!tableName & - tolower(.data$primaryKey) == "yes") %>% + tolower(.data$primaryKey) == "yes") %>% dplyr::select("columnName") %>% dplyr::pull() duplicatedRows <- duplicated(table[, primaryKeys]) @@ -194,7 +194,7 @@ checkAndFixDuplicateRows <- sum(duplicatedRows) ) ) - return(table[!duplicatedRows, ]) + return(table[!duplicatedRows,]) } else { return(table) } @@ -220,7 +220,7 @@ appendNewRows <- if (nrow(data) > 0) { primaryKeys <- specifications %>% dplyr::filter(.data$tableName == !!tableName & - tolower(.data$primaryKey) == "yes") %>% + tolower(.data$primaryKey) == "yes") %>% dplyr::select("columnName") %>% dplyr::pull() newData <- newData %>% @@ -250,10 +250,10 @@ formatDouble <- function(x) { .truncateTable <- function(tableName, connection, schema, tablePrefix) { DatabaseConnector::renderTranslateExecuteSql(connection, - "TRUNCATE TABLE @schema.@table_prefix@table;", - table_prefix = tablePrefix, - schema = schema, - table = tableName + "TRUNCATE TABLE @schema.@table_prefix@table;", + table_prefix = tablePrefix, + schema = schema, + table = tableName ) invisible(NULL) } @@ -354,8 +354,8 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti primaryKeyValuesInChunk <- unique(chunk[env$primaryKey]) duplicates <- dplyr::inner_join(env$primaryKeyValuesInDb, - primaryKeyValuesInChunk, - by = env$primaryKey + primaryKeyValuesInChunk, + by = env$primaryKey ) if (nrow(duplicates) != 0) { @@ -386,7 +386,7 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti # Remove duplicates we already dealt with: env$primaryKeyValuesInDb <- env$primaryKeyValuesInDb %>% - dplyr::anti_join(duplicates, by = env$primaryKey) + dplyr::anti_join(duplicates, by = env$primaryKey) } } if (nrow(chunk) == 0) { @@ -428,7 +428,7 @@ uploadTable <- function(tableName, primaryKey <- specifications %>% dplyr::filter(.data$tableName == !!tableName & - tolower(.data$primaryKey) == "yes") %>% + tolower(.data$primaryKey) == "yes") %>% dplyr::select("columnName") %>% dplyr::pull() @@ -444,7 +444,7 @@ uploadTable <- function(tableName, if (purgeSiteDataBeforeUploading && "database_id" %in% primaryKey) { type <- specifications %>% dplyr::filter(.data$tableName == !!tableName & - .data$columnName == "database_id") %>% + .data$columnName == "database_id") %>% dplyr::select("dataType") %>% dplyr::pull() # Remove the existing data for the databaseId @@ -477,12 +477,28 @@ uploadTable <- function(tableName, env$primaryKeyValuesInDb <- primaryKeyValuesInDb } + types <- sub(" ", "", sub("\\(.*\\)", "", specifications$dataType)) + # Create a named vector of column types + colTypesVec <- setNames(types, specifications$columnName) + + # Convert the types to readr's col_types format + convertType <- function(type) { + switch(type, + varchar = "c", + bigint = "n", + integer = "n", + int = "n", + date = "D", + "?") # default to guess if type not matched + } + + colTypes <- paste0(sapply(colTypesVec, convertType), collapse = "") readr::read_csv_chunked( file = file.path(resultsFolder, csvFileName), callback = function(chunk, pos) uploadChunk(chunk, pos, env, specifications, resultsFolder, connection, runCheckAndFixCommands, forceOverWriteOfSpecifications), chunk_size = 1e7, - col_types = readr::cols(), + col_types = colTypes, guess_max = 1e6, progress = FALSE ) @@ -572,10 +588,10 @@ uploadResults <- function(connection = NULL, ParallelLogger::logInfo("Removing all records for tables within specification") invisible(lapply(unique(specifications$tableName), - .truncateTable, - connection = connection, - schema = schema, - tablePrefix = tablePrefix + .truncateTable, + connection = connection, + schema = schema, + tablePrefix = tablePrefix )) } @@ -639,6 +655,7 @@ uploadResults <- function(connection = NULL, #' @export deleteAllRowsForPrimaryKey <- function(connection, schema, tableName, keyValues) { + createSqlStatement <- function(i) { sql <- paste0( "DELETE FROM ", @@ -647,7 +664,7 @@ deleteAllRowsForPrimaryKey <- tableName, "\nWHERE ", paste(paste0( - colnames(keyValues), " = '", keyValues[i, ], "'" + colnames(keyValues), " = '", keyValues[i,], "'" ), collapse = " AND "), ";" ) @@ -722,9 +739,9 @@ deleteAllRowsForDatabaseId <- database_id = databaseId ) DatabaseConnector::executeSql(connection, - sql, - progressBar = FALSE, - reportOverallTime = FALSE + sql, + progressBar = FALSE, + reportOverallTime = FALSE ) } } From e94c9555437f89bee32bd6fcfa1d4f484d9624af Mon Sep 17 00:00:00 2001 From: jgilber2 Date: Tue, 30 Jul 2024 09:09:11 -0700 Subject: [PATCH 2/2] actually run type conversions correctly. Use numeric instead of integer --- R/DataModel.R | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/R/DataModel.R b/R/DataModel.R index 262d283..3c3e342 100644 --- a/R/DataModel.R +++ b/R/DataModel.R @@ -110,7 +110,7 @@ checkAndFixDataTypes <- table <- dplyr::mutate_at(table, i, as.numeric) } } else if (expectedType == "int") { - if (observedTypes[i] != "integer") { + if (!observedTypes[i] %in% c("integer", "numeric")) { ParallelLogger::logDebug( sprintf( "Column %s in table %s in results folder %s is of type %s, but was expecting %s. Attempting to convert.", @@ -121,7 +121,7 @@ checkAndFixDataTypes <- expectedType ) ) - table <- dplyr::mutate_at(table, i, as.integer) + table <- dplyr::mutate_at(table, i, as.numeric) } } else if (expectedType == "varchar") { if (observedTypes[i] != "character") { @@ -423,12 +423,14 @@ uploadTable <- function(tableName, purgeSiteDataBeforeUploading, warnOnMissingTable) { csvFileName <- paste0(tableName, ".csv") + specifications <- specifications %>% + dplyr::filter(.data$tableName == !!tableName) + if (csvFileName %in% list.files(resultsFolder)) { rlang::inform(paste0("Uploading file: ", csvFileName, " to table: ", tableName)) primaryKey <- specifications %>% - dplyr::filter(.data$tableName == !!tableName & - tolower(.data$primaryKey) == "yes") %>% + dplyr::filter(tolower(.data$primaryKey) == "yes") %>% dplyr::select("columnName") %>% dplyr::pull() @@ -443,8 +445,7 @@ uploadTable <- function(tableName, env$purgeSiteDataBeforeUploading <- purgeSiteDataBeforeUploading if (purgeSiteDataBeforeUploading && "database_id" %in% primaryKey) { type <- specifications %>% - dplyr::filter(.data$tableName == !!tableName & - .data$columnName == "database_id") %>% + dplyr::filter(.data$columnName == "database_id") %>% dplyr::select("dataType") %>% dplyr::pull() # Remove the existing data for the databaseId @@ -477,22 +478,25 @@ uploadTable <- function(tableName, env$primaryKeyValuesInDb <- primaryKeyValuesInDb } + # Remove data size or types types <- sub(" ", "", sub("\\(.*\\)", "", specifications$dataType)) - # Create a named vector of column types - colTypesVec <- setNames(types, specifications$columnName) # Convert the types to readr's col_types format - convertType <- function(type) { - switch(type, - varchar = "c", - bigint = "n", - integer = "n", - int = "n", - date = "D", - "?") # default to guess if type not matched - } + convertType <- Vectorize( + function(type) { + switch(type, + varchar = "c", + bigint = "n", + int = "n", + date = "D", + "?") # default to guess if type not matched + } + ) - colTypes <- paste0(sapply(colTypesVec, convertType), collapse = "") + types <- convertType(types) + # Create a named vector of column types + names(types) <- specifications$columnName + colTypes <- do.call(readr::cols, as.list(types)) readr::read_csv_chunked( file = file.path(resultsFolder, csvFileName),