Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export(Id)
export(Postgres)
export(Redshift)
export(postgresDefault)
export(postgresExportLargeObject)
export(postgresHasDefault)
export(postgresImportLargeObject)
export(postgresIsTransacting)
Expand Down
48 changes: 47 additions & 1 deletion R/PqConnection.R
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ postgresIsTransacting <- function(conn) {

#' Imports a large object from file
#'
#' Returns an object idenfier (Oid) for the imported large object
#' Returns an object identifier (Oid) for the imported large object.
#' This function must be called within a transaction.
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
Expand Down Expand Up @@ -191,3 +192,48 @@ postgresImportLargeObject <- function(conn, filepath = NULL, oid = 0) {

connection_import_lo_from_file(conn@ptr, filepath, oid)
}

#' Exports a large object to file
#'
#' Exports a large object from the database to a file on disk. This function
#' uses PostgreSQL's `lo_export()` function which efficiently streams the data
#' directly to disk without loading it into memory, making it suitable for
#' very large objects (GB+) that would cause memory issues with `lo_get()`.
#' This function must be called within a transaction.
#'
#' @export
#' @param conn a [PqConnection-class] object, produced by
#' [DBI::dbConnect()]
#' @param oid the object identifier (Oid) of the large object to export
#' @param filepath a path where the large object should be exported
#' @return invisible NULL on success, or stops with an error
#' @examples
#' \dontrun{
#' con <- postgresDefault()
#' filepath <- 'your_image.png'
#' dbWithTransaction(con, {
#' oid <- postgresImportLargeObject(con, filepath)
#' })
#' # Later, export the large object back to a file
#' dbWithTransaction(con, {
#' postgresExportLargeObject(con, oid, 'exported_image.png')
#' })
#' }
postgresExportLargeObject <- function(conn, oid, filepath) {
if (!postgresIsTransacting(conn)) {
stopc("Cannot export a large object outside of a transaction")
}

if (is.null(oid)) {
stopc("'oid' cannot be NULL")
}
if (is.na(oid)) {
stopc("'oid' cannot be NA")
}
if (oid < 0) {
stopc("'oid' cannot be negative")
}

connection_export_lo_to_file(conn@ptr, oid, filepath)
invisible()
}
4 changes: 4 additions & 0 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ connection_import_lo_from_file <- function(con, filename, oid) {
.Call(`_RPostgres_connection_import_lo_from_file`, con, filename, oid)
}

connection_export_lo_to_file <- function(con, oid, filename) {
invisible(.Call(`_RPostgres_connection_export_lo_to_file`, con, oid, filename))
}

connection_copy_data <- function(con, sql, df) {
invisible(.Call(`_RPostgres_connection_copy_data`, con, sql, df))
}
Expand Down
1 change: 1 addition & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ reference:
- postgresHasDefault
- postgresWaitForNotify
- postgresImportLargeObject
- postgresExportLargeObject

development:
mode: auto
Expand Down
39 changes: 39 additions & 0 deletions man/postgresExportLargeObject.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion man/postgresImportLargeObject.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/DbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ Oid DbConnection::import_lo_from_file(std::string filename, Oid p_oid) {
return(lo_oid);
}

void DbConnection::export_lo_to_file(Oid p_oid, std::string filename) {
int result = lo_export(pConn_, p_oid, filename.c_str());
if (result != 1) cpp11::stop(PQerrorMessage(pConn_));
}

void DbConnection::copy_data(std::string sql, cpp11::list df) {
LOG_DEBUG << sql;

Expand Down
1 change: 1 addition & 0 deletions src/DbConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class DbConnection : boost::noncopyable {
void copy_data(std::string sql, cpp11::list df);

Oid import_lo_from_file(std::string file_path, Oid p_oid);
void export_lo_to_file(Oid p_oid, std::string file_path);


void check_connection();
Expand Down
5 changes: 5 additions & 0 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Oid connection_import_lo_from_file(DbConnection* con, std::string filename, Oid
return con->import_lo_from_file(filename, oid);
}

[[cpp11::register]]
void connection_export_lo_to_file(DbConnection* con, Oid oid, std::string filename) {
con->export_lo_to_file(oid, filename);
}

[[cpp11::register]]
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df) {
return con->copy_data(sql, df);
Expand Down
9 changes: 9 additions & 0 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ extern "C" SEXP _RPostgres_connection_import_lo_from_file(SEXP con, SEXP filenam
END_CPP11
}
// connection.cpp
void connection_export_lo_to_file(DbConnection* con, Oid oid, std::string filename);
extern "C" SEXP _RPostgres_connection_export_lo_to_file(SEXP con, SEXP oid, SEXP filename) {
BEGIN_CPP11
connection_export_lo_to_file(cpp11::as_cpp<cpp11::decay_t<DbConnection*>>(con), cpp11::as_cpp<cpp11::decay_t<Oid>>(oid), cpp11::as_cpp<cpp11::decay_t<std::string>>(filename));
return R_NilValue;
END_CPP11
}
// connection.cpp
void connection_copy_data(DbConnection* con, std::string sql, cpp11::list df);
extern "C" SEXP _RPostgres_connection_copy_data(SEXP con, SEXP sql, SEXP df) {
BEGIN_CPP11
Expand Down Expand Up @@ -207,6 +215,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_client_version", (DL_FUNC) &_RPostgres_client_version, 0},
{"_RPostgres_connection_copy_data", (DL_FUNC) &_RPostgres_connection_copy_data, 3},
{"_RPostgres_connection_create", (DL_FUNC) &_RPostgres_connection_create, 3},
{"_RPostgres_connection_export_lo_to_file", (DL_FUNC) &_RPostgres_connection_export_lo_to_file, 3},
{"_RPostgres_connection_get_temp_schema", (DL_FUNC) &_RPostgres_connection_get_temp_schema, 1},
{"_RPostgres_connection_import_lo_from_file", (DL_FUNC) &_RPostgres_connection_import_lo_from_file, 3},
{"_RPostgres_connection_info", (DL_FUNC) &_RPostgres_connection_info, 1},
Expand Down
101 changes: 96 additions & 5 deletions tests/testthat/test-ImportLargeObject.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
test_that("can import and read a large object", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(), '/data/large_object.txt')
test_file_path <- file.path(test_path(), "data", "large_object.txt")
dbWithTransaction(con, {
oid <- postgresImportLargeObject(con, test_file_path)
})
Expand All @@ -11,15 +11,45 @@ test_that("can import and read a large object", {
"select lo_get($1) as lo_data",
params = list(oid)
)$lo_data[1])
large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string 'postgres'
large_object_txt <- as.raw(c(0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73)) # the string "postgres"
expect_equal(lo_data, large_object_txt)
})


test_that("can import and export a large object", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- file.path(test_path(), "data", "large_object.txt")

# Import the large object
oid <- dbWithTransaction(con, {
postgresImportLargeObject(con, test_file_path)
})
expect_gt(oid, 0)

# Export to a temporary file
temp_file <- tempfile(fileext = ".txt")
on.exit(unlink(temp_file), add = TRUE)

dbWithTransaction(con, {
postgresExportLargeObject(con, oid, temp_file)
})

# Verify the exported file exists and has correct content
expect_true(file.exists(temp_file))
exported_content <- readBin(temp_file, "raw", file.size(temp_file))
original_content <- readBin(test_file_path, "raw", file.size(test_file_path))
expect_equal(exported_content, original_content)

# Clean up large object
dbExecute(con, "SELECT lo_unlink($1)", params = list(oid))
})


test_that("importing to an existing oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(test_path(), '/data/large_object.txt')
test_file_path <- file.path(test_path(), "data", "large_object.txt")
oid <- 1234
dbWithTransaction(con, {
oid <- postgresImportLargeObject(con, test_file_path, oid)
Expand All @@ -37,13 +67,74 @@ test_that("importing to an existing oid throws error", {
test_that("import from a non-existing path throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
test_file_path <- paste0(
test_file_path <- file.path(
test_path(),
'/data/large_object_that_does_not_exist.txt'
"data",
"large_object_that_does_not_exist.txt"
)
expect_error(
dbWithTransaction(con, {
oid <- postgresImportLargeObject(con, test_file_path)
})
)
})


test_that("export outside transaction throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
postgresExportLargeObject(con, 12345, tempfile()),
"Cannot export a large object outside of a transaction"
)
})


test_that("export with NULL oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, NULL, tempfile())
}),
"'oid' cannot be NULL"
)
})


test_that("export with NA oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, NA, tempfile())
}),
"'oid' cannot be NA"
)
})


test_that("export with negative oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, -1, tempfile())
}),
"'oid' cannot be negative"
)
})


test_that("export of non-existent oid throws error", {
con <- postgresDefault()
on.exit(dbDisconnect(con))
temp_file <- tempfile()
on.exit(unlink(temp_file), add = TRUE)

expect_error(
dbWithTransaction(con, {
postgresExportLargeObject(con, 999999, temp_file)
})
)
})