Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7afd09f
tests: Move S3 tests into shared filesystems tests
wjones127 Jul 6, 2022
729246f
test: try to get GCS testbench working in R CI
wjones127 Jul 7, 2022
86103fa
fix: enable reticulate to enable tests
wjones127 Jul 7, 2022
78d7ff0
feat: Add nice message for gcs tests
wjones127 Jul 11, 2022
e1a9e43
fix: drop slash only for openoutputstream
wjones127 Jul 11, 2022
d9da969
chore: lint R
wjones127 Jul 12, 2022
352b3e7
feat: add help message for starting testbench
wjones127 Jul 13, 2022
da12db9
test: differentiate test names for filesystems
wjones127 Jul 13, 2022
fa99d5f
feat: attempt to test GCS and MinIO on Windows CI
wjones127 Jul 13, 2022
ae01bf5
fix: use curl instead of wget
wjones127 Jul 15, 2022
2317ada
fix: reference absolute path to minio
wjones127 Jul 15, 2022
0ea8710
fix: move install down after new bash installed
wjones127 Jul 15, 2022
ecbd2d3
fix: drop unnecessary port
wjones127 Jul 15, 2022
e64ba03
fix: run gcs testbench in the background
wjones127 Jul 15, 2022
ac68b5e
fix: use R to run services in background
wjones127 Jul 15, 2022
09ec151
chore: undo accidental changes to C++
wjones127 Jul 15, 2022
2b67552
feat: move server init within tests
wjones127 Jul 15, 2022
a5bb5dc
fix: alternative messages should say not installed
wjones127 Jul 15, 2022
b6ac039
fix: install sys in Windows CI
wjones127 Jul 18, 2022
a6bec83
fix: as a workaround, use Rscript to install sys
wjones127 Jul 18, 2022
1fb253d
fix: add minio to path and sys to Suggests
wjones127 Jul 19, 2022
6b97163
fix: fix output path for minio
wjones127 Jul 19, 2022
baba443
fix: try without GCS on Windows for now
wjones127 Jul 20, 2022
c5d5e55
fix: remove unnecessary source
wjones127 Jul 20, 2022
e12f508
fix: move dplyr import and use skip instead of if statement
wjones127 Jul 21, 2022
b42042a
Update .github/workflows/r.yml
wjones127 Sep 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/r.yml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,19 @@ jobs:
working-directory: 'r'
extra-packages: |
any::rcmdcheck
- name: Install MinIO
  shell: bash
  run: |
    mkdir -p "$HOME/.local/bin"
    # --fail makes curl exit non-zero on an HTTP error instead of saving the
    # error page as minio.exe; --location follows redirects from the mirror.
    curl \
      --fail \
      --location \
      --output "$HOME/.local/bin/minio.exe" \
      https://dl.min.io/server/minio/release/windows-amd64/archive/minio.RELEASE.2022-05-26T05-48-41Z
    chmod +x "$HOME/.local/bin/minio.exe"
    # Make the binary discoverable by later steps in this job.
    echo "$HOME/.local/bin" >> "$GITHUB_PATH"
# TODO(ARROW-17149): figure out why the GCS tests are hanging on Windows
# - name: Install Google Cloud Storage Testbench
# shell: bash
# run: ci/scripts/install_gcs_testbench.sh default
- name: Check
shell: Rscript {0}
run: |
Expand Down
8 changes: 0 additions & 8 deletions ci/scripts/r_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,6 @@ SCRIPT="as_cran <- !identical(tolower(Sys.getenv('NOT_CRAN')), 'true')
} else {
args <- c('--no-manual', '--ignore-vignettes')
build_args <- '--no-build-vignettes'

if (nzchar(Sys.which('minio'))) {
message('Running minio for S3 tests (if build supports them)')
minio_dir <- tempfile()
dir.create(minio_dir)
pid_minio <- sys::exec_background('minio', c('server', minio_dir))
on.exit(tools::pskill(pid_minio), add = TRUE)
}
}

if (requireNamespace('reticulate', quietly = TRUE) && reticulate::py_module_available('pyarrow')) {
Expand Down
1 change: 1 addition & 0 deletions r/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Suggests:
rmarkdown,
stringi,
stringr,
sys,
testthat (>= 3.1.0),
tibble,
tzdb,
Expand Down
9 changes: 6 additions & 3 deletions r/R/filesystem.R
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,9 @@ gs_bucket <- function(bucket, ...) {
GcsFileSystem <- R6Class("GcsFileSystem",
inherit = FileSystem
)
GcsFileSystem$create <- function(anonymous = FALSE, ...) {
GcsFileSystem$create <- function(anonymous = FALSE, retry_limit_seconds = 15, ...) {
# The default retry limit in C++ is 15 minutes, but that is experienced as
# hanging in an interactive context, so default is set here to 15 seconds.
options <- list(...)

# Validate options
Expand Down Expand Up @@ -525,8 +527,7 @@ GcsFileSystem$create <- function(anonymous = FALSE, ...) {

valid_opts <- c(
"access_token", "expiration", "json_credentials", "endpoint_override",
"scheme", "default_bucket_location", "retry_limit_seconds",
"default_metadata"
"scheme", "default_bucket_location", "default_metadata"
)

invalid_opts <- setdiff(names(options), valid_opts)
Expand All @@ -538,6 +539,8 @@ GcsFileSystem$create <- function(anonymous = FALSE, ...) {
)
}

options$retry_limit_seconds <- retry_limit_seconds

fs___GcsFileSystem__Make(anonymous, options)
}

Expand Down
190 changes: 190 additions & 0 deletions r/tests/testthat/helper-filesystems.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#' Run standard suite of integration tests for a filesystem
#'
#' The tests write files through `fs` / the formatted URIs and read them back,
#' so later tests rely on files created by earlier ones (e.g. the dataset tests
#' reuse `test.feather`, `test2.feather`, `test.parquet` and `hive_dir`).
#' `library(dplyr)` must be attached by the caller before calling this function.
#'
#' @param name Name of filesystem to be printed in test name
#' @param fs A `FileSystem` instance to test with
#' @param path_formatter A function that takes a sequence of path segments and
#' returns an absolute path.
#' @param uri_formatter A function that takes a sequence of path segments and
#' returns a URI containing the filesystem scheme (e.g. 's3://', 'gs://'), the
#' absolute path, and any necessary connection options as URL query parameters.
test_filesystem <- function(name, fs, path_formatter, uri_formatter) {
  # Fail fast: check the dplyr precondition before any test touches the
  # filesystem, rather than after some files have already been written.
  if (!("package:dplyr" %in% search())) {
    abort("library(dplyr) required for test_filesystem()")
  }

  # NOTE: it's important that we label these tests with name of filesystem so
  # that we can differentiate the different calls to these test in the output.
  test_that(sprintf("read/write Feather on %s using URIs", name), {
    write_feather(example_data, uri_formatter("test.feather"))
    expect_identical(read_feather(uri_formatter("test.feather")), example_data)
  })

  test_that(sprintf("read/write Feather on %s using Filesystem", name), {
    write_feather(example_data, fs$path(path_formatter("test2.feather")))
    expect_identical(
      read_feather(fs$path(path_formatter("test2.feather"))),
      example_data
    )
  })

  test_that(sprintf("read/write compressed csv on %s using FileSystem", name), {
    skip_if_not_available("gzip")
    dat <- tibble(x = seq(1, 10, by = 0.2))
    write_csv_arrow(dat, fs$path(path_formatter("test.csv.gz")))
    expect_identical(
      read_csv_arrow(fs$path(path_formatter("test.csv.gz"))),
      dat
    )
  })

  test_that(sprintf("read/write csv on %s using FileSystem", name), {
    # No gzip skip here: this test writes an uncompressed file.
    dat <- tibble(x = seq(1, 10, by = 0.2))
    write_csv_arrow(dat, fs$path(path_formatter("test.csv")))
    expect_identical(
      read_csv_arrow(fs$path(path_formatter("test.csv"))),
      dat
    )
  })

  test_that(sprintf("read/write IPC stream on %s", name), {
    write_ipc_stream(example_data, fs$path(path_formatter("test3.ipc")))
    expect_identical(
      read_ipc_stream(fs$path(path_formatter("test3.ipc"))),
      example_data
    )
  })

  test_that(sprintf("read/write Parquet on %s", name), {
    skip_if_not_available("parquet")
    # Written through the FileSystem object, read back through the URI.
    write_parquet(example_data, fs$path(path_formatter("test.parquet")))
    expect_identical(read_parquet(uri_formatter("test.parquet")), example_data)
  })

  if (arrow_with_dataset()) {
    # Create a fresh local directory and return its path with forward slashes.
    make_temp_dir <- function() {
      path <- tempfile()
      dir.create(path)
      normalizePath(path, winslash = "/")
    }

    test_that(sprintf("open_dataset with an %s file (not directory) URI", name), {
      skip_if_not_available("parquet")
      expect_identical(
        open_dataset(uri_formatter("test.parquet")) %>% collect() %>% arrange(int),
        example_data %>% arrange(int)
      )
    })

    test_that(sprintf("open_dataset with vector of %s file URIs", name), {
      expect_identical(
        open_dataset(
          c(uri_formatter("test.feather"), uri_formatter("test2.feather")),
          format = "feather"
        ) %>%
          arrange(int) %>%
          collect(),
        rbind(example_data, example_data) %>% arrange(int)
      )
    })

    test_that(sprintf("open_dataset errors if passed URIs mixing %s and local fs", name), {
      td <- make_temp_dir()
      expect_error(
        open_dataset(
          c(
            uri_formatter("test.feather"),
            paste0("file://", file.path(td, "fake.feather"))
          ),
          format = "feather"
        ),
        "Vectors of URIs for different file systems are not supported"
      )
    })

    # Dataset test setup, cf. test-dataset.R
    first_date <- lubridate::ymd_hms("2015-04-29 03:12:39")
    df1 <- tibble(
      int = 1:10,
      dbl = as.numeric(1:10),
      lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
      chr = letters[1:10],
      fct = factor(LETTERS[1:10]),
      ts = first_date + lubridate::days(1:10)
    )

    second_date <- lubridate::ymd_hms("2017-03-09 07:01:02")
    df2 <- tibble(
      int = 101:110,
      dbl = as.numeric(51:60),
      lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
      chr = letters[10:1],
      fct = factor(LETTERS[10:1]),
      ts = second_date + lubridate::days(10:1)
    )

    # Expected result of selecting int/dbl/lgl from the hive dataset, sorted by
    # `int`. Wrapped in a function so it is evaluated inside each test.
    hive_cols <- c("int", "dbl", "lgl")
    expected_hive_data <- function() {
      rbind(df1[, hive_cols], df2[, hive_cols]) %>% arrange(int)
    }

    # This is also to set up the dataset tests
    test_that(sprintf("write_parquet with %s filesystem arg", name), {
      skip_if_not_available("parquet")
      fs$CreateDir(path_formatter("hive_dir", "group=1", "other=xxx"))
      fs$CreateDir(path_formatter("hive_dir", "group=2", "other=yyy"))
      expect_length(fs$ls(path_formatter("hive_dir")), 2)
      write_parquet(df1, fs$path(path_formatter("hive_dir", "group=1", "other=xxx", "file1.parquet")))
      write_parquet(df2, fs$path(path_formatter("hive_dir", "group=2", "other=yyy", "file2.parquet")))
      expect_identical(
        read_parquet(fs$path(path_formatter("hive_dir", "group=1", "other=xxx", "file1.parquet"))),
        df1
      )
    })

    # The remaining tests read the Parquet files written into `hive_dir` above,
    # so they must be skipped under the same condition as that setup test.
    test_that(sprintf("open_dataset with %s", name), {
      skip_if_not_available("parquet")
      ds <- open_dataset(fs$path(path_formatter("hive_dir")))
      expect_identical(
        ds %>% select(int, dbl, lgl) %>% collect() %>% arrange(int),
        expected_hive_data()
      )
    })

    test_that(sprintf("write_dataset with %s", name), {
      skip_if_not_available("parquet")
      ds <- open_dataset(fs$path(path_formatter("hive_dir")))
      write_dataset(ds, fs$path(path_formatter("new_dataset_dir")))
      expect_length(fs$ls(path_formatter("new_dataset_dir")), 1)
    })

    test_that(sprintf("copy files with %s", name), {
      skip_if_not_available("parquet")
      td <- make_temp_dir()
      copy_files(uri_formatter("hive_dir"), td)
      expect_length(dir(td), 2)
      ds <- open_dataset(td)
      expect_identical(
        ds %>% select(int, dbl, lgl) %>% collect() %>% arrange(int),
        expected_hive_data()
      )

      # Let's copy the other way and use a SubTreeFileSystem rather than URI
      copy_files(td, fs$path(path_formatter("hive_dir2")))
      ds2 <- open_dataset(fs$path(path_formatter("hive_dir2")))
      expect_identical(
        ds2 %>% select(int, dbl, lgl) %>% collect() %>% arrange(int),
        expected_hive_data()
      )
    })
  } # if(arrow_with_dataset())
}
15 changes: 13 additions & 2 deletions r/tests/testthat/helper-skip.R
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,17 @@ process_is_running <- function(x) {
return(TRUE)
}

cmd <- sprintf("ps aux | grep '%s' | grep -v grep", x)
tryCatch(system(cmd, ignore.stdout = TRUE) == 0, error = function(e) FALSE)
if (tolower(Sys.info()[["sysname"]]) == "windows") {
# Batch scripts (CMD.exe) don't provide a command that shows the original
# call arguments, which we need for testbench since it's launched from Python.
inner_cmd <- paste("WMIC path win32_process get Commandline",
sprintf("| Select-String %s", x),
"| Select-String powershell.exe -NotMatch")
cmd <- sprintf("powershell -command \"%s\"", inner_cmd)
tryCatch(length(system(cmd, intern = TRUE, show.output.on.console = FALSE)) > 0,
error = function(e) FALSE)
} else {
cmd <- sprintf("ps aux | grep '%s' | grep -v grep", x)
tryCatch(system(cmd, ignore.stdout = TRUE) == 0, error = function(e) FALSE)
}
}
48 changes: 48 additions & 0 deletions r/tests/testthat/test-gcs.R
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,51 @@ test_that("GcsFileSystem$create() input validation", {
'Invalid options for GcsFileSystem: "role_arn"'
)
})

# Integration tests against a local googleapis-storage-testbench server.
# Everything below is skipped unless the testbench Python module and the `sys`
# package (used to launch it in the background) are available.
skip_on_cran()
# `sys` is only a suggested dependency, so skip instead of erroring without it.
skip_if_not_installed("sys")
skip_if_not(
  system('python -c "import testbench"') == 0,
  message = "googleapis-storage-testbench is not installed."
)
library(dplyr)

# Port the testbench listens on; override with the TESTBENCH_PORT env var.
testbench_port <- Sys.getenv("TESTBENCH_PORT", "9001")

# Launch the testbench in the background and make sure it is killed when the
# deferred handlers run at the end of this file.
pid_testbench <- sys::exec_background(
  "python",
  c("-m", "testbench", "--port", testbench_port),
  std_out = FALSE,
  std_err = FALSE # TODO: is there a good place to send output?
)
withr::defer(tools::pskill(pid_testbench))
Sys.sleep(1) # Wait for startup

fs <- GcsFileSystem$create(
  endpoint_override = sprintf("localhost:%s", testbench_port),
  retry_limit_seconds = 1,
  scheme = "http",
  anonymous = TRUE # Will fail to resolve host name if anonymous isn't TRUE
)

# Use a timestamp as the bucket name so repeated runs don't collide.
now <- as.character(as.numeric(Sys.time()))
tryCatch(fs$CreateDir(now), error = function(cond) {
  # Turn a connection failure into an actionable message; re-raise anything
  # else unchanged.
  if (grepl("Couldn't connect to server", conditionMessage(cond), fixed = TRUE)) {
    abort(
      c(
        sprintf("Unable to connect to testbench on port %s.", testbench_port),
        i = "You can set a custom port with TESTBENCH_PORT environment variable."
      ),
      parent = cond
    )
  } else {
    stop(cond)
  }
})
# Clean up when we're all done
withr::defer(fs$DeleteDir(now))

# Build a path inside the test bucket from the given segments.
gcs_path <- function(...) {
  paste(now, ..., sep = "/")
}
# Build a gs:// URI for the same path, carrying the connection options as
# query parameters ("%3A" is the URL-encoded ":" before the port).
gcs_uri <- function(...) {
  template <- "gs://anonymous@%s?scheme=http&endpoint_override=localhost%s%s&retry_limit_seconds=1"
  sprintf(template, gcs_path(...), "%3A", testbench_port)
}

test_filesystem("gcs", fs, gcs_path, gcs_uri)

# Run the deferred cleanup (delete the bucket, stop the testbench) now.
withr::deferred_run()
Loading