Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic functionality and tests for python csv uploads #73

Merged
merged 14 commits into from
Oct 9, 2024
23 changes: 22 additions & 1 deletion .github/workflows/R_CMD_check_Hades.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
CDM5_SPARK_PASSWORD: ${{ secrets.CDM5_SPARK_PASSWORD }}
CDM5_SPARK_CONNECTION_STRING: ${{ secrets.CDM5_SPARK_CONNECTION_STRING }}


steps:
- uses: actions/checkout@v2

Expand All @@ -75,9 +76,29 @@ jobs:

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::rcmdcheck
extra-packages: any::rcmdcheck reticulate
needs: check


- uses: actions/setup-python@v4
with:
python-version: "3.x"

- name: setup r-reticulate venv
shell: Rscript {0}
run: |

path_to_python <- reticulate::virtualenv_create(
envname = "r-reticulate",
python = Sys.which("python"), # placed on PATH by the setup-python action
packages = c(
"psycopg2-binary"
)
)

writeLines(sprintf("RETICULATE_PYTHON=%s", path_to_python),
Sys.getenv("GITHUB_ENV"))

- uses: r-lib/actions/check-r-package@v2
with:
args: 'c("--no-manual", "--as-cran")'
Expand Down
14 changes: 14 additions & 0 deletions .github/workflows/R_CMD_check_main_weekly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ jobs:

- uses: r-lib/actions/setup-pandoc@v2

- name: setup r-reticulate venv
shell: Rscript {0}
run: |

path_to_python <- reticulate::virtualenv_create(
envname = "r-reticulate",
python = Sys.which("python"), # placed on PATH by the setup-python action
packages = c(
"psycopg2-binary"
)
)

writeLines(sprintf("RETICULATE_PYTHON=%s", path_to_python),
Sys.getenv("GITHUB_ENV"))
- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::rcmdcheck
Expand Down
6 changes: 4 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: ResultModelManager
Title: Result Model Manager
Version: 0.5.11
Version: 0.6.0
Authors@R:
person("Jamie", "Gilbert", , "gilbert@ohdsi.org", role = c("aut", "cre"))
Description: Database data model management utilities for R packages in the Observational Health Data Sciences and
Expand Down Expand Up @@ -42,5 +42,7 @@ Suggests:
pkgdown,
remotes,
styler,
Andromeda
Andromeda,
rJava,
reticulate
Config/testthat/edition: 3
6 changes: 6 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ export(createQueryNamespace)
export(createResultExportManager)
export(deleteAllRowsForDatabaseId)
export(deleteAllRowsForPrimaryKey)
export(disablePythonUploads)
export(enablePythonUploads)
export(generateSqlSchema)
export(grantTablePermissions)
export(install_psycopg2)
export(loadResultsDataModelSpecifications)
export(pyPgUploadEnabled)
export(pyUploadCsv)
export(pyUploadDataFrame)
export(unzipResults)
export(uploadResults)
import(DatabaseConnector)
Expand Down
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# ResultModelManager 0.6.0

Changes:
1. Added optional wrapper functions for Python-based CSV uploads (via `reticulate` and `psycopg2`), including `enablePythonUploads`, `disablePythonUploads`, `pyPgUploadEnabled`, `pyUploadCsv`, `pyUploadDataFrame`, and `install_psycopg2`


# ResultModelManager 0.5.11

Changes:
Expand Down
100 changes: 61 additions & 39 deletions R/DataModel.R
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ checkAndFixColumnNames <-

expectedNames <- tableSpecs %>%
dplyr::select("columnName") %>%
dplyr::anti_join(dplyr::filter(optionalNames, !.data$columnName %in% observeredNames),
dplyr::anti_join(
dplyr::filter(optionalNames, !.data$columnName %in% observeredNames),
by = "columnName"
) %>%
dplyr::arrange("columnName") %>%
Expand Down Expand Up @@ -181,7 +182,7 @@ checkAndFixDuplicateRows <-
specifications) {
primaryKeys <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()
duplicatedRows <- duplicated(table[, primaryKeys])
Expand All @@ -194,7 +195,7 @@ checkAndFixDuplicateRows <-
sum(duplicatedRows)
)
)
return(table[!duplicatedRows, ])
return(table[!duplicatedRows,])
} else {
return(table)
}
Expand All @@ -220,7 +221,7 @@ appendNewRows <-
if (nrow(data) > 0) {
primaryKeys <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()
newData <- newData %>%
Expand Down Expand Up @@ -250,10 +251,10 @@ formatDouble <- function(x) {

.truncateTable <- function(tableName, connection, schema, tablePrefix) {
DatabaseConnector::renderTranslateExecuteSql(connection,
"TRUNCATE TABLE @schema.@table_prefix@table;",
table_prefix = tablePrefix,
schema = schema,
table = tableName
"TRUNCATE TABLE @schema.@table_prefix@table;",
table_prefix = tablePrefix,
schema = schema,
table = tableName
)
invisible(NULL)
}
Expand All @@ -273,7 +274,7 @@ formatDouble <- function(x) {
}


uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connection, runCheckAndFixCommands, forceOverWriteOfSpecifications) {
uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connection, pythonConnection, runCheckAndFixCommands, forceOverWriteOfSpecifications) {
ParallelLogger::logInfo(
"- Preparing to upload rows ",
pos,
Expand Down Expand Up @@ -354,8 +355,8 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
primaryKeyValuesInChunk <- unique(chunk[env$primaryKey])
duplicates <-
dplyr::inner_join(env$primaryKeyValuesInDb,
primaryKeyValuesInChunk,
by = env$primaryKey
primaryKeyValuesInChunk,
by = env$primaryKey
)

if (nrow(duplicates) != 0) {
Expand Down Expand Up @@ -386,23 +387,33 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
# Remove duplicates we already dealt with:
env$primaryKeyValuesInDb <-
env$primaryKeyValuesInDb %>%
dplyr::anti_join(duplicates, by = env$primaryKey)
dplyr::anti_join(duplicates, by = env$primaryKey)
}
}
if (nrow(chunk) == 0) {
ParallelLogger::logInfo("- No data left to insert")
} else {
insertTableStatus <- tryCatch(expr = {
DatabaseConnector::insertTable(
connection = connection,
tableName = env$tableName,
databaseSchema = env$schema,
data = chunk,
dropTableIfExists = FALSE,
createTable = FALSE,
tempTable = FALSE,
progressBar = TRUE
)
if (!is.null(pythonConnection)) {
tryCatch({
.pgWriteDataFrame(chunk, pyConnection = pythonConnection, table = env$tableName, schema = env$schema)
}, error = function(error) {
# rollback write of data
pythonConnection$rollback()
stop(error)
})
} else {
DatabaseConnector::insertTable(
connection = connection,
tableName = env$tableName,
databaseSchema = env$schema,
data = chunk,
dropTableIfExists = FALSE,
createTable = FALSE,
tempTable = FALSE,
progressBar = TRUE
)
}
}, error = function(e) e)
if (inherits(insertTableStatus, "error")) {
stop(insertTableStatus$message)
Expand All @@ -421,7 +432,8 @@ uploadTable <- function(tableName,
runCheckAndFixCommands,
forceOverWriteOfSpecifications,
purgeSiteDataBeforeUploading,
warnOnMissingTable) {
warnOnMissingTable,
pythonConnection) {
csvFileName <- paste0(tableName, ".csv")
specifications <- specifications %>%
dplyr::filter(.data$tableName == !!tableName)
Expand Down Expand Up @@ -485,11 +497,11 @@ uploadTable <- function(tableName,
convertType <- Vectorize(
function(type) {
switch(type,
varchar = "c",
bigint = "n",
int = "n",
date = "D",
"?"
varchar = "c",
bigint = "n",
int = "n",
date = "D",
"?"
) # default to guess if type not matched
}
)
Expand All @@ -501,7 +513,7 @@ uploadTable <- function(tableName,

readr::read_csv_chunked(
file = file.path(resultsFolder, csvFileName),
callback = function(chunk, pos) uploadChunk(chunk, pos, env, specifications, resultsFolder, connection, runCheckAndFixCommands, forceOverWriteOfSpecifications),
callback = function(chunk, pos) uploadChunk(chunk, pos, env, specifications, resultsFolder, connection, pythonConnection, runCheckAndFixCommands, forceOverWriteOfSpecifications),
chunk_size = 1e7,
col_types = colTypes,
guess_max = 1e6,
Expand Down Expand Up @@ -593,10 +605,10 @@ uploadResults <- function(connection = NULL,
ParallelLogger::logInfo("Removing all records for tables within specification")

invisible(lapply(unique(specifications$tableName),
.truncateTable,
connection = connection,
schema = schema,
tablePrefix = tablePrefix
.truncateTable,
connection = connection,
schema = schema,
tablePrefix = tablePrefix
))
}

Expand Down Expand Up @@ -625,7 +637,11 @@ uploadResults <- function(connection = NULL,
)
}
}

pythonConnection <- NULL
if (DatabaseConnector::dbms(connection) == "postgresql" && pyPgUploadEnabled()) {
pythonConnection <- .createPyConnection(connection)
on.exit(pythonConnection$close(), add = TRUE)
}
for (tableName in unique(specifications$tableName)) {
uploadTable(
tableName,
Expand All @@ -638,8 +654,13 @@ uploadResults <- function(connection = NULL,
runCheckAndFixCommands,
forceOverWriteOfSpecifications,
purgeSiteDataBeforeUploading,
warnOnMissingTable
warnOnMissingTable,
pythonConnection = pythonConnection
)

if (!is.null(pythonConnection)) {
pythonConnection$commit()
}
}

delta <- Sys.time() - start
Expand All @@ -660,6 +681,7 @@ uploadResults <- function(connection = NULL,
#' @export
deleteAllRowsForPrimaryKey <-
function(connection, schema, tableName, keyValues) {

createSqlStatement <- function(i) {
sql <- paste0(
"DELETE FROM ",
Expand All @@ -668,7 +690,7 @@ deleteAllRowsForPrimaryKey <-
tableName,
"\nWHERE ",
paste(paste0(
colnames(keyValues), " = '", keyValues[i, ], "'"
colnames(keyValues), " = '", keyValues[i,], "'"
), collapse = " AND "),
";"
)
Expand Down Expand Up @@ -743,9 +765,9 @@ deleteAllRowsForDatabaseId <-
database_id = databaseId
)
DatabaseConnector::executeSql(connection,
sql,
progressBar = FALSE,
reportOverallTime = FALSE
sql,
progressBar = FALSE,
reportOverallTime = FALSE
)
}
}
Expand Down
Loading
Loading