Class implementation for DBConnection and DBClient

This commit is contained in:
2026-01-27 14:36:38 +00:00
parent f2c83c1bd8
commit b03c2691cd
16 changed files with 714 additions and 0 deletions

43
R/DBClient.R Normal file
View File

@@ -0,0 +1,43 @@
class_env <- new.env(parent = baseenv())
res <- lapply(list.files("R/DBClientClass", pattern = ".R"), function(fn) {
eval(
parse(paste0("R/DBClientClass/", fn)),
envir = class_env,
enclos = emptyenv()
)
})
fn_names <- names(which(vapply(class_env, is.function, logical(1))))
fn_list <- lapply(fn_names, function(fn_name) {
class_env[[fn_name]]
})
names(fn_list) <- fn_names
priv_fn_names <- ls(class_env, pattern = "^\\..*", all.names = TRUE)
priv_fn_list <- lapply(priv_fn_names, function(fn_name) {
class_env[[fn_name]]
})
names(priv_fn_list) <- gsub("^\\.", "", priv_fn_names)
member_names <- names(which(!vapply(class_env, is.function, logical(1))))
member_list <- lapply(member_names, function(member_name) {
class_env[[member_name]]
})
names(member_list) <- member_names
#' @name DBClient
#' @title DBClient
#' @description Database client class
NULL
#' @export
DBClient <- R6::R6Class(
"DBClient",
public = fn_list,
private = c(member_list, priv_fn_list),
lock_class = TRUE,
parent_env = class_env
)

View File

@@ -0,0 +1,42 @@
keyExists <- function(table_name, key_name, id) {
key <- dbplyr::ident(key_name)
res <- self$table(table_name) %>%
dplyr::filter(key == !!id) %>%
dplyr::count() %>%
dplyr::pull(n)
return(res[[1]] > 0)
}
selectByKey <- function(table_name, key_name, id) {
key <- dbplyr::ident(key_name)
res <- self$table(table_name) %>%
dplyr::filter(key %in% !!id) %>%
dplyr::collect()
return(res)
}
insertByKey <- function(table_name, key_name, id, ...) {
row <- dplyr::bind_cols(
tibble::tibble(!!key_name := !!id), tibble::as_tibble(list(...))
)
self$insertRows(table_name, row, key_name)
}
updateByKey <- function(table_name, key_name, id, ...) {
row <- dplyr::bind_cols(
tibble::tibble(!!key_name := !!id), tibble::as_tibble(list(...))
)
self$updateRows(table_name, row, key_name)
}
upsertByKey <- function(table_name, key_name, id, ...) {
row <- dplyr::bind_cols(
tibble::tibble(!!key_name := !!id), tibble::as_tibble(list(...))
)
self$upsertRows(table_name, row, key_name)
}
deleteByKey <- function(table_name, key_name, id) {
row <- tibble::tibble_row(!!key_name := !!id)
self$deleteRows(table_name, row, key_name)
}

28
R/DBClientClass/CRUDRow.R Normal file
View File

@@ -0,0 +1,28 @@
insert <- function(table_name, ..., key_name = NULL) {
row <- tibble::as_tibble(list(...))
self$insertRows(table_name, row, key_name)
}
append <- function(table_name, ...) {
row <- tibble::as_tibble(list(...))
self$appendRows(table_name, row)
}
upsert <- function(table_name, ..., key_name = NULL) {
row <- tibble::as_tibble(list(...))
self$upsertRows(table_name, row, key_name)
}
update <- function(table_name, ..., key_name = NULL) {
row <- tibble::as_tibble(list(...))
self$updateRows(table_name, row, key_name)
}
delete <- function(table_name, ..., key_name = NULL) {
row <- tibble::as_tibble(list(...))
self$deleteRows(table_name, row, key_name)
}
truncate <- function(table_name) {
self$dbAction(paste("TRUNCATE TABLE", self$withSchema(table_name)))
}

158
R/DBClientClass/CRUDRows.R Normal file
View File

@@ -0,0 +1,158 @@
# NOTE: Temporary fix for issue: tidyverse/dbplyr#1133
.fix_dbplyr <- function(tbl) {
keys <- c("dbplyr_table_ident", "dbplyr_table_path")
if (sum(keys %in% class(tbl$lazy_query$x)) == 0) {
tbl$lazy_query$x <- dbplyr::as.sql(
tbl$lazy_query$x, private$client$getConnection()
)
}
return(tbl)
}
insertRows <- function(table_name, rows, key_name = NULL) {
tbl <- self$table(table_name)
new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows)
if (getOption("db.simulate", FALSE)) {
return(printDebug("insertRows", table_name, key_name, tbl, rows, TRUE))
}
if (getOption("db.debug", FALSE)) {
printDebug("insertRows", table_name, key_name, tbl, rows)
}
tbl <- private$fix_dbplyr(tbl)
tbl <- dplyr::rows_insert(
tbl,
new_rows,
by = key_name,
conflict = "ignore",
in_place = TRUE,
returning = !!key_name
)
if (dbplyr::has_returned_rows(tbl)) {
return(dbplyr::get_returned_rows(tbl))
}
return(invisible(tbl))
}
appendRows <- function(table_name, rows, key_name = NULL) {
tbl <- self$table(table_name)
new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows)
if (getOption("db.simulate", FALSE)) {
return(printDebug("appendRows", table_name, key_name, tbl, rows, TRUE))
}
if (getOption("db.debug", FALSE)) {
printDebug("appendRows", table_name, key_name, tbl, rows)
}
tbl <- private$fix_dbplyr(tbl)
tbl <- dplyr::rows_append(
tbl, new_rows, in_place = TRUE, returning = !!key_name
)
if (dbplyr::has_returned_rows(tbl)) {
return(dbplyr::get_returned_rows(tbl))
}
return(invisible(TRUE))
}
upsertRows <- function(table_name, rows, key_name = NULL) {
tbl <- self$table(table_name)
new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows)
tbl <- private$fix_dbplyr(tbl)
# NOTE: Temporary fix for issue: tidyverse/dbplyr#1279
if ("MariaDBConnection" %in% class(private$client$getConnection())) {
keys <- rows %>%
dplyr::pull(!!key_name)
n_rows <- tbl %>%
dplyr::select(!!key_name) %>%
dplyr::filter(!!as.symbol(key_name) %in% !!keys) %>%
dplyr::count() %>%
dplyr::pull(n)
if (n_rows == 0) {
return(insertRows(table_name, rows, key_name))
} else {
return(updateRows(table_name, rows, key_name))
}
} else {
if (getOption("db.simulate", FALSE)) {
return(printDebug("upsertRows", table_name, key_name, tbl, rows, TRUE))
}
if (getOption("db.debug", FALSE)) {
printDebug("upsertRows", table_name, key_name, tbl, rows)
}
tbl <- dplyr::rows_upsert(tbl, new_rows, by = key_name, in_place = TRUE)
if (dbplyr::has_returned_rows(tbl)) {
return(dbplyr::get_returned_rows(tbl))
}
}
return(invisible(TRUE))
}
updateRows <- function(table_name, rows, key_name = NULL) {
tbl <- self$table(table_name)
new_rows <- dbplyr::copy_inline(private$client$getConnection(), rows)
if (getOption("db.simulate", FALSE)) {
return(printDebug("updateRows", table_name, key_name, tbl, rows, TRUE))
}
if (getOption("db.debug", FALSE)) {
printDebug("updateRows", table_name, key_name, tbl, rows)
}
tbl <- private$fix_dbplyr(tbl)
tbl <- dplyr::rows_update(
tbl, new_rows, by = key_name, unmatched = "ignore", in_place = TRUE
)
if (dbplyr::has_returned_rows(tbl)) {
return(dbplyr::get_returned_rows(tbl))
}
return(invisible(TRUE))
}
deleteRows <- function(table_name, rows, key_name = NULL) {
tbl <- self$table(table_name)
old_rows <- dbplyr::copy_inline(private$client$getConnection(), rows)
if (getOption("db.simulate", FALSE)) {
return(printDebug("deleteRows", table_name, key_name, tbl, rows, TRUE))
}
if (getOption("db.debug", FALSE)) {
printDebug("deleteRows", table_name, key_name, tbl, rows)
}
tbl <- private$fix_dbplyr(tbl)
tbl <- dplyr::rows_delete(
tbl,
old_rows,
by = key_name,
unmatched = "ignore",
in_place = TRUE,
returning = dplyr::everything()
)
if (dbplyr::has_returned_rows(tbl)) {
return(dbplyr::get_returned_rows(tbl))
}
return(invisible(TRUE))
}

View File

@@ -0,0 +1,8 @@
client <- NULL
Initialize <- function(client) {
if (is.null(client) || !is.R6(client) || inherits(client, "DBClient")) {
stop("DBClient instance required!")
}
private$client <- client
}

View File

@@ -0,0 +1,6 @@
dbAction <- function(statement) {
if (getOption("db.debug", FALSE)) {
cat("dbQuery:", statement, "\n")
}
DBI::dbExecute(private$client$getConnection(), statement)
}

View File

@@ -0,0 +1,52 @@
collectOrReturn <- function(
qry, collect = get("collect", pos = parent.frame())
) {
if (length(collect) == 1 && collect[[1]] == TRUE) {
return(dplyr::collect(qry))
} else {
return(qry)
}
}
collectGeometries <- function(
qry, geometry_cols = c("geometry"), geometry_crs = 4326, check_ring_dir = TRUE
) {
requireNamespace("sf")
qry <- qry %>%
dplyr::collect()
if (sum(geometry_cols %in% colnames(qry)) > 0) {
qry <- qry %>%
dplyr::mutate(
dplyr::across(dplyr::any_of(geometry_cols), function(.col) {
sf::st_as_sfc(
.col,
EWKB = TRUE,
crs = geometry_crs,
check_ring_dir = check_ring_dir
)
})
)
}
if ("geom" %in% colnames(qry) && sum("geom" %in% geometry_cols) == 0) {
qry <- qry %>%
dplyr::mutate(
geom = sf::st_as_sfc(
geom, EWKB = TRUE, crs = 4326, check_ring_dir = check_ring_dir
)
)
}
if ("proj" %in% colnames(qry) && sum("proj" %in% geometry_cols) == 0) {
qry <- qry %>%
dplyr::mutate(
proj = sf::st_as_sfc(
proj, EWKB = TRUE, crs = 3035, check_ring_dir = check_ring_dir
)
)
}
return(qry)
}

View File

@@ -0,0 +1,9 @@
dbQuery <- function(statement) {
if (getOption("db.debug", FALSE)) {
cat("dbQuery:", statement, "\n")
}
query <- DBI::dbSendQuery(private$client$getConnection(), statement)
res <- DBI::dbFetch(query)
DBI::dbClearResult(query)
res
}

View File

@@ -0,0 +1,21 @@
withSchema <- function(table_name) {
config <- private$client$getConfiguration()
if (exists("schema", config)) {
return(DBI::dbQuoteIdentifier(
getConnection(), DBI::Id(schema = config$schema, table = table_name)
))
} else {
return(DBI::dbQuoteIdentifier(
getConnection(), DBI::Id(table = table_name)
))
}
}
dplyrWithSchema <- function(table_name) {
config <- private$client$getConfiguration()
if (exists("schema", config) && !is.null(config$schema)) {
return(dbplyr::in_schema(config$schema, table_name))
} else {
return(table_name)
}
}

View File

@@ -0,0 +1,7 @@
table <- function(table_name) {
if (getOption("db.debug", FALSE)) {
cat("dbTable:", table_name, "\n")
}
dplyr::tbl(private$client$getConnection(), self$dplyrWithSchema(table_name))
}

View File

@@ -0,0 +1,8 @@
withTransaction <- function(expr) {
private$client$beginTransaction()
on.exit(private$client$rollbackTransaction())
expr
on.exit()
private$client$commitTransaction()
}

43
R/DBConnection.R Normal file
View File

@@ -0,0 +1,43 @@
class_env <- new.env(parent = baseenv())
res <- lapply(list.files("R/DBConnectionClass", pattern = ".R"), function(fn) {
eval(
parse(paste0("R/DBConnectionClass/", fn)),
envir = class_env,
enclos = emptyenv()
)
})
fn_names <- names(which(vapply(class_env, is.function, logical(1))))
fn_list <- lapply(fn_names, function(fn_name) {
class_env[[fn_name]]
})
names(fn_list) <- fn_names
priv_fn_names <- ls(class_env, pattern = "^\\..*", all.names = TRUE)
priv_fn_list <- lapply(priv_fn_names, function(fn_name) {
class_env[[fn_name]]
})
names(priv_fn_list) <- gsub("^\\.", "", priv_fn_names)
member_names <- names(which(!vapply(class_env, is.function, logical(1))))
member_list <- lapply(member_names, function(member_name) {
class_env[[member_name]]
})
names(member_list) <- member_names
#' @name DBConnection
#' @title DBConnection
#' @description Database Connection class
NULL
#' @export
DBConnection <- R6::R6Class(
"DBConnection",
public = fn_list,
private = c(member_list, priv_fn_list),
lock_class = TRUE,
parent_env = class_env
)

View File

@@ -0,0 +1,68 @@
config <- NULL
.configureList <- function(db) {
was_connected <- FALSE
if (!is.null(private$conn)) {
was_connected <- TRUE
disconnect()
}
private$config <- db
if (was_connected) {
connect()
}
return(invisible(private$config))
}
.configureDefault <- function(
host, port, user, password, dbname,
engine = NULL, name = NULL, schema = NULL, autoconnect = NULL
) {
if (is.null(autoconnect)) {
autoconnect <- getOption("db.autoconnect", FALSE)
}
private$configureList(list(
engine = engine,
host = host,
port = port,
user = user,
password = password,
name = ifelse(is.null(name), dbname, name),
schema = schema,
autoconnect = autoconnect
))
}
#' Configures the DBClient instance.
#'
#' There are 2 options for configuration, either: pass in a named list; or use
#' named parameters. The required elements are:
#'
#' @param ... Named parameters for configuring the connection or a name list of
#' parameters
#' @param engine The database engine (mysql, postgres) as supported by DBI
#' @param host The database host
#' @param port The database port
#' @param user The database user to connect with
#' @param password The database password to connect with
#' @param dbname The database to use
#' @param schema The database schema to use (postgres)
#' @param autoconnect Autoconnect to the database if not already connected
#'
#' @return The configuration accepted
configure <- function(...) {
args <- list(...)
if (sum("list" %in% class(args[[1]])) > 0) {
private$configureList(args[[1]])
} else {
do.call(private$configureDefault, args)
}
}
#' Get the current configuration of the DBClient instance.
getConfiguration <- function() {
private$config
}

View File

@@ -0,0 +1,85 @@
conn <- NULL
connect <- function() {
if (is.null(private$config)) {
if (!file.exists("global.R")) {
return(FALSE)
}
warning("Attempting to load default configuration from global.R")
e <- new.env(parent = baseenv())
eval(parse("global.R"), envir = e)
if ("db" %in% ls(e)) {
self$configure(e$db)
} else if ("global" %in% ls(e)) {
self$configure(e$global$db)
} else {
return(FALSE)
}
}
engine_fn <- NULL
if (exists("engine", private$config) && length(private$config$engine) > 0) {
engine_fn <- switch(
private$config$engine,
mariadb = RMariaDB::MariaDB,
postgres = RPostgres::Postgres
)
}
if (is.null(engine_fn)) {
if (exists("schema", private$config)) {
engine_fn <- RPostgres::Postgres
} else {
engine_fn <- RMariaDB::MariaDB
}
}
private$conn <- DBI::dbConnect(
engine_fn(),
host = private$config$host,
port = private$config$port,
dbname = private$config$name,
user = private$config$user,
password = private$config$password
)
res <- tryCatch(
{
DBI::dbListTables(private$conn)
TRUE
},
error = function(err) {
print("Database error:")
utils::str(err)
return(FALSE)
}
)
return(res)
}
getConnection <- function(do_connect = NULL) {
if (!self$isConnected()) {
if ((!is.null(do_connect) && do_connect)
|| (is.null(do_connect) && private$conn$autoconnect)) {
connect()
} else {
warning("Database is not connected. Caller of getConnection() may error.")
}
}
return(private$conn)
}
isConnected <- function() {
return(!is.null(private$conn))
}
disconnect <- function() {
if (!is.null(private$conn)) {
DBI::dbDisconnect(private$conn)
private$conn <- NULL
}
}
disconnectOnSessionEnd <- function(session) {
session$onSessionEnded(self$disconnect)
}

View File

@@ -0,0 +1,39 @@
transactionLevel <- 0
beginTransaction <- function(allowRecursive = FALSE, quietly = FALSE) {
if (private$transactionLevel > 0) {
if (allowRecursive == FALSE) {
if (quietly != FALSE) {
stop("Transaction already started. Aborting.")
}
return(FALSE)
}
}
private$transactionLevel <- private$transactionLevel + 1
DBI::dbBegin(self$getConnection())
}
rollbackTransaction <- function(quietly = FALSE) {
if (private$transactionLevel == 0) {
if (quietly != FALSE) {
stop("No transaction to rollback. Aborting.")
}
return(FALSE)
}
DB::dbRollback(self$getConnection())
}
commitTransaction <- function(quietly = FALSE) {
if (private$transactionLevel == 0) {
if (quietly != FALSE) {
stop("No transaction to commit. Aborting.")
}
return(FALSE)
}
private$transactionLevel <- private$transactionLevel - 1
if (transactionLevel > 0) {
return(TRUE)
}
DB::dbRollback(self$getConnection())
}

97
R/PrintDebug.R Normal file
View File

@@ -0,0 +1,97 @@
printDebugSQLInsert <- function(con, table, cols, from, key) {
dbplyr::sql_query_insert(
con = con,
table = table,
from = from,
insert_cols = cols,
by = key,
conflict = "ignore"
)
}
printDebugSQLAppend <- function(con, table, cols, from, key) {
dbplyr::sql_query_append(
con = con,
table = table,
from = from,
insert_cols = cols
)
}
printDebugSQLUpsert <- function(con, table, cols, from, key) {
dbplyr::sql_query_upsert(
con = con,
table = table,
from = from,
by = key,
update_cols = setdiff(cols, key)
)
}
printDebugSQLUpdate <- function(con, table, cols, from, key) {
update_cols <- setdiff(cols, key)
update_values <- dbplyr:::sql_table_prefix(con, update_cols, "...y")
names(update_values) <- update_cols
dbplyr::sql_query_update_from(
con = con,
table = table,
from = from,
by = key,
update_values = update_values
)
}
printDebugSQLDelete <- function(con, table, cols, from, key) {
dbplyr::sql_query_delete(
con = con,
table = table,
from = from,
by = key
)
}
printDebug <- function(fn_name, table, key, tbl, rows, query_only = FALSE) {
if (!query_only) {
cat(fn_name, ": ", table, " (", key, ")\n", sep = "")
cat("\t")
dplyr::glimpse(tbl)
cat("\t")
dplyr::glimpse(rows)
}
sqlFn <- switch(
tolower(substr(fn_name, 1, 3)),
`ins` = printDebugSQLInsert,
`app` = printDebugSQLAppend,
`ups` = printDebugSQLUpsert,
`upd` = printDebugSQLUpdate,
`del` = printDebugSQLDelete
)
con <- dbplyr::remote_con(tbl)
lvl <- ifelse(startsWith(fn_name, "del"), 2, 1)
sql <- sqlFn(
con,
dbplyr:::target_table(tbl, TRUE),
colnames(rows),
dbplyr::sql_render(dbplyr::copy_inline(con, rows), con, lvl = lvl),
key
)
cat(sql, "\n")
if (is.null(key)) {
return(invisible(rows))
}
return_keys <- NULL
if (key %in% colnames(rows)) {
return_keys <- rows %>%
dplyr::select(key)
} else {
return_keys <- rows %>%
dplyr::bind_rows(tibble::tibble(!!key := character(0))) %>%
dplyr::mutate(
!!key := dplyr::coalesce(
!!!dplyr::sym(key),
paste0("<<NEW_ID::", round(stats::runif(1, 1, 100000)), ">>")
)
) %>%
dplyr::select(key)
}
return(invisible(return_keys))
}