Skip to content

Commit

Permalink
Merge pull request #67 from OHDSI/type-read-conversion
Browse files Browse the repository at this point in the history
Named vector for col type conversion
  • Loading branch information
azimov authored Aug 5, 2024
2 parents 1fb0f18 + e94c955 commit a59c3fb
Showing 1 changed file with 47 additions and 26 deletions.
73 changes: 47 additions & 26 deletions R/DataModel.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ checkAndFixColumnNames <-
expectedNames <- tableSpecs %>%
dplyr::select("columnName") %>%
dplyr::anti_join(dplyr::filter(optionalNames, !.data$columnName %in% observeredNames),
by = "columnName"
by = "columnName"
) %>%
dplyr::arrange("columnName") %>%
dplyr::pull()
Expand Down Expand Up @@ -110,7 +110,7 @@ checkAndFixDataTypes <-
table <- dplyr::mutate_at(table, i, as.numeric)
}
} else if (expectedType == "int") {
if (observedTypes[i] != "integer") {
if (!observedTypes[i] %in% c("integer", "numeric")) {
ParallelLogger::logDebug(
sprintf(
"Column %s in table %s in results folder %s is of type %s, but was expecting %s. Attempting to convert.",
Expand All @@ -121,7 +121,7 @@ checkAndFixDataTypes <-
expectedType
)
)
table <- dplyr::mutate_at(table, i, as.integer)
table <- dplyr::mutate_at(table, i, as.numeric)
}
} else if (expectedType == "varchar") {
if (observedTypes[i] != "character") {
Expand Down Expand Up @@ -181,7 +181,7 @@ checkAndFixDuplicateRows <-
specifications) {
primaryKeys <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()
duplicatedRows <- duplicated(table[, primaryKeys])
Expand All @@ -194,7 +194,7 @@ checkAndFixDuplicateRows <-
sum(duplicatedRows)
)
)
return(table[!duplicatedRows, ])
return(table[!duplicatedRows,])
} else {
return(table)
}
Expand All @@ -220,7 +220,7 @@ appendNewRows <-
if (nrow(data) > 0) {
primaryKeys <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()
newData <- newData %>%
Expand Down Expand Up @@ -250,10 +250,10 @@ formatDouble <- function(x) {

# Remove all rows from a single results table.
#
# Renders and executes "TRUNCATE TABLE @schema.@table_prefix@table;" through
# DatabaseConnector, substituting the supplied schema, prefix and table name.
#
# @param tableName   Name of the table, without prefix; fills @table.
# @param connection  Connection object handed to
#                    DatabaseConnector::renderTranslateExecuteSql.
# @param schema      Schema that holds the table; fills @schema.
# @param tablePrefix Prefix placed directly before the table name; fills
#                    @table_prefix.
#
# @return NULL, invisibly. The function is called for its side effect only.
.truncateTable <- function(tableName, connection, schema, tablePrefix) {
  # Each argument is passed exactly once: a repeated unnamed SQL string would
  # be bound positionally to another formal of renderTranslateExecuteSql, and
  # repeated named parameters would be forwarded twice.
  DatabaseConnector::renderTranslateExecuteSql(connection,
    "TRUNCATE TABLE @schema.@table_prefix@table;",
    table_prefix = tablePrefix,
    schema = schema,
    table = tableName
  )
  invisible(NULL)
}
Expand Down Expand Up @@ -354,8 +354,8 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
primaryKeyValuesInChunk <- unique(chunk[env$primaryKey])
duplicates <-
dplyr::inner_join(env$primaryKeyValuesInDb,
primaryKeyValuesInChunk,
by = env$primaryKey
primaryKeyValuesInChunk,
by = env$primaryKey
)

if (nrow(duplicates) != 0) {
Expand Down Expand Up @@ -386,7 +386,7 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
# Remove duplicates we already dealt with:
env$primaryKeyValuesInDb <-
env$primaryKeyValuesInDb %>%
dplyr::anti_join(duplicates, by = env$primaryKey)
dplyr::anti_join(duplicates, by = env$primaryKey)
}
}
if (nrow(chunk) == 0) {
Expand Down Expand Up @@ -423,12 +423,14 @@ uploadTable <- function(tableName,
purgeSiteDataBeforeUploading,
warnOnMissingTable) {
csvFileName <- paste0(tableName, ".csv")
specifications <- specifications %>%
dplyr::filter(.data$tableName == !!tableName)

if (csvFileName %in% list.files(resultsFolder)) {
rlang::inform(paste0("Uploading file: ", csvFileName, " to table: ", tableName))

primaryKey <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
dplyr::filter(tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()

Expand All @@ -443,8 +445,7 @@ uploadTable <- function(tableName,
env$purgeSiteDataBeforeUploading <- purgeSiteDataBeforeUploading
if (purgeSiteDataBeforeUploading && "database_id" %in% primaryKey) {
type <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
.data$columnName == "database_id") %>%
dplyr::filter(.data$columnName == "database_id") %>%
dplyr::select("dataType") %>%
dplyr::pull()
# Remove the existing data for the databaseId
Expand Down Expand Up @@ -477,12 +478,31 @@ uploadTable <- function(tableName,
env$primaryKeyValuesInDb <- primaryKeyValuesInDb
}

# Strip any parenthesised size from each data type (e.g. "varchar(255)" -> "varchar") and drop the first space
types <- sub(" ", "", sub("\\(.*\\)", "", specifications$dataType))

# Convert the types to readr's col_types format
convertType <- Vectorize(
function(type) {
switch(type,
varchar = "c",
bigint = "n",
int = "n",
date = "D",
"?") # default to guess if type not matched
}
)

types <- convertType(types)
# Create a named vector of column types
names(types) <- specifications$columnName
colTypes <- do.call(readr::cols, as.list(types))

readr::read_csv_chunked(
file = file.path(resultsFolder, csvFileName),
callback = function(chunk, pos) uploadChunk(chunk, pos, env, specifications, resultsFolder, connection, runCheckAndFixCommands, forceOverWriteOfSpecifications),
chunk_size = 1e7,
col_types = readr::cols(),
col_types = colTypes,
guess_max = 1e6,
progress = FALSE
)
Expand Down Expand Up @@ -572,10 +592,10 @@ uploadResults <- function(connection = NULL,
ParallelLogger::logInfo("Removing all records for tables within specification")

invisible(lapply(unique(specifications$tableName),
.truncateTable,
connection = connection,
schema = schema,
tablePrefix = tablePrefix
.truncateTable,
connection = connection,
schema = schema,
tablePrefix = tablePrefix
))
}

Expand Down Expand Up @@ -639,6 +659,7 @@ uploadResults <- function(connection = NULL,
#' @export
deleteAllRowsForPrimaryKey <-
function(connection, schema, tableName, keyValues) {

createSqlStatement <- function(i) {
sql <- paste0(
"DELETE FROM ",
Expand All @@ -647,7 +668,7 @@ deleteAllRowsForPrimaryKey <-
tableName,
"\nWHERE ",
paste(paste0(
colnames(keyValues), " = '", keyValues[i, ], "'"
colnames(keyValues), " = '", keyValues[i,], "'"
), collapse = " AND "),
";"
)
Expand Down Expand Up @@ -722,9 +743,9 @@ deleteAllRowsForDatabaseId <-
database_id = databaseId
)
DatabaseConnector::executeSql(connection,
sql,
progressBar = FALSE,
reportOverallTime = FALSE
sql,
progressBar = FALSE,
reportOverallTime = FALSE
)
}
}
Expand Down

0 comments on commit a59c3fb

Please sign in to comment.