From b03c2691cd5f53cd729fb0f5412f6dc3d43e74af Mon Sep 17 00:00:00 2001 From: Craig Williams Date: Tue, 27 Jan 2026 14:36:38 +0000 Subject: [PATCH] Class implementation for DBConnection and DBClient --- R/DBClient.R | 43 ++++++++ R/DBClientClass/CRUDKeys.R | 42 ++++++++ R/DBClientClass/CRUDRow.R | 28 ++++++ R/DBClientClass/CRUDRows.R | 158 ++++++++++++++++++++++++++++++ R/DBClientClass/Initialize.R | 8 ++ R/DBClientClass/dbAction.R | 6 ++ R/DBClientClass/dbCollect.R | 52 ++++++++++ R/DBClientClass/dbQuery.R | 9 ++ R/DBClientClass/dbSchema.R | 21 ++++ R/DBClientClass/dbTable.R | 7 ++ R/DBClientClass/dbTransaction.R | 8 ++ R/DBConnection.R | 43 ++++++++ R/DBConnectionClass/Configure.R | 68 +++++++++++++ R/DBConnectionClass/Connect.R | 85 ++++++++++++++++ R/DBConnectionClass/Transaction.R | 39 ++++++++ R/PrintDebug.R | 97 ++++++++++++++++++ 16 files changed, 714 insertions(+) create mode 100644 R/DBClient.R create mode 100644 R/DBClientClass/CRUDKeys.R create mode 100644 R/DBClientClass/CRUDRow.R create mode 100644 R/DBClientClass/CRUDRows.R create mode 100644 R/DBClientClass/Initialize.R create mode 100644 R/DBClientClass/dbAction.R create mode 100644 R/DBClientClass/dbCollect.R create mode 100644 R/DBClientClass/dbQuery.R create mode 100644 R/DBClientClass/dbSchema.R create mode 100644 R/DBClientClass/dbTable.R create mode 100644 R/DBClientClass/dbTransaction.R create mode 100644 R/DBConnection.R create mode 100644 R/DBConnectionClass/Configure.R create mode 100644 R/DBConnectionClass/Connect.R create mode 100644 R/DBConnectionClass/Transaction.R create mode 100644 R/PrintDebug.R diff --git a/R/DBClient.R b/R/DBClient.R new file mode 100644 index 0000000..a717c75 --- /dev/null +++ b/R/DBClient.R @@ -0,0 +1,43 @@ + +class_env <- new.env(parent = baseenv()) + +res <- lapply(list.files("R/DBClientClass", pattern = ".R"), function(fn) { + eval( + parse(paste0("R/DBClientClass/", fn)), + envir = class_env, + enclos = emptyenv() + ) +}) + +fn_names <- names(which(vapply(class_env, is.function, logical(1)))) +fn_list <- lapply(fn_names, function(fn_name) { + class_env[[fn_name]] +}) +names(fn_list) <- fn_names + +priv_fn_names <- ls(class_env, pattern = "^\\..*", all.names = TRUE) +priv_fn_list <- lapply(priv_fn_names, function(fn_name) { + class_env[[fn_name]] +}) +names(priv_fn_list) <- gsub("^\\.", "", priv_fn_names) + +member_names <- names(which(!vapply(class_env, is.function, logical(1)))) +member_list <- lapply(member_names, function(member_name) { + class_env[[member_name]] +}) +names(member_list) <- member_names + + +#' @name DBClient +#' @title DBClient +#' @description Database client class +NULL + +#' @export +DBClient <- R6::R6Class( + "DBClient", + public = fn_list, + private = c(member_list, priv_fn_list), + lock_class = TRUE, + parent_env = class_env +) diff --git a/R/DBClientClass/CRUDKeys.R b/R/DBClientClass/CRUDKeys.R new file mode 100644 index 0000000..b7a9fa3 --- /dev/null +++ b/R/DBClientClass/CRUDKeys.R @@ -0,0 +1,42 @@ +keyExists <- function(table_name, key_name, id) { + key <- dbplyr::ident(key_name) + res <- self$table(table_name) %>% + dplyr::filter(key == !!id) %>% + dplyr::count() %>% + dplyr::pull(n) + return(res[[1]] > 0) +} + +selectByKey <- function(table_name, key_name, id) { + key <- dbplyr::ident(key_name) + res <- self$table(table_name) %>% + dplyr::filter(key %in% !!id) %>% + dplyr::collect() + return(res) +} + +insertByKey <- function(table_name, key_name, id, ...) { + row <- dplyr::bind_cols( + tibble::tibble(!!key_name := !!id), tibble::as_tibble(list(...)) + ) + self$insertRows(table_name, row, key_name) +} + +updateByKey <- function(table_name, key_name, id, ...) { + row <- dplyr::bind_cols( + tibble::tibble(!!key_name := !!id), tibble::as_tibble(list(...)) + ) + self$updateRows(table_name, row, key_name) +} + +upsertByKey <- function(table_name, key_name, id, ...) { + row <- dplyr::bind_cols( + tibble::tibble(!!key_name := !!id), tibble::as_tibble(list(...)) + ) + self$upsertRows(table_name, row, key_name) +} + +deleteByKey <- function(table_name, key_name, id) { + row <- tibble::tibble_row(!!key_name := !!id) + self$deleteRows(table_name, row, key_name) +} diff --git a/R/DBClientClass/CRUDRow.R b/R/DBClientClass/CRUDRow.R new file mode 100644 index 0000000..d10a823 --- /dev/null +++ b/R/DBClientClass/CRUDRow.R @@ -0,0 +1,28 @@ +insert <- function(table_name, ..., key_name = NULL) { + row <- tibble::as_tibble(list(...)) + self$insertRows(table_name, row, key_name) +} + +append <- function(table_name, ...) { + row <- tibble::as_tibble(list(...)) + self$appendRows(table_name, row) +} + +upsert <- function(table_name, ..., key_name = NULL) { + row <- tibble::as_tibble(list(...)) + self$upsertRows(table_name, row, key_name) +} + +update <- function(table_name, ..., key_name = NULL) { + row <- tibble::as_tibble(list(...)) + self$updateRows(table_name, row, key_name) +} + +delete <- function(table_name, ..., key_name = NULL) { + row <- tibble::as_tibble(list(...)) + self$deleteRows(table_name, row, key_name) +} + +truncate <- function(table_name) { + self$dbAction(paste("TRUNCATE TABLE", self$withSchema(table_name))) +} diff --git a/R/DBClientClass/CRUDRows.R b/R/DBClientClass/CRUDRows.R new file mode 100644 index 0000000..96e8173 --- /dev/null +++ b/R/DBClientClass/CRUDRows.R @@ -0,0 +1,158 @@ + + +# NOTE: Temporary fix for issue: tidyverse/dbplyr#1133 +.fix_dbplyr <- function(tbl) { + keys <- c("dbplyr_table_ident", "dbplyr_table_path") + + if (sum(keys %in% class(tbl$lazy_query$x)) == 0) { + tbl$lazy_query$x <- dbplyr::as.sql( + tbl$lazy_query$x, private$client$getConnection() + ) + } + + return(tbl) +} + + +insertRows <- function(table_name, rows, key_name = NULL) { + tbl <- self$table(table_name) + new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows) + + if (getOption("db.simulate", FALSE)) { + return(printDebug("insertRows", table_name, key_name, tbl, rows, TRUE)) + } + if (getOption("db.debug", FALSE)) { + printDebug("insertRows", table_name, key_name, tbl, rows) + } + + tbl <- private$fix_dbplyr(tbl) + + tbl <- dplyr::rows_insert( + tbl, + new_rows, + by = key_name, + conflict = "ignore", + in_place = TRUE, + returning = !!key_name + ) + + if (dbplyr::has_returned_rows(tbl)) { + return(dbplyr::get_returned_rows(tbl)) + } + + return(invisible(tbl)) +} + +appendRows <- function(table_name, rows, key_name = NULL) { + tbl <- self$table(table_name) + new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows) + + if (getOption("db.simulate", FALSE)) { + return(printDebug("appendRows", table_name, key_name, tbl, rows, TRUE)) + } + if (getOption("db.debug", FALSE)) { + printDebug("appendRows", table_name, key_name, tbl, rows) + } + + tbl <- private$fix_dbplyr(tbl) + + tbl <- dplyr::rows_append( + tbl, new_rows, in_place = TRUE, returning = !!key_name + ) + + if (dbplyr::has_returned_rows(tbl)) { + return(dbplyr::get_returned_rows(tbl)) + } + + return(invisible(TRUE)) +} + +upsertRows <- function(table_name, rows, key_name = NULL) { + tbl <- self$table(table_name) + new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows) + + tbl <- private$fix_dbplyr(tbl) + + # NOTE: Temporary fix for issue: tidyverse/dbplyr#1279 + if ("MariaDBConnection" %in% class(private$client$getConnection())) { + keys <- rows %>% + dplyr::pull(!!key_name) + n_rows <- tbl %>% + dplyr::select(!!key_name) %>% + dplyr::filter(!!as.symbol(key_name) %in% !!keys) %>% + dplyr::count() %>% + dplyr::pull(n) + if (n_rows == 0) { + return(insertRows(table_name, rows, key_name)) + } else { + return(updateRows(table_name, rows, key_name)) + } + } else { + if (getOption("db.simulate", FALSE)) { + return(printDebug("upsertRows", table_name, key_name, tbl, rows, TRUE)) + } + if (getOption("db.debug", FALSE)) { + printDebug("upsertRows", table_name, key_name, tbl, rows) + } + + tbl <- dplyr::rows_upsert(tbl, new_rows, by = key_name, in_place = TRUE) + if (dbplyr::has_returned_rows(tbl)) { + return(dbplyr::get_returned_rows(tbl)) + } + } + + return(invisible(TRUE)) +} + +updateRows <- function(table_name, rows, key_name = NULL) { + tbl <- self$table(table_name) + new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows) + + if (getOption("db.simulate", FALSE)) { + return(printDebug("updateRows", table_name, key_name, tbl, rows, TRUE)) + } + if (getOption("db.debug", FALSE)) { + printDebug("updateRows", table_name, key_name, tbl, rows) + } + + tbl <- private$fix_dbplyr(tbl) + + tbl <- dplyr::rows_update( + tbl, new_rows, by = key_name, unmatched = "ignore", in_place = TRUE + ) + + if (dbplyr::has_returned_rows(tbl)) { + return(dbplyr::get_returned_rows(tbl)) + } + + return(invisible(TRUE)) +} + +deleteRows <- function(table_name, rows, key_name = NULL) { + tbl <- self$table(table_name) + old_rows <- dbplyr::copy_inline(private$client$getConnection(), rows) + + if (getOption("db.simulate", FALSE)) { + return(printDebug("deleteRows", table_name, key_name, tbl, rows, TRUE)) + } + if (getOption("db.debug", FALSE)) { + printDebug("deleteRows", table_name, key_name, tbl, rows) + } + + tbl <- private$fix_dbplyr(tbl) + + tbl <- dplyr::rows_delete( + tbl, + old_rows, + by = key_name, + unmatched = "ignore", + in_place = TRUE, + returning = dplyr::everything() + ) + + if (dbplyr::has_returned_rows(tbl)) { + return(dbplyr::get_returned_rows(tbl)) + } + + return(invisible(TRUE)) +} diff --git a/R/DBClientClass/Initialize.R b/R/DBClientClass/Initialize.R new file mode 100644 index 0000000..1e54499 --- /dev/null +++ b/R/DBClientClass/Initialize.R @@ -0,0 +1,8 @@ +client <- NULL + +Initialize <- function(client) { + if (is.null(client) || !is.R6(client) || inherits(client, "DBClient")) { + stop("DBClient instance required!") + } + private$client <- client +} diff --git a/R/DBClientClass/dbAction.R b/R/DBClientClass/dbAction.R new file mode 100644 index 0000000..5673afb --- /dev/null +++ b/R/DBClientClass/dbAction.R @@ -0,0 +1,6 @@ +dbAction <- function(statement) { + if (getOption("db.debug", FALSE)) { + cat("dbQuery:", statement, "\n") + } + DBI::dbExecute(private$client$getConnection(), statement) +} diff --git a/R/DBClientClass/dbCollect.R b/R/DBClientClass/dbCollect.R new file mode 100644 index 0000000..07b6edf --- /dev/null +++ b/R/DBClientClass/dbCollect.R @@ -0,0 +1,52 @@ +collectOrReturn <- function( + qry, collect = get("collect", pos = parent.frame()) +) { + if (length(collect) == 1 && collect[[1]] == TRUE) { + return(dplyr::collect(qry)) + } else { + return(qry) + } +} + +collectGeometries <- function( + qry, geometry_cols = c("geometry"), geometry_crs = 4326, check_ring_dir = TRUE +) { + requireNamespace("sf") + + qry <- qry %>% + dplyr::collect() + + if (sum(geometry_cols %in% colnames(qry)) > 0) { + qry <- qry %>% + dplyr::mutate( + dplyr::across(dplyr::any_of(geometry_cols), function(.col) { + sf::st_as_sfc( + .col, + EWKB = TRUE, + crs = geometry_crs, + check_ring_dir = check_ring_dir + ) + }) + ) + } + + if ("geom" %in% colnames(qry) && sum("geom" %in% geometry_cols) == 0) { + qry <- qry %>% + dplyr::mutate( + geom = sf::st_as_sfc( + geom, EWKB = TRUE, crs = 4326, check_ring_dir = check_ring_dir + ) + ) + } + + if ("proj" %in% colnames(qry) && sum("proj" %in% geometry_cols) == 0) { + qry <- qry %>% + dplyr::mutate( + proj = sf::st_as_sfc( + proj, EWKB = TRUE, crs = 3035, check_ring_dir = check_ring_dir + ) + ) + } + + return(qry) +} diff --git a/R/DBClientClass/dbQuery.R b/R/DBClientClass/dbQuery.R new file mode 100644 index 0000000..24a3be7 --- /dev/null +++ b/R/DBClientClass/dbQuery.R @@ -0,0 +1,9 @@ +dbQuery <- function(statement) { + if (getOption("db.debug", FALSE)) { + cat("dbQuery:", statement, "\n") + } + query <- DBI::dbSendQuery(private$client$getConnection(), statement) + res <- DBI::dbFetch(query) + DBI::dbClearResult(query) + res +} diff --git a/R/DBClientClass/dbSchema.R b/R/DBClientClass/dbSchema.R new file mode 100644 index 0000000..7572774 --- /dev/null +++ b/R/DBClientClass/dbSchema.R @@ -0,0 +1,21 @@ +withSchema <- function(table_name) { + config <- private$client$getConfiguration() + if (exists("schema", config)) { + return(DBI::dbQuoteIdentifier( + getConnection(), DBI::Id(schema = config$schema, table = table_name) + )) + } else { + return(DBI::dbQuoteIdentifier( + getConnection(), DBI::Id(table = table_name) + )) + } +} + +dplyrWithSchema <- function(table_name) { + config <- private$client$getConfiguration() + if (exists("schema", config) && !is.null(config$schema)) { + return(dbplyr::in_schema(config$schema, table_name)) + } else { + return(table_name) + } +} diff --git a/R/DBClientClass/dbTable.R b/R/DBClientClass/dbTable.R new file mode 100644 index 0000000..e33e54a --- /dev/null +++ b/R/DBClientClass/dbTable.R @@ -0,0 +1,7 @@ + +table <- function(table_name) { + if (getOption("db.debug", FALSE)) { + cat("dbTable:", table_name, "\n") + } + dplyr::tbl(private$client$getConnection(), self$dplyrWithSchema(table_name)) +} diff --git a/R/DBClientClass/dbTransaction.R b/R/DBClientClass/dbTransaction.R new file mode 100644 index 0000000..9786cca --- /dev/null +++ b/R/DBClientClass/dbTransaction.R @@ -0,0 +1,8 @@ + +withTransaction <- function(expr) { + private$client$beginTransaction() + on.exit(private$client$rollbackTransaction()) + expr + on.exit() + private$client$commitTransaction() +} diff --git a/R/DBConnection.R b/R/DBConnection.R new file mode 100644 index 0000000..1985258 --- /dev/null +++ b/R/DBConnection.R @@ -0,0 +1,43 @@ + +class_env <- new.env(parent = baseenv()) + +res <- lapply(list.files("R/DBConnectionClass", pattern = ".R"), function(fn) { + eval( + parse(paste0("R/DBConnectionClass/", fn)), + envir = class_env, + enclos = emptyenv() + ) +}) + +fn_names <- names(which(vapply(class_env, is.function, logical(1)))) +fn_list <- lapply(fn_names, function(fn_name) { + class_env[[fn_name]] +}) +names(fn_list) <- fn_names + +priv_fn_names <- ls(class_env, pattern = "^\\..*", all.names = TRUE) +priv_fn_list <- lapply(priv_fn_names, function(fn_name) { + class_env[[fn_name]] +}) +names(priv_fn_list) <- gsub("^\\.", "", priv_fn_names) + +member_names <- names(which(!vapply(class_env, is.function, logical(1)))) +member_list <- lapply(member_names, function(member_name) { + class_env[[member_name]] +}) +names(member_list) <- member_names + + +#' @name DBConnection +#' @title DBConnection +#' @description Database Connection class +NULL + +#' @export +DBConnection <- R6::R6Class( + "DBConnection", + public = fn_list, + private = c(member_list, priv_fn_list), + lock_class = TRUE, + parent_env = class_env +) diff --git a/R/DBConnectionClass/Configure.R b/R/DBConnectionClass/Configure.R new file mode 100644 index 0000000..1df0141 --- /dev/null +++ b/R/DBConnectionClass/Configure.R @@ -0,0 +1,68 @@ +config <- NULL + +.configureList <- function(db) { + was_connected <- FALSE + if (!is.null(private$conn)) { + was_connected <- TRUE + disconnect() + } + + private$config <- db + + if (was_connected) { + connect() + } + + return(invisible(private$config)) +} + +.configureDefault <- function( + host, port, user, password, dbname, + engine = NULL, name = NULL, schema = NULL, autoconnect = NULL +) { + if (is.null(autoconnect)) { + autoconnect <- getOption("db.autoconnect", FALSE) + } + private$configureList(list( + engine = engine, + host = host, + port = port, + user = user, + password = password, + name = ifelse(is.null(name), dbname, name), + schema = schema, + autoconnect = autoconnect + )) +} + + +#' Configures the DBClient instance. +#' +#' There are 2 options for configuration, either: pass in a named list; or use +#' named parameters. The required elements are: +#' +#' @param ... Named parameters for configuring the connection or a name list of +#' parameters +#' @param engine The database engine (mysql, postgres) as supported by DBI +#' @param host The database host +#' @param port The database port +#' @param user The database user to connect with +#' @param password The database password to connect with +#' @param dbname The database to use +#' @param schema The database schema to use (postgres) +#' @param autoconnect Autoconnect to the database if not already connected +#' +#' @return The configuration accepted +configure <- function(...) { + args <- list(...) + if (sum("list" %in% class(args[[1]])) > 0) { + private$configureList(args[[1]]) + } else { + do.call(private$configureDefault, args) + } +} + +#' Get the current configuration of the DBClient instance. +getConfiguration <- function() { + private$config +} diff --git a/R/DBConnectionClass/Connect.R b/R/DBConnectionClass/Connect.R new file mode 100644 index 0000000..644bd9e --- /dev/null +++ b/R/DBConnectionClass/Connect.R @@ -0,0 +1,85 @@ +conn <- NULL + +connect <- function() { + if (is.null(private$config)) { + if (!file.exists("global.R")) { + return(FALSE) + } + warning("Attempting to load default configuration from global.R") + e <- new.env(parent = baseenv()) + eval(parse("global.R"), envir = e) + if ("db" %in% ls(e)) { + self$configure(e$db) + } else if ("global" %in% ls(e)) { + self$configure(e$global$db) + } else { + return(FALSE) + } + } + + engine_fn <- NULL + if (exists("engine", private$config) && length(private$config$engine) > 0) { + engine_fn <- switch( + private$config$engine, + mariadb = RMariaDB::MariaDB, + postgres = RPostgres::Postgres + ) + } + if (is.null(engine_fn)) { + if (exists("schema", private$config)) { + engine_fn <- RPostgres::Postgres + } else { + engine_fn <- RMariaDB::MariaDB + } + } + + private$conn <- DBI::dbConnect( + engine_fn(), + host = private$config$host, + port = private$config$port, + dbname = private$config$name, + user = private$config$user, + password = private$config$password + ) + + res <- tryCatch( + { + DBI::dbListTables(private$conn) + TRUE + }, + error = function(err) { + print("Database error:") + utils::str(err) + return(FALSE) + } + ) + + return(res) +} + +getConnection <- function(do_connect = NULL) { + if (!self$isConnected()) { + if ((!is.null(do_connect) && do_connect) + || (is.null(do_connect) && private$conn$autoconnect)) { + connect() + } else { + warning("Database is not connected. Caller of getConnection() may error.") + } + } + return(private$conn) +} + +isConnected <- function() { + return(!is.null(private$conn)) +} + +disconnect <- function() { + if (!is.null(private$conn)) { + DBI::dbDisconnect(private$conn) + private$conn <- NULL + } +} + +disconnectOnSessionEnd <- function(session) { + session$onSessionEnded(self$disconnect) +} diff --git a/R/DBConnectionClass/Transaction.R b/R/DBConnectionClass/Transaction.R new file mode 100644 index 0000000..af6a1de --- /dev/null +++ b/R/DBConnectionClass/Transaction.R @@ -0,0 +1,39 @@ + +transactionLevel <- 0 + +beginTransaction <- function(allowRecursive = FALSE, quietly = FALSE) { + if (private$transactionLevel > 0) { + if (allowRecursive == FALSE) { + if (quietly != FALSE) { + stop("Transaction already started. Aborting.") + } + return(FALSE) + } + } + private$transactionLevel <- private$transactionLevel + 1 + DBI::dbBegin(self$getConnection()) +} + +rollbackTransaction <- function(quietly = FALSE) { + if (private$transactionLevel == 0) { + if (quietly != FALSE) { + stop("No transaction to rollback. Aborting.") + } + return(FALSE) + } + DB::dbRollback(self$getConnection()) +} + +commitTransaction <- function(quietly = FALSE) { + if (private$transactionLevel == 0) { + if (quietly != FALSE) { + stop("No transaction to commit. Aborting.") + } + return(FALSE) + } + private$transactionLevel <- private$transactionLevel - 1 + if (transactionLevel > 0) { + return(TRUE) + } + DB::dbRollback(self$getConnection()) +} diff --git a/R/PrintDebug.R b/R/PrintDebug.R new file mode 100644 index 0000000..4d4b32e --- /dev/null +++ b/R/PrintDebug.R @@ -0,0 +1,97 @@ + +printDebugSQLInsert <- function(con, table, cols, from, key) { + dbplyr::sql_query_insert( + con = con, + table = table, + from = from, + insert_cols = cols, + by = key, + conflict = "ignore" + ) +} +printDebugSQLAppend <- function(con, table, cols, from, key) { + dbplyr::sql_query_append( + con = con, + table = table, + from = from, + insert_cols = cols + ) +} +printDebugSQLUpsert <- function(con, table, cols, from, key) { + dbplyr::sql_query_upsert( + con = con, + table = table, + from = from, + by = key, + update_cols = setdiff(cols, key) + ) +} +printDebugSQLUpdate <- function(con, table, cols, from, key) { + update_cols <- setdiff(cols, key) + update_values <- dbplyr:::sql_table_prefix(con, update_cols, "...y") + names(update_values) <- update_cols + dbplyr::sql_query_update_from( + con = con, + table = table, + from = from, + by = key, + update_values = update_values + ) +} +printDebugSQLDelete <- function(con, table, cols, from, key) { + dbplyr::sql_query_delete( + con = con, + table = table, + from = from, + by = key + ) +} +printDebug <- function(fn_name, table, key, tbl, rows, query_only = FALSE) { + if (!query_only) { + cat(fn_name, ": ", table, " (", key, ")\n", sep = "") + cat("\t") + dplyr::glimpse(tbl) + cat("\t") + dplyr::glimpse(rows) + } + + sqlFn <- switch( + tolower(substr(fn_name, 1, 3)), + `ins` = printDebugSQLInsert, + `app` = printDebugSQLAppend, + `ups` = printDebugSQLUpsert, + `upd` = printDebugSQLUpdate, + `del` = printDebugSQLDelete + ) + con <- dbplyr::remote_con(tbl) + lvl <- ifelse(startsWith(fn_name, "del"), 2, 1) + sql <- sqlFn( + con, + dbplyr:::target_table(tbl, TRUE), + colnames(rows), + dbplyr::sql_render(dbplyr::copy_inline(con, rows), con, lvl = lvl), + key + ) + cat(sql, "\n") + + if (is.null(key)) { + return(invisible(rows)) + } + + return_keys <- NULL + if (key %in% colnames(rows)) { + return_keys <- rows %>% + dplyr::select(key) + } else { + return_keys <- rows %>% + dplyr::bind_rows(tibble::tibble(!!key := character(0))) %>% + dplyr::mutate( + !!key := dplyr::coalesce( + !!!dplyr::sym(key), + paste0("<>") + ) + ) %>% + dplyr::select(key) + } + return(invisible(return_keys)) +}