After ARROW-15040, we will be able to use a RecordBatchReader as input to the CSV file writer. It would be similarly helpful to be able to use a RecordBatchReader as input when writing Parquet, IPC, and Feather files. The RecordBatchReader is important because it can be imported from C, from Python, or from database results (e.g., DuckDB).
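For illustration only, the usage this issue is asking for might look something like the following (hypothetical calls; these writers do not accept a RecordBatchReader yet, and `reader` is a RecordBatchReader such as the one produced from DuckDB below):
# hypothetical usage once the writers accept a RecordBatchReader
arrow::write_parquet(reader, "result.parquet")
arrow::write_feather(reader, "result.feather")
arrow::write_ipc_stream(reader, "result.arrows")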
There is a workaround (at least for Parquet), but it's quite verbose:
tf <- tempfile(fileext = ".parquet")
df <- data.frame(a = 1:1e6)
arrow::write_parquet(df, tf)
dbdir <- ":memory:"
sink <- tempfile(fileext = ".parquet")
con <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir))
res <- DBI::dbSendQuery(con, glue::glue_sql("SELECT * FROM { tf }", .con = con), arrow = TRUE)
reader <- duckdb::duckdb_fetch_record_batch(res)
# write_parquet() doesn't *quite* support a record batch reader yet
# and we want to stream to the writer to support possibly
# bigger-than-memory results
fs <- arrow::LocalFileSystem$create()
stream <- fs$OpenOutputStream(sink)
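# calculate_chunk_size() is not exported from arrow, hence asNamespace()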
chunk_size <- asNamespace("arrow")$calculate_chunk_size(1e6, 1)
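# materialize the first batch as a Table: it is used to create the
# writer properties and is also the first chunk written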
batch <- arrow::Table$create(reader$read_next_batch())
writer <- arrow::ParquetFileWriter$create(
  reader$schema,
  stream,
  properties = arrow::ParquetWriterProperties$create(batch)
)
writer$WriteTable(batch, chunk_size = chunk_size)
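# stream the remaining batches to the writer one at a time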
while (!is.null(batch <- reader$read_next_batch())) {
  writer$WriteTable(arrow::Table$create(batch), chunk_size = chunk_size)
}
writer$Close()
stream$close()
DBI::dbDisconnect(con, shutdown = TRUE)
df2 <- dplyr::arrange(arrow::read_parquet(sink), a)
identical(df2$a, df$a)
#> [1] TRUE
Reporter: Dewey Dunnington / @paleolimbot
Related issues:
- [C++] Allow ParquetWriter to take a RecordBatchReader as input (is blocked by)
Note: This issue was originally created as ARROW-15405. Please see the migration documentation for further details.