diff --git a/.github/workflows/R_CMD_check_Hades.yml b/.github/workflows/R_CMD_check_Hades.yml
index 049c900..5ab7611 100644
--- a/.github/workflows/R_CMD_check_Hades.yml
+++ b/.github/workflows/R_CMD_check_Hades.yml
@@ -52,6 +52,7 @@ jobs:
           CDM5_SPARK_PASSWORD: ${{ secrets.CDM5_SPARK_PASSWORD }}
           CDM5_SPARK_CONNECTION_STRING: ${{ secrets.CDM5_SPARK_CONNECTION_STRING }}
+
     steps:
       - uses: actions/checkout@v2
@@ -75,9 +76,29 @@ jobs:
       - uses: r-lib/actions/setup-r-dependencies@v2
         with:
-          extra-packages: any::rcmdcheck
+          extra-packages: any::rcmdcheck reticulate
           needs: check
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.x"
+
+      - name: setup r-reticulate venv
+        shell: Rscript {0}
+        run: |
+
+          path_to_python <- reticulate::virtualenv_create(
+            envname = "r-reticulate",
+            python = Sys.which("python"), # placed on PATH by the setup-python action
+            packages = c(
+              "psycopg2-binary"
+            )
+          )
+
+          writeLines(sprintf("RETICULATE_PYTHON=%s", path_to_python),
+                     Sys.getenv("GITHUB_ENV"))
+
       - uses: r-lib/actions/check-r-package@v2
         with:
           args: 'c("--no-manual", "--as-cran")'
diff --git a/.github/workflows/R_CMD_check_main_weekly.yaml b/.github/workflows/R_CMD_check_main_weekly.yaml
index b96f76c..238fac1 100644
--- a/.github/workflows/R_CMD_check_main_weekly.yaml
+++ b/.github/workflows/R_CMD_check_main_weekly.yaml
@@ -59,6 +59,20 @@ jobs:
       - uses: r-lib/actions/setup-pandoc@v2
 
+      - name: setup r-reticulate venv
+        shell: Rscript {0}
+        run: |
+
+          path_to_python <- reticulate::virtualenv_create(
+            envname = "r-reticulate",
+            python = Sys.which("python"), # placed on PATH by the setup-python action
+            packages = c(
+              "psycopg2-binary"
+            )
+          )
+
+          writeLines(sprintf("RETICULATE_PYTHON=%s", path_to_python),
+                     Sys.getenv("GITHUB_ENV"))
       - uses: r-lib/actions/setup-r-dependencies@v2
         with:
           extra-packages: any::rcmdcheck
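The venv steps above export `RETICULATE_PYTHON` through `GITHUB_ENV` so later steps pick up the same interpreter. A minimal sketch of a follow-up `Rscript {0}` step that could confirm the interpreter and `psycopg2` are visible before the check runs; the step itself is an assumption, not part of this patch:

```r
# Hypothetical verification step: confirm reticulate resolved the venv
# exported via RETICULATE_PYTHON and that psycopg2 is importable.
print(Sys.getenv("RETICULATE_PYTHON"))
print(reticulate::py_config())
stopifnot(reticulate::py_module_available("psycopg2"))
```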
diff --git a/DESCRIPTION b/DESCRIPTION
index c4b08fc..fe65715 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: ResultModelManager
 Title: Result Model Manager
-Version: 0.5.11
+Version: 0.6.0
 Authors@R: 
     person("Jamie", "Gilbert", , "gilbert@ohdsi.org", role = c("aut", "cre"))
 Description: Database data model management utilities for R packages in the Observational Health Data Sciences and
@@ -42,5 +42,7 @@ Suggests:
   pkgdown,
   remotes,
   styler,
-  Andromeda
+  Andromeda,
+  rJava,
+  reticulate
 Config/testthat/edition: 3
diff --git a/NAMESPACE b/NAMESPACE
index c3b8599..cb6a966 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -9,9 +9,15 @@ export(createQueryNamespace)
 export(createResultExportManager)
 export(deleteAllRowsForDatabaseId)
 export(deleteAllRowsForPrimaryKey)
+export(disablePythonUploads)
+export(enablePythonUploads)
 export(generateSqlSchema)
 export(grantTablePermissions)
+export(install_psycopg2)
 export(loadResultsDataModelSpecifications)
+export(pyPgUploadEnabled)
+export(pyUploadCsv)
+export(pyUploadDataFrame)
 export(unzipResults)
 export(uploadResults)
 import(DatabaseConnector)
diff --git a/NEWS.md b/NEWS.md
index 90cc835..d2b230c 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,3 +1,9 @@
+# ResultModelManager 0.6.0
+
+Changes:
+1. Added optional wrapper functions for python-based bulk uploads to PostgreSQL
+
+
 # ResultModelManager 0.5.11
 
 Changes:
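Taken together, the new exports sketch out the intended workflow. A minimal usage sketch; the connection values are placeholders taken from the documentation examples:

```r
library(ResultModelManager)

# One-time (interactive) setup, then enable for this session
enablePythonUploads()

connection <- DatabaseConnector::connect(
  dbms = "postgresql",
  server = "myserver.com/some_db", # placeholder host/database
  user = "me",
  password = "s"
)

# Only attempt the python path when reticulate + psycopg2 are usable
if (pyPgUploadEnabled()) {
  pyUploadDataFrame(
    data = data.frame(id = 1:100, value = "some_value"),
    connection = connection,
    table = "my_table",
    schema = "my_schema"
  )
}
DatabaseConnector::disconnect(connection)
```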
diff --git a/R/DataModel.R b/R/DataModel.R
index 95d01a6..eef6190 100644
--- a/R/DataModel.R
+++ b/R/DataModel.R
@@ -48,7 +48,8 @@ checkAndFixColumnNames <-
     expectedNames <- tableSpecs %>%
       dplyr::select("columnName") %>%
-      dplyr::anti_join(dplyr::filter(optionalNames, !.data$columnName %in% observeredNames),
+      dplyr::anti_join(
+        dplyr::filter(optionalNames, !.data$columnName %in% observeredNames),
         by = "columnName"
       ) %>%
       dplyr::arrange("columnName") %>%
@@ -181,7 +182,7 @@ checkAndFixDuplicateRows <-
            specifications) {
     primaryKeys <- specifications %>%
       dplyr::filter(.data$tableName == !!tableName &
-        tolower(.data$primaryKey) == "yes") %>%
+                      tolower(.data$primaryKey) == "yes") %>%
       dplyr::select("columnName") %>%
       dplyr::pull()
     duplicatedRows <- duplicated(table[, primaryKeys])
@@ -194,7 +195,7 @@ checkAndFixDuplicateRows <-
           sum(duplicatedRows)
         )
       )
-      return(table[!duplicatedRows, ])
+      return(table[!duplicatedRows,])
     } else {
       return(table)
     }
@@ -220,7 +221,7 @@ appendNewRows <-
     if (nrow(data) > 0) {
       primaryKeys <- specifications %>%
         dplyr::filter(.data$tableName == !!tableName &
-          tolower(.data$primaryKey) == "yes") %>%
+                        tolower(.data$primaryKey) == "yes") %>%
         dplyr::select("columnName") %>%
         dplyr::pull()
       newData <- newData %>%
@@ -250,10 +251,10 @@ formatDouble <- function(x) {
 .truncateTable <- function(tableName, connection, schema, tablePrefix) {
   DatabaseConnector::renderTranslateExecuteSql(connection,
-    "TRUNCATE TABLE @schema.@table_prefix@table;",
-    table_prefix = tablePrefix,
-    schema = schema,
-    table = tableName
+                                               "TRUNCATE TABLE @schema.@table_prefix@table;",
+                                               table_prefix = tablePrefix,
+                                               schema = schema,
+                                               table = tableName
   )
   invisible(NULL)
 }
@@ -273,7 +274,7 @@
 }
 
 
-uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connection, runCheckAndFixCommands, forceOverWriteOfSpecifications) {
+uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connection, pythonConnection, runCheckAndFixCommands, forceOverWriteOfSpecifications) {
   ParallelLogger::logInfo(
     "- Preparing to upload rows ",
     pos,
@@ -354,8 +355,8 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
       primaryKeyValuesInChunk <- unique(chunk[env$primaryKey])
 
       duplicates <- dplyr::inner_join(env$primaryKeyValuesInDb,
-        primaryKeyValuesInChunk,
-        by = env$primaryKey
+                                      primaryKeyValuesInChunk,
+                                      by = env$primaryKey
       )
 
       if (nrow(duplicates) != 0) {
@@ -386,23 +387,33 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
         # Remove duplicates we already dealt with:
         env$primaryKeyValuesInDb <- env$primaryKeyValuesInDb %>%
-          dplyr::anti_join(duplicates, by = env$primaryKey)
+            dplyr::anti_join(duplicates, by = env$primaryKey)
       }
     }
     if (nrow(chunk) == 0) {
       ParallelLogger::logInfo("- No data left to insert")
     } else {
       insertTableStatus <- tryCatch(expr = {
-        DatabaseConnector::insertTable(
-          connection = connection,
-          tableName = env$tableName,
-          databaseSchema = env$schema,
-          data = chunk,
-          dropTableIfExists = FALSE,
-          createTable = FALSE,
-          tempTable = FALSE,
-          progressBar = TRUE
-        )
+        if (!is.null(pythonConnection)) {
+          tryCatch({
+            .pgWriteDataFrame(chunk, pyConnection = pythonConnection, table = env$tableName, schema = env$schema)
+          }, error = function(error) {
+            # rollback write of data
+            pythonConnection$rollback()
+            stop(error)
+          })
+        } else {
+          DatabaseConnector::insertTable(
+            connection = connection,
+            tableName = env$tableName,
+            databaseSchema = env$schema,
+            data = chunk,
+            dropTableIfExists = FALSE,
+            createTable = FALSE,
+            tempTable = FALSE,
+            progressBar = TRUE
+          )
+        }
       }, error = function(e) e)
       if (inherits(insertTableStatus, "error")) {
         stop(insertTableStatus$message)
@@ -421,7 +432,8 @@ uploadTable <- function(tableName,
                         runCheckAndFixCommands,
                         forceOverWriteOfSpecifications,
                         purgeSiteDataBeforeUploading,
-                        warnOnMissingTable) {
+                        warnOnMissingTable,
+                        pythonConnection) {
   csvFileName <- paste0(tableName, ".csv")
   specifications <- specifications %>%
     dplyr::filter(.data$tableName == !!tableName)
@@ -485,11 +497,11 @@ uploadTable <- function(tableName,
     convertType <- Vectorize(
       function(type) {
         switch(type,
-          varchar = "c",
-          bigint = "n",
-          int = "n",
-          date = "D",
-          "?"
+               varchar = "c",
+               bigint = "n",
+               int = "n",
+               date = "D",
+               "?"
         ) # default to guess if type not matched
       }
     )
@@ -501,7 +513,7 @@ uploadTable <- function(tableName,
 
     readr::read_csv_chunked(
       file = file.path(resultsFolder, csvFileName),
-      callback = function(chunk, pos) uploadChunk(chunk, pos, env, specifications, resultsFolder, connection, runCheckAndFixCommands, forceOverWriteOfSpecifications),
+      callback = function(chunk, pos) uploadChunk(chunk, pos, env, specifications, resultsFolder, connection, pythonConnection, runCheckAndFixCommands, forceOverWriteOfSpecifications),
       chunk_size = 1e7,
       col_types = colTypes,
       guess_max = 1e6,
@@ -593,10 +605,10 @@ uploadResults <- function(connection = NULL,
     ParallelLogger::logInfo("Removing all records for tables within specification")
     invisible(lapply(unique(specifications$tableName),
-      .truncateTable,
-      connection = connection,
-      schema = schema,
-      tablePrefix = tablePrefix
+                     .truncateTable,
+                     connection = connection,
+                     schema = schema,
+                     tablePrefix = tablePrefix
     ))
   }
@@ -625,7 +637,11 @@ uploadResults <- function(connection = NULL,
       )
     }
   }
-
+  pythonConnection <- NULL
+  if (DatabaseConnector::dbms(connection) == "postgresql" && pyPgUploadEnabled()) {
+    pythonConnection <- .createPyConnection(connection)
+    on.exit(pythonConnection$close(), add = TRUE)
+  }
   for (tableName in unique(specifications$tableName)) {
     uploadTable(
       tableName,
@@ -638,8 +654,13 @@ uploadResults <- function(connection = NULL,
       runCheckAndFixCommands,
       forceOverWriteOfSpecifications,
       purgeSiteDataBeforeUploading,
-      warnOnMissingTable
+      warnOnMissingTable,
+      pythonConnection = pythonConnection
     )
+
+    if (!is.null(pythonConnection)) {
+      pythonConnection$commit()
+    }
   }
 
   delta <- Sys.time() - start
@@ -660,6 +681,7 @@ uploadResults <- function(connection = NULL,
 #' @export
 deleteAllRowsForPrimaryKey <-
   function(connection, schema, tableName, keyValues) {
+
     createSqlStatement <- function(i) {
       sql <- paste0(
         "DELETE FROM ",
@@ -668,7 +690,7 @@ deleteAllRowsForPrimaryKey <-
         tableName,
         "\nWHERE ",
         paste(paste0(
-          colnames(keyValues), " = '", keyValues[i, ], "'"
+          colnames(keyValues), " = '", keyValues[i,], "'"
         ), collapse = " AND "),
         ";"
       )
@@ -743,9 +765,9 @@ deleteAllRowsForDatabaseId <-
         database_id = databaseId
       )
       DatabaseConnector::executeSql(connection,
-        sql,
-        progressBar = FALSE,
-        reportOverallTime = FALSE
+                                    sql,
+                                    progressBar = FALSE,
+                                    reportOverallTime = FALSE
       )
     }
   }
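The plumbing above threads a single `pythonConnection` through `uploadChunk`: a failed chunk rolls back the table's pending writes, and `uploadResults` commits once per table. A stripped-down sketch of that pattern, illustrative only, not the package's exact code:

```r
# Illustrative shape of the transaction handling: chunk writes share one
# python connection; any failure rolls back the table's pending rows and
# a commit is issued once per table.
uploadTableTransactionally <- function(chunks, pythonConnection, writeChunk) {
  tryCatch(
    {
      for (chunk in chunks) {
        writeChunk(chunk) # e.g. .pgWriteDataFrame(chunk, pythonConnection, ...)
      }
      pythonConnection$commit()
    },
    error = function(e) {
      pythonConnection$rollback() # discard partially written rows
      stop(e)
    }
  )
}
```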
diff --git a/R/PyFunctions.R b/R/PyFunctions.R
new file mode 100644
index 0000000..f15bf1b
--- /dev/null
+++ b/R/PyFunctions.R
@@ -0,0 +1,276 @@
+# Copyright 2024 Observational Health Data Sciences and Informatics
+#
+# This file is part of ResultModelManager
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+.createPyConnection <- function(connection) {
+  stopifnot(DatabaseConnector::dbms(connection) == "postgresql")
+
+  server <- attr(connection, "server")()
+  port <- attr(connection, "port")()
+  if (is.null(server)) {
+    # NOTE - taken directly from DatabaseConnector R/RStudio.R - getServer.default
+    # This should be patched to make pulling it out more straightforward
+    databaseMetaData <- rJava::.jcall(
+      connection@jConnection,
+      "Ljava/sql/DatabaseMetaData;",
+      "getMetaData"
+    )
+    server <- rJava::.jcall(databaseMetaData, "Ljava/lang/String;", "getURL")
+    server <- strsplit(server, "//")[[1]][2]
+  }
+
+  hostServerDb <- strsplit(server, "/")[[1]]
+
+  if (is.null(port)) {
+    port <- strsplit(hostServerDb[1], ":")[[1]][2]
+  }
+
+  if (is.na(port)) {
+    port <- "5432"
+  }
+
+  user <- attr(connection, "user")()
+  password <- attr(connection, "password")()
+
+  message("Connecting to PostgreSQL (python)...")
+  psycopg2 <- reticulate::import("psycopg2", delay_load = TRUE)
+  pgConnection <- psycopg2$connect(dbname = hostServerDb[2],
+                                   user = user,
+                                   password = password,
+                                   host = strsplit(hostServerDb[1], ":")[[1]][1],
+                                   port = port)
+  return(pgConnection)
+}
+
+.pyEnv <- new.env()
+
+.loadPsycopg2Functions <- function() {
+  if (identical(Sys.getenv("RMM_USE_PYTHON_UPLOADS"), "TRUE")) {
+    reticulate::source_python(system.file("pg_upload.py", package = utils::packageName()), envir = .pyEnv)
+  }
+}
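`.createPyConnection` recovers the host, port, and database name from the JDBC URL when they are not exposed as connection attributes. A sketch of that parsing applied to an example URL; the values are placeholders:

```r
# Mirror of the server-string parsing performed by .createPyConnection.
url <- "jdbc:postgresql://myserver.com:5432/some_db"
server <- strsplit(url, "//")[[1]][2]           # "myserver.com:5432/some_db"
hostServerDb <- strsplit(server, "/")[[1]]      # host[:port] and dbname
host <- strsplit(hostServerDb[1], ":")[[1]][1]  # "myserver.com"
port <- strsplit(hostServerDb[1], ":")[[1]][2]  # "5432" (NA falls back to 5432)
dbname <- hostServerDb[2]                       # "some_db"
```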
+#' install psycopg2
+#' @description
+#' Install the psycopg2-binary python package into the specified python virtualenv
+#' @param envname python virtual environment name. Can be set with the system environment variable "RMM_PYTHON_ENV"; the default is rmm-uploads
+#' @param method method parameter for reticulate::py_install (default is "auto")
+#' @param ... Extra parameters for reticulate::py_install
+#' @export
+install_psycopg2 <- function(envname = Sys.getenv("RMM_PYTHON_ENV", unset = "rmm-uploads"), method = "auto", ...) {
+  if (!interactive())
+    stop("Session is not interactive. This is not how you want to install psycopg2")
+
+  if (!reticulate::virtualenv_exists(envname)) {
+    msg <- paste("No virtualenv configured. Create virtualenv", envname, "(set with environment variable \"RMM_PYTHON_ENV\")?")
+    createnv <- utils::askYesNo(msg)
+    if (createnv)
+      reticulate::virtualenv_create(envname)
+    else
+      stop("Virtual env does not exist")
+  }
+
+  installBinary <- utils::askYesNo("Install psycopg2-binary python package into virtualenv?")
+  if (!installBinary)
+    stop("psycopg2-binary installation declined")
+
+  reticulate::py_install("psycopg2-binary", envname = envname, method = method, ...)
+  invisible(NULL)
+}
+
+
+#' Enable Python Postgres Uploads
+#' @description
+#' Step by step install to enable python uploads
+#' @param ... parameters to pass to py_install
+#' @export
+enablePythonUploads <- function(...) {
+  if (pyPgUploadEnabled())
+    return(invisible(NULL))
+
+  # Check reticulate is installed
+  reticulateVersion <- tryCatch(utils::packageVersion("reticulate"), error = function(e) { return(NULL) })
+  installed <- !is.null(reticulateVersion)
+  if (!installed && interactive()) {
+    if (isTRUE(utils::askYesNo("reticulate is required for this functionality - would you like to install it?"))) {
+      utils::install.packages("reticulate")
+      installed <- TRUE
+    }
+  }
+
+  if (!isTRUE(installed)) {
+    stop("Cannot continue - the reticulate package is not installed on this system. Install it with install.packages('reticulate')")
+  }
+
+  pyPostgresInstalled <- reticulate::py_module_available("psycopg2")
+
+  if (!interactive() && !pyPostgresInstalled) {
+    stop("psycopg2 is not installed in the specified python environment for this system")
+  }
+
+  # Check package installed
+  if (!pyPostgresInstalled) {
+    install_psycopg2(...)
+  }
+
+  Sys.setenv("RMM_USE_PYTHON_UPLOADS" = "TRUE")
+  .loadPsycopg2Functions()
+  return(invisible(NULL))
+}
+
+#' Disable python uploads
+#' @description
+#' This will stop the use of python in uploadResults. Note that this only applies to the current R session. If you have set
+#' `RMM_USE_PYTHON_UPLOADS` in your .Renviron it will be set again the next time you start your R session.
+#'
+#' @export
+disablePythonUploads <- function() {
+  Sys.setenv("RMM_USE_PYTHON_UPLOADS" = "")
+  invisible(NULL)
+}
+
+
+#' are python postgresql uploads enabled?
+#' @export
+pyPgUploadEnabled <- function() {
+  reticulateVersion <- tryCatch(utils::packageVersion("reticulate"), error = function(e) { return(NULL) })
+  pySetupComplete <- FALSE
+  if (!is.null(reticulateVersion)) {
+    pySetupComplete <- reticulate::py_module_available("psycopg2")
+  }
+  return(identical(Sys.getenv("RMM_USE_PYTHON_UPLOADS"), "TRUE") && pySetupComplete)
+}
+
+#' Py Upload CSV
+#' @description
+#' Wrapper to a python function that uploads a csv using the Postgres COPY functionality
+#' @param connection DatabaseConnector connection instance
+#' @param table Table in database
+#' @param filepath path to csv
+#' @param schema database schema containing table reference
+#' @param disableConstraints (not recommended) disable constraints prior to upload to speed up the process
+#' @examples
+#' \dontrun{
+#' connection <- DatabaseConnector::connect(dbms = "postgresql",
+#'                                          server = "myserver.com",
+#'                                          port = 5432,
+#'                                          password = "s",
+#'                                          user = "me",
+#'                                          database = "some_db")
+#' ResultModelManager::pyUploadCsv(connection,
+#'                                 table = "my_table",
+#'                                 filepath = "my_massive_csv.csv",
+#'                                 schema = "my_schema")
+#' }
+#' @export
+pyUploadCsv <- function(connection, table, filepath, schema, disableConstraints = FALSE) {
+  stopifnot(pyPgUploadEnabled())
+  checkmate::assertFileExists(filepath)
+  checkmate::assertString(table)
+  checkmate::assertString(schema)
+  checkmate::assertLogical(disableConstraints)
+  checkmate::assertTRUE(DatabaseConnector::dbIsValid(connection))
+  checkmate::assertTRUE(DatabaseConnector::dbms(connection) == "postgresql")
+
+  pyConnection <- .createPyConnection(connection)
+  on.exit(pyConnection$close(), add = TRUE)
+
+  result <- .pyEnv$upload_table(connection = pyConnection,
+                                table = table,
+                                filepath = normalizePath(filepath),
+                                schema = schema,
+                                disable_constraints = disableConstraints)
+  # Handle errors
+  if (result$status == -1) {
+    ParallelLogger::logError("Error uploading csv file to table")
+    ParallelLogger::logError(result$message)
+    stop("psycopg2 upload failed")
+  }
+
+  invisible()
+}
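`install_psycopg2` is deliberately interactive. On a non-interactive system such as CI, the equivalent manual setup, assuming `reticulate` is already installed, would be roughly:

```r
# Non-interactive equivalent of install_psycopg2() (sketch, not part of
# the package): create the venv if needed, install the binary wheel, and
# opt this session into the python upload path.
envname <- Sys.getenv("RMM_PYTHON_ENV", unset = "rmm-uploads")
if (!reticulate::virtualenv_exists(envname)) {
  reticulate::virtualenv_create(envname)
}
reticulate::py_install("psycopg2-binary", envname = envname)
Sys.setenv(RMM_USE_PYTHON_UPLOADS = "TRUE")
```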
+.pgWriteDataFrame <- function(data, pyConnection, table, schema, bufferWriteSize = 1e6) {
+  fd <- raw(0)
+  buffer <- rawConnection(fd, "r+")
+  on.exit(close(buffer), add = TRUE)
+  offset <- 1
+  # Take the data chunk by chunk and write each chunk to a string buffer
+  while (offset <= nrow(data)) {
+    stdata <- data[offset:min(offset + bufferWriteSize - 1, nrow(data)), ]
+    readr::write_delim(stdata, buffer, delim = "\t", na = "$$$$$", quote = "all", escape = "double", col_names = FALSE)
+    nchars <- seek(buffer, 0)
+    # Note this use of multiple buffers is inefficient but, without R being able to write to a python buffer, the
+    # intermediate copy through an R character vector is unavoidable
+    charContent <- readChar(buffer, nchars = nchars)
+    .pyEnv$upload_buffer(connection = pyConnection,
+                         table = table,
+                         csv_content = charContent,
+                         schema = schema,
+                         colnames = paste0(colnames(stdata), collapse = ","))
+
+    offset <- offset + bufferWriteSize
+    seek(buffer, 0)
+  }
+}
+
+
+#' Py Upload data.frame
+#' @description
+#' Wrapper to a python function that uploads a data.frame using the Postgres COPY functionality
+#' @param data data.frame
+#' @param connection DatabaseConnector connection instance
+#' @param table Table in database
+#' @param schema database schema containing table reference
+#' @examples
+#' \dontrun{
+#' connection <- DatabaseConnector::connect(dbms = "postgresql",
+#'                                          server = "myserver.com",
+#'                                          port = 5432,
+#'                                          password = "s",
+#'                                          user = "me",
+#'                                          database = "some_db")
+#'
+#' ResultModelManager::pyUploadDataFrame(data = data.frame(id = 1:100, value = "some_value"),
+#'                                       connection = connection,
+#'                                       table = "my_table",
+#'                                       schema = "my_schema")
+#' }
+#' @export
+pyUploadDataFrame <- function(data, connection, table, schema) {
+  stopifnot(pyPgUploadEnabled())
+  checkmate::assertDataFrame(data)
+  checkmate::assertString(table)
+  checkmate::assertString(schema)
+  checkmate::assertTRUE(DatabaseConnector::dbIsValid(connection))
+  checkmate::assertTRUE(DatabaseConnector::dbms(connection) == "postgresql")
+
+  pyConnection <- .createPyConnection(connection)
+  on.exit(pyConnection$close(), add = TRUE)
+
+  tryCatch({
+    .pgWriteDataFrame(data, pyConnection, table, schema)
+  }, error = function(error) {
+    # rollback write of data
+    pyConnection$rollback()
+    stop(error)
+  })
+
+  # User must handle error on commits
+  pyConnection$commit()
+  invisible()
+}
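The writer above leans on two details of R connections: `seek(buffer, 0)` returns the position *before* rewinding (i.e. the number of bytes just written), and re-writing from position 0 means stale bytes beyond the new chunk are never read. A self-contained sketch of that round trip using a built-in data set:

```r
# Write a chunk to an in-memory raw buffer, measure it, then read it back
# as one string - this string is what gets handed to the python side.
buffer <- rawConnection(raw(0), "r+")
readr::write_delim(head(mtcars), buffer, delim = "\t", col_names = FALSE)
nchars <- seek(buffer, 0)                    # bytes written; rewinds to 0
charContent <- readChar(buffer, nchars = nchars)
close(buffer)
nchar(charContent)                           # matches nchars
```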
diff --git a/R/ResultModelManager.R b/R/ResultModelManager.R
index ef9661e..9487918 100644
--- a/R/ResultModelManager.R
+++ b/R/ResultModelManager.R
@@ -25,3 +25,7 @@ NULL
 
 # Add custom assertions
 assertSpecificationColumns <- checkmate::makeAssertionFunction(checkSpecificationColumns)
+
+.onLoad <- function(libname, pkgname) {
+  .loadPsycopg2Functions()
+}
diff --git a/_pkgdown.yml b/_pkgdown.yml
index 875fbf6..c913df8 100644
--- a/_pkgdown.yml
+++ b/_pkgdown.yml
@@ -1,5 +1,6 @@
 template:
-  params:
+  bootstrap: 5
+  bslib:
     bootswatch: cosmo
 
 home:
diff --git a/inst/pg_upload.py b/inst/pg_upload.py
new file mode 100644
index 0000000..e7ad097
--- /dev/null
+++ b/inst/pg_upload.py
@@ -0,0 +1,55 @@
+from typing import Dict, Any
+from io import StringIO
+
+
+def upload_table(connection,
+                 table: str,
+                 filepath: str,
+                 schema: str,
+                 disable_constraints: bool = False) -> Dict[str, Any]:
+    """
+    Utility function for pushing a csv file into a postgres db. Intended to be used from R but should work with any
+    comma-delimited CSV file that uses empty strings as NULL.
+
+    :param connection: psycopg2 connection instance
+    :param table: string table name
+    :param filepath: path to csv file
+    :param schema: database schema of table
+    :param disable_constraints: optionally switch off constraints prior to insert and switch them back on after
+    :return: dict with "status" and "message" keys
+    """
+    with connection.cursor() as curr:
+        if disable_constraints:
+            try:
+                curr.execute(f"ALTER TABLE {schema}.{table} DISABLE TRIGGER ALL;")
+                connection.commit()
+            except Exception as e:
+                return dict(status=-1, message=f"{e}")
+
+        # Taken from DatabaseConnector
+        copy_string = f"COPY {schema}.{table} FROM STDIN NULL AS '' DELIMITER ',' CSV HEADER;"
+        try:
+            with open(filepath, "rb") as uf:
+                # Copy file from STDIN - should only use buffer size memory
+                curr.copy_expert(copy_string, uf)
+            if disable_constraints:
+                curr.execute(f"ALTER TABLE {schema}.{table} ENABLE TRIGGER ALL;")
+            connection.commit()
+            status = dict(status=1, message="upload success")
+        except Exception as e:
+            status = dict(status=-1, message=f"{e}")
+            connection.rollback()
+
+    return status
+
+
+def upload_buffer(connection, csv_content, schema: str, table: str, colnames: str, commit: bool = False):
+    copy_cmd = f"COPY {schema}.{table} ({colnames}) FROM STDIN CSV NULL AS '$$$$$' DELIMITER E'\\t' ESCAPE '\\';"
+    # Upload the CSV data to the database table
+    with connection.cursor() as curr:
+        # Create a StringIO buffer from the CSV content and use the COPY command
+        with StringIO(csv_content) as buffer:
+            curr.copy_expert(copy_cmd, buffer)
+    # Commit the transaction if requested
+    if commit:
+        connection.commit()
\ No newline at end of file
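The python helpers are sourced into an R environment by `.loadPsycopg2Functions`. A sketch of reaching them directly from R; the `pyConnection` object is assumed to come from `.createPyConnection`, and the file path is a placeholder:

```r
# Source the python helpers into a dedicated environment, then call
# upload_table through reticulate (mirrors .loadPsycopg2Functions).
pyEnv <- new.env()
reticulate::source_python(
  system.file("pg_upload.py", package = "ResultModelManager"),
  envir = pyEnv
)
result <- pyEnv$upload_table(
  connection = pyConnection, # psycopg2 connection (assumed to exist)
  table = "my_table",
  filepath = normalizePath("my_massive_csv.csv"),
  schema = "my_schema"
)
stopifnot(result$status == 1)
```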
diff --git a/man/disablePythonUploads.Rd b/man/disablePythonUploads.Rd
new file mode 100644
index 0000000..4178f3a
--- /dev/null
+++ b/man/disablePythonUploads.Rd
@@ -0,0 +1,12 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/PyFunctions.R
+\name{disablePythonUploads}
+\alias{disablePythonUploads}
+\title{Disable python uploads}
+\usage{
+disablePythonUploads()
+}
+\description{
+This will stop the use of python in uploadResults. Note that this only applies to the current R session. If you have set
+\code{RMM_USE_PYTHON_UPLOADS} in your .Renviron it will be set again the next time you start your R session.
+}
diff --git a/man/enablePythonUploads.Rd b/man/enablePythonUploads.Rd
new file mode 100644
index 0000000..28cb3df
--- /dev/null
+++ b/man/enablePythonUploads.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/PyFunctions.R
+\name{enablePythonUploads}
+\alias{enablePythonUploads}
+\title{Enable Python Postgres Uploads}
+\usage{
+enablePythonUploads(...)
+}
+\arguments{
+\item{...}{parameters to pass to py_install}
+}
+\description{
+Step by step install to enable python uploads
+}
diff --git a/man/install_psycopg2.Rd b/man/install_psycopg2.Rd
new file mode 100644
index 0000000..fc8c485
--- /dev/null
+++ b/man/install_psycopg2.Rd
@@ -0,0 +1,22 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/PyFunctions.R
+\name{install_psycopg2}
+\alias{install_psycopg2}
+\title{install psycopg2}
+\usage{
+install_psycopg2(
+  envname = Sys.getenv("RMM_PYTHON_ENV", unset = "rmm-uploads"),
+  method = "auto",
+  ...
+)
+}
+\arguments{
+\item{envname}{python virtual environment name. Can be set with the system environment variable "RMM_PYTHON_ENV"; the default is rmm-uploads}
+
+\item{method}{method parameter for reticulate::py_install (default is "auto")}
+
+\item{...}{Extra parameters for reticulate::py_install}
+}
+\description{
+Install the psycopg2-binary python package into the specified python virtualenv
+}
diff --git a/man/pyPgUploadEnabled.Rd b/man/pyPgUploadEnabled.Rd
new file mode 100644
index 0000000..695d094
--- /dev/null
+++ b/man/pyPgUploadEnabled.Rd
@@ -0,0 +1,11 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/PyFunctions.R
+\name{pyPgUploadEnabled}
+\alias{pyPgUploadEnabled}
+\title{are python postgresql uploads enabled?}
+\usage{
+pyPgUploadEnabled()
+}
+\description{
+are python postgresql uploads enabled?
+}
diff --git a/man/pyUploadCsv.Rd b/man/pyUploadCsv.Rd
new file mode 100644
index 0000000..c115f18
--- /dev/null
+++ b/man/pyUploadCsv.Rd
@@ -0,0 +1,36 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/PyFunctions.R
+\name{pyUploadCsv}
+\alias{pyUploadCsv}
+\title{Py Upload CSV}
+\usage{
+pyUploadCsv(connection, table, filepath, schema, disableConstraints = FALSE)
+}
+\arguments{
+\item{connection}{DatabaseConnector connection instance}
+
+\item{table}{Table in database}
+
+\item{filepath}{path to csv}
+
+\item{schema}{database schema containing table reference}
+
+\item{disableConstraints}{(not recommended) disable constraints prior to upload to speed up the process}
+}
+\description{
+Wrapper to a python function that uploads a csv using the Postgres COPY functionality
+}
+\examples{
+\dontrun{
+  connection <- DatabaseConnector::connect(dbms = "postgresql",
+                                           server = "myserver.com",
+                                           port = 5432,
+                                           password = "s",
+                                           user = "me",
+                                           database = "some_db")
+  ResultModelManager::pyUploadCsv(connection,
+                                  table = "my_table",
+                                  filepath = "my_massive_csv.csv",
+                                  schema = "my_schema")
+}
+}
diff --git a/man/pyUploadDataFrame.Rd b/man/pyUploadDataFrame.Rd
new file mode 100644
index 0000000..aaad32e
--- /dev/null
+++ b/man/pyUploadDataFrame.Rd
@@ -0,0 +1,35 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/PyFunctions.R
+\name{pyUploadDataFrame}
+\alias{pyUploadDataFrame}
+\title{Py Upload data.frame}
+\usage{
+pyUploadDataFrame(data, connection, table, schema)
+}
+\arguments{
+\item{data}{data.frame}
+
+\item{connection}{DatabaseConnector connection instance}
+
+\item{table}{Table in database}
+
+\item{schema}{database schema containing table reference}
+}
+\description{
+Wrapper to a python function that uploads a data.frame using the Postgres COPY functionality
+}
\examples{
+\dontrun{
+  connection <- DatabaseConnector::connect(dbms = "postgresql",
+                                           server = "myserver.com",
+                                           port = 5432,
+                                           password = "s",
+                                           user = "me",
+                                           database = "some_db")
+
+  ResultModelManager::pyUploadDataFrame(data = data.frame(id = 1:100, value = "some_value"),
+                                        connection = connection,
+                                        table = "my_table",
+                                        schema = "my_schema")
+}
+}
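`pyUploadCsv`'s `disableConstraints` option wraps the `COPY` in `ALTER TABLE ... DISABLE/ENABLE TRIGGER ALL`, which skips constraint checking during the load; rows violating foreign keys are not re-validated when triggers are re-enabled, which is why the documentation flags it as not recommended. A usage sketch with placeholder connection and file:

```r
# Fast path for a trusted, pre-validated csv: constraints are off for the
# duration of the COPY, so bad rows will NOT be rejected.
ResultModelManager::pyUploadCsv(
  connection = connection,          # placeholder DatabaseConnector connection
  table = "my_table",
  filepath = "my_massive_csv.csv",  # placeholder file
  schema = "my_schema",
  disableConstraints = TRUE
)
```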
diff --git a/tests/testthat/test-DataModel.R b/tests/testthat/test-DataModel.R
index 3126f3c..dd011a3 100644
--- a/tests/testthat/test-DataModel.R
+++ b/tests/testthat/test-DataModel.R
@@ -70,6 +70,7 @@ test_that("results are uploaded", {
     resultsFolder = tempDir
   )
 
+  disablePythonUploads()
   uploadResults(
     connectionDetails = testDatabaseConnectionDetails,
     schema = testSchema,
@@ -98,7 +99,7 @@ test_that("results are uploaded", {
   for (tableName in unique(specifications$tableName)) {
     primaryKey <- specifications %>%
       dplyr::filter(tableName == !!tableName &
-        primaryKey == "Yes") %>%
+                      primaryKey == "Yes") %>%
       dplyr::select("columnName") %>%
       dplyr::pull()
 
@@ -118,6 +119,7 @@ test_that("results are uploaded", {
   }
 
   # Test uploading after truncate
+  enablePythonUploads()
   uploadResults(
     connectionDetails = testDatabaseConnectionDetails,
     schema = testSchema,
@@ -129,7 +131,7 @@ test_that("results are uploaded", {
     runCheckAndFixCommands = TRUE,
     warnOnMissingTable = FALSE
   )
-
+  disablePythonUploads()
   expect_false(.removeDataUserCheck("N"))
   expect_false(.removeDataUserCheck("n"))
   expect_false(.removeDataUserCheck(""))
@@ -145,7 +147,7 @@ test_that("appending results rows using primary keys works", {
   for (tableName in unique(specifications$tableName)) {
     primaryKey <- specifications %>%
       dplyr::filter(tableName == !!tableName &
-        primaryKey == "Yes") %>%
+                      primaryKey == "Yes") %>%
       dplyr::select("columnName") %>%
       dplyr::pull()
 
@@ -200,7 +202,7 @@ test_that("deleting results rows using data primary key works", {
   for (tableName in unique(specifications$tableName)) {
     primaryKey <- specifications %>%
       dplyr::filter(tableName == !!tableName &
-        primaryKey == "Yes") %>%
+                      primaryKey == "Yes") %>%
       dplyr::select("columnName") %>%
       dplyr::pull()
 
@@ -240,7 +242,7 @@ test_that("deleting results rows by database id works", {
   for (tableName in unique(specifications$tableName)) {
     primaryKey <- specifications %>%
       dplyr::filter(tableName == !!tableName &
-        primaryKey == "Yes") %>%
+                      primaryKey == "Yes") %>%
       dplyr::select("columnName") %>%
       dplyr::pull()
 
diff --git a/tests/testthat/test-PyFunctions.R b/tests/testthat/test-PyFunctions.R
new file mode 100644
index 0000000..cbb2dbb
--- /dev/null
+++ b/tests/testthat/test-PyFunctions.R
@@ -0,0 +1,137 @@
+test_that("test python postgres connection works", {
+  skip_on_cran()
+  skip_if(Sys.getenv("CDM5_POSTGRESQL_SERVER") == "")
+  # Should not throw error
+  enablePythonUploads()
+  expect_true(pyPgUploadEnabled())
+
+
+  pyConnection <- .createPyConnection(testDatabaseConnection)
+  on.exit(pyConnection$close(), add = TRUE)
+  curr <- pyConnection$cursor()
+  on.exit(curr$close(), add = TRUE)
+  curr$execute("SELECT 1")
+  res <- curr$fetchone()
+  expect_equal(res[[1]], 1)
+
+
+  server <- Sys.getenv("CDM5_POSTGRESQL_SERVER")
+  hostServerDb <- strsplit(server, "/")[[1]]
+  # Test with connection string
+  testDatabaseConnection2 <- DatabaseConnector::connect(
+    dbms = "postgresql",
+    connectionString = paste0("jdbc:postgresql://", hostServerDb[1], ":5432/", hostServerDb[2]),
+    user = Sys.getenv("CDM5_POSTGRESQL_USER"),
+    password = utils::URLdecode(Sys.getenv("CDM5_POSTGRESQL_PASSWORD"))
+  )
+
+  on.exit(DatabaseConnector::disconnect(testDatabaseConnection2), add = TRUE)
+  pyConnection2 <- .createPyConnection(testDatabaseConnection2)
+  on.exit(pyConnection2$close(), add = TRUE)
+
+  if (!interactive()) {
+    expect_error(install_psycopg2(), "Session is not interactive. This is not how you want to install psycopg2")
+  }
+})
+
+
+test_that("test python upload table from csv works", {
+  skip_on_cran()
+  skip_if(Sys.getenv("CDM5_POSTGRESQL_SERVER") == "")
+  enablePythonUploads()
+  tfile <- tempfile(fileext = ".csv")
+  pyConnection <- .createPyConnection(testDatabaseConnection)
+  on.exit({
+    pyConnection$close()
+    unlink(tfile)
+  }, add = TRUE)
+
+  table <- paste0("test_", sample(1:10000, 1))
+  # create csv
+  testData <- data.frame(id = 1:10, test_string = 'some crazy vaLUEs;;a,.\t\n∑åˆø')
+  readr::write_csv(testData, tfile)
+  # upload csv - should fail because the table does not exist yet
+  result <- .pyEnv$upload_table(connection = pyConnection,
+                                schema = testSchema,
+                                table = table,
+                                filepath = normalizePath(tfile))
+
+  expect_equal(result$status, -1)
+  # Test exported function error handling
+  expect_error(pyUploadCsv(testDatabaseConnection, schema = testSchema, table = table, filepath = tfile), "psycopg2 upload failed")
+
+  # create table
+  sql <- "CREATE TABLE @schema.@table (id int, test_string varchar)"
+  DatabaseConnector::renderTranslateExecuteSql(testDatabaseConnection, sql, schema = testSchema, table = table)
+
+  result <- .pyEnv$upload_table(connection = pyConnection,
+                                schema = testSchema,
+                                table = table,
+                                filepath = normalizePath(tfile))
+  expect_equal(result$status, 1)
+
+
+  resultData <- DatabaseConnector::renderTranslateQuerySql(connection = testDatabaseConnection,
+                                                           "SELECT * FROM @schema.@table",
+                                                           schema = testSchema,
+                                                           table = table,
+                                                           snakeCaseToCamelCase = TRUE)
+  expect_equal(nrow(testData), nrow(resultData))
+  expect_true(all(c("id", "testString") %in% names(resultData)))
+
+  # Test exported function
+  DatabaseConnector::renderTranslateExecuteSql(testDatabaseConnection, "TRUNCATE TABLE @schema.@table;", schema = testSchema, table = table)
+  pyUploadCsv(testDatabaseConnection, schema = testSchema, table = table, filepath = tfile)
+
+  resultData <- DatabaseConnector::renderTranslateQuerySql(connection = testDatabaseConnection,
+                                                           "SELECT * FROM @schema.@table",
+                                                           schema = testSchema,
+                                                           table = table,
+                                                           snakeCaseToCamelCase = TRUE)
+  expect_equal(nrow(testData), nrow(resultData))
+  expect_equal(sum(resultData$id), sum(testData$id))
+  expect_true(all(c("id", "testString") %in% names(resultData)))
+})
+
+
+test_that("upload data.frame via string buffer", {
+  skip_on_cran()
+  skip_if(Sys.getenv("CDM5_POSTGRESQL_SERVER") == "")
+  # Should not throw error
+  enablePythonUploads()
+  expect_true(pyPgUploadEnabled())
+
+  table <- paste0("test_", sample(1:10000, 1))
+  sql <- "CREATE TABLE @schema.@table (id int, my_date date, my_date2 varchar, test_string1 varchar, test_string2 varchar)"
+  DatabaseConnector::renderTranslateExecuteSql(testDatabaseConnection, sql, schema = testSchema, table = table)
+
+  pyConnection <- .createPyConnection(testDatabaseConnection)
+  on.exit(pyConnection$close(), add = TRUE)
+  testData <- data.frame(id = 1:100,
+                         test_string1 = 'Sjögren syndrome',
+                         test_string2 = 'Merative MarketScan® Commercial Claims and Encounters Ωåß∂',
+                         my_date = as.Date("1980-05-28"),
+                         my_date2 = as.Date("1980-05-30"))
+
+  # Note a small buffer write size tests if buffering is functioning correctly
+  .pgWriteDataFrame(data = testData,
+                    pyConnection = pyConnection,
+                    table = table,
+                    schema = testSchema,
+                    bufferWriteSize = 10)
+  pyConnection$commit()
+
+  resultData <- DatabaseConnector::renderTranslateQuerySql(connection = testDatabaseConnection,
+                                                           "SELECT * FROM @schema.@table",
+                                                           schema = testSchema,
+                                                           table = table,
+                                                           snakeCaseToCamelCase = TRUE)
+  expect_equal(nrow(resultData), nrow(testData))
+  expect_equal(sum(resultData$id), sum(testData$id))
+  expect_true(all(c("id", "testString2", "testString1", "myDate", "myDate2") %in% names(resultData)))
+
+  DatabaseConnector::renderTranslateExecuteSql(testDatabaseConnection, "TRUNCATE TABLE @schema.@table", schema = testSchema, table = table)
+  pyUploadDataFrame(testData, testDatabaseConnection, schema = testSchema, table = table)
+
+  resultData <- DatabaseConnector::renderTranslateQuerySql(connection = testDatabaseConnection,
+                                                           "SELECT * FROM @schema.@table",
+                                                           schema = testSchema,
+                                                           table = table,
+                                                           snakeCaseToCamelCase = TRUE)
+  expect_equal(nrow(resultData), nrow(testData))
+  expect_equal(sum(resultData$id), sum(testData$id))
+  expect_true(all(c("id", "testString2", "testString1", "myDate", "myDate2") %in% names(resultData)))
+})
\ No newline at end of file
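These tests rely on fixtures (`testDatabaseConnection`, `testSchema`) from the package's testthat setup files, which are not part of this diff. They are assumed to look roughly like the following; the `testSchema` source in particular is a guess:

```r
# Assumed test fixtures (sketch): a live connection built from the
# CDM5_POSTGRESQL_* environment variables used throughout the tests.
testDatabaseConnection <- DatabaseConnector::connect(
  dbms = "postgresql",
  server = Sys.getenv("CDM5_POSTGRESQL_SERVER"),
  user = Sys.getenv("CDM5_POSTGRESQL_USER"),
  password = utils::URLdecode(Sys.getenv("CDM5_POSTGRESQL_PASSWORD"))
)
testSchema <- Sys.getenv("CDM5_POSTGRESQL_CDM_SCHEMA") # placeholder variable
```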
diff --git a/vignettes/UsingPythonUploads.Rmd b/vignettes/UsingPythonUploads.Rmd
new file mode 100644
index 0000000..54f84a7
--- /dev/null
+++ b/vignettes/UsingPythonUploads.Rmd
@@ -0,0 +1,113 @@
+---
+title: "Using python for postgresql uploads"
+author: "James P. Gilbert"
+date: "`r Sys.Date()`"
+output:
+  pdf_document:
+    toc: yes
+  html_document:
+    number_sections: yes
+    toc: yes
+vignette: >
+  %\VignetteIndexEntry{Python based uploads}
+  %\VignetteEncoding{UTF-8}
+  %\VignetteEngine{knitr::rmarkdown}
+---
+
+# Introduction
+Note, this feature should be considered experimental until further notice.
+
+Without bulk upload functionality, uploading to postgresql with DatabaseConnector will often be slow, and enabling it
+requires the installation and configuration of the postgresql binaries on the system.
+This may be challenging or restricted in many environments.
+Similarly, this method requires writing a data.frame to disk, which will be prohibitively slow if the data is already in
+csv format.
+
+As a consequence, this package supports bulk uploading through python with a small amount of configuration.
+This uses no R memory; csv files are transferred directly through python, and uploads will be considerably faster.
+
+This process uses the `psycopg2` python library, which can be installed via compilation or in binary form.
+This vignette demonstrates usage with the `psycopg2-binary` package.
+
+# Installing psycopg2
+
+## Using a virtualenv
+ResultModelManager provides an interactive function for enabling the python library. If psycopg2 is already set up,
+this function will do nothing.
+However, if there is no available binary (and the `reticulate` package is not installed) you will be asked to install
+these packages.
+To do this, run the following:
+```{r eval = F}
+ResultModelManager::enablePythonUploads()
+```
+Alternatively, you can specify this manually:
+
+```{r eval = F}
+ResultModelManager::install_psycopg2()
+```
+
+## Using conda or system python installs
+
+Please consult the [reticulate documentation](https://rstudio.github.io/reticulate/articles/python_packages.html) on how to install the `psycopg2-binary` package.
+
+# Usage within functions
+
+By default, this functionality will not be enabled when uploading tables, and the function `pyUploadCsv` will fail.
+To enable python uploads and directly upload a csv, try the following example code.
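+
+```{r eval = FALSE}
+ResultModelManager::enablePythonUploads()
+connectionDetails <- DatabaseConnector::createConnectionDetails(dbms = "postgresql",
+                                                                server = "myserver.com",
+                                                                port = 5432,
+                                                                password = "s",
+                                                                user = "me",
+                                                                database = "some_db")
+connection <- DatabaseConnector::connect(connectionDetails)
+readr::write_csv(data.frame(id = 1:1e6,
+                            text = paste(1:1e6, "bottle(s) on the wall")),
+                 "my_massive_csv.csv")
+
+ResultModelManager::pyUploadCsv(connection,
+                                table = "my_table",
+                                filepath = "my_massive_csv.csv",
+                                schema = "my_schema")
+```
+
+To confirm that both the environment variable and the python setup are in effect, the status can be checked at any time:
+
+```{r eval = F}
+Sys.getenv("RMM_USE_PYTHON_UPLOADS")
+ResultModelManager::pyPgUploadEnabled()
+```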
+
+Note that you are not required to call `ResultModelManager::enablePythonUploads()` every time. As an alternative,
+add the following line to your .Renviron file (note that this assumes that the setup of the python libraries
+has already been completed):
+
+```
+RMM_USE_PYTHON_UPLOADS=TRUE
+```
+
+The astute reader will realize that this approach requires an IO call, writing the CSV to disk. In many situations this
+will be a major bottleneck.
+A much more sensible approach is to use a string buffer.
+Thankfully, the author of this package has provided such an interface!
+
+```{r eval=F}
+ResultModelManager::pyUploadDataFrame(data = data.frame(id = 1:1e6,
+                                                        text = paste(1:1e6, "bottle(s) on the wall")),
+                                      connection = connection,
+                                      table = "my_table",
+                                      schema = "my_schema")
+```
+
+Note that this approach is already implemented for you when you use the `uploadResults` functionality.
+That's right - if you call `ResultModelManager::enablePythonUploads()` (and you are using postgres) you will be able
+to upload your large R `data.frames` to postgres!
+
+```{r eval=F}
+ResultModelManager::enablePythonUploads()
+
+ResultModelManager::uploadResults(
+  connectionDetails = connectionDetails,
+  schema = "my_schema",
+  resultsFolder = "my_results_folder",
+  tablePrefix = "cm_",
+  purgeSiteDataBeforeUploading = FALSE,
+  specifications = getResultsDataModelSpec()
+)
+```
+Better yet, calling `ResultModelManager::enablePythonUploads()` before uploading results from any OHDSI package will
+automatically give you this fast(er) upload functionality.
\ No newline at end of file
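Once enabled, the python path can also be switched off again within the same session, for example to compare timings against the plain JDBC path. A brief sketch using the exported toggle:

```r
# Disable for the rest of this R session only; RMM_USE_PYTHON_UPLOADS in
# .Renviron (if set) re-enables it on the next session start.
ResultModelManager::disablePythonUploads()
ResultModelManager::pyPgUploadEnabled() # now FALSE
```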