diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index b14559d12a1..c3c6b151860 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -122,6 +122,7 @@ jobs: ARROW_DATASET: ON ARROW_FLIGHT: ON ARROW_GANDIVA: ON + ARROW_GCS: OFF ARROW_HDFS: ON ARROW_JEMALLOC: ON ARROW_ORC: ON diff --git a/.travis.yml b/.travis.yml index f906ba8686d..b3aa724107c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -60,7 +60,6 @@ jobs: DOCKER_RUN_ARGS: >- " -e ARROW_BUILD_STATIC=OFF - -e ARROW_GCS=OFF -e ARROW_ORC=OFF -e ARROW_USE_GLOG=OFF -e CMAKE_UNITY_BUILD=ON @@ -99,11 +98,11 @@ jobs: -e ARROW_GCS=OFF -e ARROW_MIMALLOC=OFF -e ARROW_ORC=OFF - -e ARROW_SUBSTRAIT=OFF -e ARROW_PARQUET=OFF -e ARROW_S3=OFF - -e CMAKE_UNITY_BUILD=ON + -e ARROW_SUBSTRAIT=OFF -e CMAKE_BUILD_PARALLEL_LEVEL=2 + -e CMAKE_UNITY_BUILD=ON -e PARQUET_BUILD_EXAMPLES=OFF -e PARQUET_BUILD_EXECUTABLES=OFF -e Protobuf_SOURCE=BUNDLED @@ -154,8 +153,8 @@ jobs: -e ARROW_PARQUET=OFF -e ARROW_PYTHON=ON -e ARROW_S3=OFF - -e CMAKE_UNITY_BUILD=ON -e CMAKE_BUILD_PARALLEL_LEVEL=2 + -e CMAKE_UNITY_BUILD=ON -e PARQUET_BUILD_EXAMPLES=OFF -e PARQUET_BUILD_EXECUTABLES=OFF -e Protobuf_SOURCE=BUNDLED diff --git a/appveyor.yml b/appveyor.yml index 8342dbf6cb3..2699e479b79 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -57,6 +57,7 @@ environment: # (as generated by cmake) - JOB: "Toolchain" GENERATOR: Ninja + ARROW_GCS: "ON" ARROW_S3: "ON" ARROW_BUILD_FLIGHT: "ON" ARROW_BUILD_GANDIVA: "ON" diff --git a/ci/docker/conda-python.dockerfile b/ci/docker/conda-python.dockerfile index 5ef69431b84..18106d8b18b 100644 --- a/ci/docker/conda-python.dockerfile +++ b/ci/docker/conda-python.dockerfile @@ -32,6 +32,11 @@ RUN mamba install -q \ nomkl && \ mamba clean --all +# XXX The GCS testbench was already installed in conda-cpp.dockerfile, +# but we changed the installed Python version above, so we need to reinstall it. 
+COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts +RUN /arrow/ci/scripts/install_gcs_testbench.sh default + ENV ARROW_PYTHON=ON \ ARROW_BUILD_STATIC=OFF \ ARROW_BUILD_TESTS=OFF \ diff --git a/ci/docker/debian-11-cpp.dockerfile b/ci/docker/debian-11-cpp.dockerfile index 1bb67e334e8..dfccd85e559 100644 --- a/ci/docker/debian-11-cpp.dockerfile +++ b/ci/docker/debian-11-cpp.dockerfile @@ -83,6 +83,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_DEPENDENCY_SOURCE=SYSTEM \ ARROW_FLIGHT=ON \ ARROW_GANDIVA=ON \ + ARROW_GCS=ON \ ARROW_HOME=/usr/local \ ARROW_ORC=ON \ ARROW_PARQUET=ON \ @@ -99,6 +100,7 @@ ENV ARROW_BUILD_TESTS=ON \ AWSSDK_SOURCE=BUNDLED \ CC=gcc \ CXX=g++ \ + google_cloud_cpp_storage_SOURCE=BUNDLED \ ORC_SOURCE=BUNDLED \ PATH=/usr/lib/ccache/:$PATH \ Protobuf_SOURCE=BUNDLED diff --git a/ci/docker/fedora-35-cpp.dockerfile b/ci/docker/fedora-35-cpp.dockerfile index b79ceb894bf..947c9aba1b7 100644 --- a/ci/docker/fedora-35-cpp.dockerfile +++ b/ci/docker/fedora-35-cpp.dockerfile @@ -77,6 +77,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_FLIGHT=ON \ ARROW_GANDIVA_JAVA=ON \ ARROW_GANDIVA=ON \ + ARROW_GCS=ON \ ARROW_HOME=/usr/local \ ARROW_ORC=ON \ ARROW_PARQUET=ON \ @@ -92,6 +93,7 @@ ENV ARROW_BUILD_TESTS=ON \ AWSSDK_SOURCE=BUNDLED \ CC=gcc \ CXX=g++ \ + google_cloud_cpp_storage_SOURCE=BUNDLED \ ORC_SOURCE=BUNDLED \ PARQUET_BUILD_EXECUTABLES=ON \ PARQUET_BUILD_EXAMPLES=ON \ diff --git a/ci/docker/linux-apt-docs.dockerfile b/ci/docker/linux-apt-docs.dockerfile index 3a8a9cf8e24..c1ee003f4f3 100644 --- a/ci/docker/linux-apt-docs.dockerfile +++ b/ci/docker/linux-apt-docs.dockerfile @@ -97,6 +97,7 @@ ENV ARROW_BUILD_STATIC=OFF \ ARROW_BUILD_TESTS=OFF \ ARROW_BUILD_UTILITIES=OFF \ ARROW_FLIGHT=ON \ + ARROW_GCS=ON \ ARROW_GLIB_VALA=false \ ARROW_PYTHON=ON \ ARROW_S3=ON \ diff --git a/ci/docker/python-wheel-manylinux-test.dockerfile b/ci/docker/python-wheel-manylinux-test.dockerfile index 55c27d1d7bb..cdd0ae3ced7 100644 --- a/ci/docker/python-wheel-manylinux-test.dockerfile +++ b/ci/docker/python-wheel-manylinux-test.dockerfile @@ -25,3 +25,6 @@ FROM ${arch}/python:${python} # test dependencies in a docker image COPY python/requirements-wheel-test.txt /arrow/python/ RUN pip install -r /arrow/python/requirements-wheel-test.txt + +COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts/ +RUN PYTHON=python /arrow/ci/scripts/install_gcs_testbench.sh default diff --git a/ci/docker/ubuntu-20.04-cpp.dockerfile b/ci/docker/ubuntu-20.04-cpp.dockerfile index 6e811ea2f71..0eade393cd4 100644 --- a/ci/docker/ubuntu-20.04-cpp.dockerfile +++ b/ci/docker/ubuntu-20.04-cpp.dockerfile @@ -96,6 +96,7 @@ RUN apt-get update -y -q && \ nlohmann-json3-dev \ pkg-config \ protobuf-compiler \ + python3-dev \ python3-pip \ python3-rados \ rados-objclass-dev \ diff --git a/ci/docker/ubuntu-22.04-cpp.dockerfile b/ci/docker/ubuntu-22.04-cpp.dockerfile index a7cc5ff38ad..1398dcd636a 100644 --- a/ci/docker/ubuntu-22.04-cpp.dockerfile +++ b/ci/docker/ubuntu-22.04-cpp.dockerfile @@ -156,6 +156,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_FLIGHT=ON \ ARROW_FLIGHT_SQL=ON \ ARROW_GANDIVA=ON \ + ARROW_GCS=ON \ ARROW_HDFS=ON \ ARROW_HOME=/usr/local \ ARROW_INSTALL_NAME_RPATH=OFF \ @@ -175,6 +176,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_WITH_ZLIB=ON \ ARROW_WITH_ZSTD=ON \ AWSSDK_SOURCE=BUNDLED \ + google_cloud_cpp_storage_SOURCE=BUNDLED \ GTest_SOURCE=BUNDLED \ ORC_SOURCE=BUNDLED \ PARQUET_BUILD_EXAMPLES=ON \ diff --git a/ci/scripts/install_gcs_testbench.sh b/ci/scripts/install_gcs_testbench.sh index 0282e0fda50..c7a6ee7a6dc 100755 --- 
a/ci/scripts/install_gcs_testbench.sh +++ b/ci/scripts/install_gcs_testbench.sh @@ -24,10 +24,24 @@ if [ "$#" -ne 1 ]; then exit 1 fi -if [ "$(uname -m)" != "x86_64" ]; then - echo "GCS testbench won't install on non-x86 architecture" - exit 0 -fi +case "$(uname -m)" in + aarch64|arm64|x86_64) + : # OK + ;; + *) + echo "GCS testbench is installed only on x86 or arm architectures: $(uname -m)" + exit 0 + ;; +esac + +case "$(uname -s)-$(uname -m)" in + Darwin-arm64) + # Workaround for https://github.com/grpc/grpc/issues/28387 . + # Build grpcio instead of using wheel. + # storage-testbench 0.16.0 pins grpcio to 1.44.0. + ${PYTHON:-python3} -m pip install --no-binary :all: "grpcio==1.44.0" + ;; +esac version=$1 if [[ "${version}" -eq "default" ]]; then diff --git a/ci/scripts/python_build.sh b/ci/scripts/python_build.sh index b90321643c7..cfac68bd6ec 100755 --- a/ci/scripts/python_build.sh +++ b/ci/scripts/python_build.sh @@ -58,6 +58,7 @@ export PYARROW_WITH_CUDA=${ARROW_CUDA:-OFF} export PYARROW_WITH_DATASET=${ARROW_DATASET:-ON} export PYARROW_WITH_FLIGHT=${ARROW_FLIGHT:-OFF} export PYARROW_WITH_GANDIVA=${ARROW_GANDIVA:-OFF} +export PYARROW_WITH_GCS=${ARROW_GCS:-OFF} export PYARROW_WITH_HDFS=${ARROW_HDFS:-ON} export PYARROW_WITH_ORC=${ARROW_ORC:-OFF} export PYARROW_WITH_PLASMA=${ARROW_PLASMA:-OFF} diff --git a/ci/scripts/python_test.sh b/ci/scripts/python_test.sh index e1d06c18727..4e2990b84d6 100755 --- a/ci/scripts/python_test.sh +++ b/ci/scripts/python_test.sh @@ -38,6 +38,7 @@ export ARROW_DEBUG_MEMORY_POOL=trap : ${PYARROW_TEST_DATASET:=${ARROW_DATASET:-ON}} : ${PYARROW_TEST_FLIGHT:=${ARROW_FLIGHT:-ON}} : ${PYARROW_TEST_GANDIVA:=${ARROW_GANDIVA:-ON}} +: ${PYARROW_TEST_GCS:=${ARROW_GCS:-ON}} : ${PYARROW_TEST_HDFS:=${ARROW_HDFS:-ON}} : ${PYARROW_TEST_ORC:=${ARROW_ORC:-ON}} : ${PYARROW_TEST_PARQUET:=${ARROW_PARQUET:-ON}} @@ -47,6 +48,7 @@ export PYARROW_TEST_CUDA export PYARROW_TEST_DATASET export PYARROW_TEST_FLIGHT export PYARROW_TEST_GANDIVA +export PYARROW_TEST_GCS export PYARROW_TEST_HDFS export PYARROW_TEST_ORC export PYARROW_TEST_PARQUET diff --git a/ci/scripts/python_wheel_macos_build.sh b/ci/scripts/python_wheel_macos_build.sh index b3ae912dff6..7fa43a3eaa2 100755 --- a/ci/scripts/python_wheel_macos_build.sh +++ b/ci/scripts/python_wheel_macos_build.sh @@ -64,7 +64,7 @@ echo "=== (${PYTHON_VERSION}) Building Arrow C++ libraries ===" : ${ARROW_DATASET:=ON} : ${ARROW_FLIGHT:=ON} : ${ARROW_GANDIVA:=OFF} -: ${ARROW_GCS:=OFF} +: ${ARROW_GCS:=ON} : ${ARROW_HDFS:=ON} : ${ARROW_JEMALLOC:=ON} : ${ARROW_MIMALLOC:=ON} @@ -148,6 +148,7 @@ export PYARROW_INSTALL_TESTS=1 export PYARROW_WITH_DATASET=${ARROW_DATASET} export PYARROW_WITH_FLIGHT=${ARROW_FLIGHT} export PYARROW_WITH_GANDIVA=${ARROW_GANDIVA} +export PYARROW_WITH_GCS=${ARROW_GCS} export PYARROW_WITH_HDFS=${ARROW_HDFS} export PYARROW_WITH_ORC=${ARROW_ORC} export PYARROW_WITH_PARQUET=${ARROW_PARQUET} diff --git a/ci/scripts/python_wheel_manylinux_build.sh b/ci/scripts/python_wheel_manylinux_build.sh index d242fe657c5..6cfd34d851f 100755 --- a/ci/scripts/python_wheel_manylinux_build.sh +++ b/ci/scripts/python_wheel_manylinux_build.sh @@ -51,7 +51,7 @@ echo "=== (${PYTHON_VERSION}) Building Arrow C++ libraries ===" : ${ARROW_DATASET:=ON} : ${ARROW_FLIGHT:=ON} : ${ARROW_GANDIVA:=OFF} -: ${ARROW_GCS:=OFF} +: ${ARROW_GCS:=ON} : ${ARROW_HDFS:=ON} : ${ARROW_JEMALLOC:=ON} : ${ARROW_MIMALLOC:=ON} @@ -144,6 +144,7 @@ export PYARROW_INSTALL_TESTS=1 export PYARROW_WITH_DATASET=${ARROW_DATASET} export PYARROW_WITH_FLIGHT=${ARROW_FLIGHT} export 
PYARROW_WITH_GANDIVA=${ARROW_GANDIVA} +export PYARROW_WITH_GCS=${ARROW_GCS} export PYARROW_WITH_HDFS=${ARROW_HDFS} export PYARROW_WITH_ORC=${ARROW_ORC} export PYARROW_WITH_PARQUET=${ARROW_PARQUET} diff --git a/ci/scripts/python_wheel_unix_test.sh b/ci/scripts/python_wheel_unix_test.sh index 99436e0c1fa..2b2fe9cdf11 100755 --- a/ci/scripts/python_wheel_unix_test.sh +++ b/ci/scripts/python_wheel_unix_test.sh @@ -31,6 +31,7 @@ source_dir=${1} : ${ARROW_FLIGHT:=ON} : ${ARROW_SUBSTRAIT:=ON} : ${ARROW_S3:=ON} +: ${ARROW_GCS:=ON} : ${CHECK_IMPORTS:=ON} : ${CHECK_UNITTESTS:=ON} : ${INSTALL_PYARROW:=ON} @@ -39,6 +40,7 @@ export PYARROW_TEST_CYTHON=OFF export PYARROW_TEST_DATASET=ON export PYARROW_TEST_FLIGHT=${ARROW_FLIGHT} export PYARROW_TEST_GANDIVA=OFF +export PYARROW_TEST_GCS=${ARROW_GCS} export PYARROW_TEST_HDFS=ON export PYARROW_TEST_ORC=ON export PYARROW_TEST_PANDAS=ON @@ -69,6 +71,9 @@ import pyarrow.orc import pyarrow.parquet import pyarrow.plasma " + if [ "${PYARROW_TEST_GCS}" == "ON" ]; then + python -c "import pyarrow._gcsfs" + fi if [ "${PYARROW_TEST_S3}" == "ON" ]; then python -c "import pyarrow._s3fs" fi @@ -81,8 +86,14 @@ import pyarrow.plasma fi if [ "${CHECK_UNITTESTS}" == "ON" ]; then - # Install testing dependencies - pip install -U -r ${source_dir}/python/requirements-wheel-test.txt + # Generally, we should install testing dependencies here to install + # built wheels without testing dependencies. Testing dependencies are + # installed in ci/docker/python-wheel-manylinux-test.dockerfile to + # reduce test time. + # + # We also need to update dev/tasks/python-wheels/*.yml when we need + # to add more steps to prepare testing dependencies. + # Execute unittest, test dependencies must be installed python -c 'import pyarrow; pyarrow.create_library_symlinks()' python -m pytest -r s --pyargs pyarrow diff --git a/cpp/src/arrow/filesystem/api.h b/cpp/src/arrow/filesystem/api.h index 5b0c97d150a..732be5f928f 100644 --- a/cpp/src/arrow/filesystem/api.h +++ b/cpp/src/arrow/filesystem/api.h @@ -21,8 +21,11 @@ #include "arrow/filesystem/filesystem.h" // IWYU pragma: export #include "arrow/filesystem/hdfs.h" // IWYU pragma: export -#include "arrow/filesystem/localfs.h" // IWYU pragma: export -#include "arrow/filesystem/mockfs.h" // IWYU pragma: export +#ifdef ARROW_GCS +#include "arrow/filesystem/gcsfs.h" // IWYU pragma: export +#endif +#include "arrow/filesystem/localfs.h" // IWYU pragma: export +#include "arrow/filesystem/mockfs.h" // IWYU pragma: export #ifdef ARROW_S3 #include "arrow/filesystem/s3fs.h" // IWYU pragma: export #endif diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 18b39125f50..48b4646bea0 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -695,8 +695,7 @@ Result> FileSystemFromUriReal(const Uri& uri, if (scheme == "gs" || scheme == "gcs") { #ifdef ARROW_GCS ARROW_ASSIGN_OR_RAISE(auto options, GcsOptions::FromUri(uri, out_path)); - ARROW_ASSIGN_OR_RAISE(auto gcsfs, GcsFileSystem::Make(options, io_context)); - return gcsfs; + return GcsFileSystem::Make(options, io_context); #else return Status::NotImplemented("Got GCS URI but Arrow compiled without GCS support"); #endif diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index dfa2b740087..6dc18d7de84 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -452,7 +452,8 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { /// \brief Create a 
new FileSystem by URI /// -/// Recognized schemes are "file", "mock", "hdfs" and "s3fs". +/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", +/// "gs" and "gcs". /// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[out] out_path (optional) Path inside the filesystem. @@ -463,7 +464,8 @@ Result> FileSystemFromUri(const std::string& uri, /// \brief Create a new FileSystem by URI with a custom IO context /// -/// Recognized schemes are "file", "mock", "hdfs" and "s3fs". +/// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", +/// "gs" and "gcs". /// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[in] io_context an IOContext which will be associated with the filesystem diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index 9bd1b15b998..82d2439a607 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -19,11 +19,13 @@ #include #include +#include #include "arrow/buffer.h" #include "arrow/filesystem/gcsfs_internal.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/util_internal.h" +#include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" #include "arrow/util/thread_pool.h" @@ -33,13 +35,23 @@ namespace arrow { namespace fs { -struct GcsCredentials { - explicit GcsCredentials(std::shared_ptr c) - : credentials(std::move(c)) {} - +struct GcsCredentialsHolder { + // Constructor needed for make_shared + explicit GcsCredentialsHolder(std::shared_ptr credentials) + : credentials(std::move(credentials)) {} std::shared_ptr credentials; }; +bool GcsCredentials::Equals(const GcsCredentials& other) const { + if (holder_->credentials == other.holder_->credentials) { + return true; + } + return anonymous_ == other.anonymous_ && access_token_ == other.access_token_ && + expiration_ == other.expiration_ && + json_credentials_ == other.json_credentials_ && + target_service_account_ == other.target_service_account_; +} + namespace { namespace gcs = google::cloud::storage; @@ -95,12 +107,10 @@ struct GcsPath { class GcsInputStream : public arrow::io::InputStream { public: explicit GcsInputStream(gcs::ObjectReadStream stream, GcsPath path, - gcs::Generation generation, gcs::ReadFromOffset offset, - gcs::Client client) + gcs::Generation generation, gcs::Client client) : stream_(std::move(stream)), path_(std::move(path)), generation_(generation), - offset_(offset.value_or(0)), client_(std::move(client)) {} ~GcsInputStream() override = default; @@ -115,7 +125,7 @@ class GcsInputStream : public arrow::io::InputStream { Result Tell() const override { if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream"); - return stream_.tellg() + offset_; + return stream_.tellg(); } // A gcs::ObjectReadStream can be "born closed". For small objects the stream returns @@ -156,7 +166,6 @@ class GcsInputStream : public arrow::io::InputStream { mutable gcs::ObjectReadStream stream_; GcsPath path_; gcs::Generation generation_; - std::int64_t offset_; gcs::Client client_; bool closed_ = false; }; @@ -164,9 +173,17 @@ class GcsInputStream : public arrow::io::InputStream { class GcsOutputStream : public arrow::io::OutputStream { public: explicit GcsOutputStream(gcs::ObjectWriteStream stream) : stream_(std::move(stream)) {} - ~GcsOutputStream() override = default; + ~GcsOutputStream() { + if (!closed_) { + // The common pattern is to close OutputStreams from destructor in arrow. 
+ io::internal::CloseFromDestructor(this); + } + } Status Close() override { + if (closed_) { + return Status::OK(); + } stream_.Close(); closed_ = true; return internal::ToArrowStatus(stream_.last_status()); @@ -297,8 +314,15 @@ google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) { if (!o.endpoint_override.empty()) { options.set(scheme + "://" + o.endpoint_override); } - if (o.credentials && o.credentials->credentials) { - options.set(o.credentials->credentials); + if (o.credentials.holder() && o.credentials.holder()->credentials) { + options.set( + o.credentials.holder()->credentials); + } + if (o.retry_limit_seconds.has_value()) { + options.set( + gcs::LimitedTimeRetryPolicy( + std::chrono::milliseconds(static_cast(*o.retry_limit_seconds * 1000))) + .clone()); } return options; } @@ -318,19 +342,44 @@ class GcsFileSystem::Impl { return GetFileInfoBucket(path, std::move(meta).status()); } auto meta = client_.GetObjectMetadata(path.bucket, path.object); - return GetFileInfoObject(path, meta); + Result info = GetFileInfoObject(path, meta); + if (!info.ok() || info->type() != FileType::NotFound) { + return info; + } + // Not found case. It could be this was written to GCS with a different + // "Directory" convention, so if there is at least one object that + // matches the prefix we assume it is a directory. + std::string canonical = internal::EnsureTrailingSlash(path.object); + auto list_result = client_.ListObjects(path.bucket, gcs::Prefix(canonical)); + if (list_result.begin() != list_result.end()) { + // If there is at least one result it indicates this is a directory (at + // least one object exists that starts with "path/") + return FileInfo(path.full_path, FileType::Directory); + } + // Return the original not-found info if there was no match. + return info; } Result GetFileInfo(const FileSelector& select) { ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir)); - // Adding the trailing '/' avoids problems with files named 'a', 'ab', 'ac' where GCS - // would return all of them if the prefix is 'a'. + // Adding the trailing '/' avoids problems with files named 'a', 'ab', 'ac' where + // GCS would return all of them if the prefix is 'a'. const auto canonical = internal::EnsureTrailingSlash(p.object); - const auto max_depth = internal::Depth(canonical) + select.max_recursion; + // Need to add one level when the object is not empty because all + // directories have an extra slash. + const auto max_depth = + internal::Depth(canonical) + select.max_recursion + !p.object.empty(); auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical); auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/"); + // Include trailing delimiters ensures that files matching "directory" + // conventions are also included in the listing. + // Only included for select.recursive false because a delimiter needs + // to be specified. + auto include_trailing = select.recursive ? gcs::IncludeTrailingDelimiter(false) + : gcs::IncludeTrailingDelimiter(true); FileInfoVector result; - for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) { + for (auto const& o : + client_.ListObjects(p.bucket, prefix, delimiter, include_trailing)) { if (!o.ok()) { if (select.allow_not_found && o.status().code() == google::cloud::StatusCode::kNotFound) { @@ -340,11 +389,11 @@ class GcsFileSystem::Impl { } // Skip the directory itself from the results, and any result that is "too deep" // into the recursion. 
- if (o->name() == p.object || internal::Depth(o->name()) > max_depth) { + if (o->name() == canonical || internal::Depth(o->name()) > max_depth) { continue; } auto path = internal::ConcatAbstractPath(o->bucket(), o->name()); - result.push_back(ToFileInfo(path, *o)); + result.push_back(ToFileInfo(path, *o, /*normalize_directories=*/true)); } // Finding any elements indicates the directory was found. if (!result.empty() || select.allow_not_found) { @@ -365,7 +414,7 @@ class GcsFileSystem::Impl { google::cloud::StatusOr CreateDirMarker(const std::string& bucket, util::string_view name) { // Make the name canonical. - const auto canonical = internal::RemoveTrailingSlash(name).to_string(); + const auto canonical = internal::EnsureTrailingSlash(name); google::cloud::StatusOr object = client_.InsertObject( bucket, canonical, std::string(), gcs::WithObjectMetadata( @@ -398,6 +447,13 @@ class GcsFileSystem::Impl { if (o) { if (IsDirectory(*o)) break; return NotDirectoryError(*o); + } else { + // If we didn't find the raw path, check if there is an entry + // ending in a slash. + o = client_.GetObjectMetadata(bucket, internal::EnsureTrailingSlash(dir)); + if (o) { + break; + } } missing_parents.push_back(dir); } @@ -430,15 +486,17 @@ class GcsFileSystem::Impl { Status CreateDir(const GcsPath& p) { if (p.object.empty()) { - return internal::ToArrowStatus( - client_ - .CreateBucket(p.bucket, gcs::BucketMetadata().set_location( - options_.default_bucket_location)) - .status()); + auto metadata = + gcs::BucketMetadata().set_location(options_.default_bucket_location); + return internal::ToArrowStatus(client_.CreateBucket(p.bucket, metadata).status()); } auto parent = p.parent(); if (!parent.object.empty()) { - auto o = client_.GetObjectMetadata(p.bucket, parent.object); + auto o = client_.GetObjectMetadata(p.bucket, + internal::EnsureTrailingSlash(parent.object)); + if (!o.ok()) { + return internal::ToArrowStatus(o.status()); + } if (!IsDirectory(*o)) return NotDirectoryError(*o); } return internal::ToArrowStatus(CreateDirMarker(p.bucket, p.object).status()); @@ -451,7 +509,8 @@ class GcsFileSystem::Impl { Status DeleteDir(const GcsPath& p, const io::IOContext& io_context) { RETURN_NOT_OK(DeleteDirContents(p, /*missing_dir_ok=*/false, io_context)); if (!p.object.empty()) { - return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object)); + auto canonical = std::string(internal::EnsureTrailingSlash(p.object)); + return internal::ToArrowStatus(client_.DeleteObject(p.bucket, canonical)); } return internal::ToArrowStatus(client_.DeleteBucket(p.bucket)); } @@ -484,7 +543,7 @@ class GcsFileSystem::Impl { submitted.push_back(DeferNotOk(io_context.executor()->Submit(async_delete, o))); } - if (!missing_dir_ok && !at_least_one_obj && !dir) { + if (!missing_dir_ok && !at_least_one_obj && !dir && !p.object.empty()) { // No files were found and no directory marker exists return Status::IOError("No such directory: ", p.full_path); } @@ -544,8 +603,7 @@ class GcsFileSystem::Impl { gcs::ReadFromOffset offset) { auto stream = client_.ReadObject(path.bucket, path.object, generation, offset); ARROW_GCS_RETURN_NOT_OK(stream.status()); - return std::make_shared(std::move(stream), path, gcs::Generation(), - offset, client_); + return std::make_shared(std::move(stream), path, generation, client_); } Result> OpenOutputStream( @@ -603,10 +661,25 @@ class GcsFileSystem::Impl { return internal::ToArrowStatus(meta.status()); } + // The normalize_directories parameter is needed because of + // how a directory is listed.
If a specific path is asked + // for with a trailing slash it is expected to have a trailing + // slash [1] but for recursive listings it is expected that + // directories have their path normalized [2]. + // [1] + // https://github.com/apache/arrow/blob/3eaa7dd0e8b3dabc5438203331f05e3e6c011e37/python/pyarrow/tests/test_fs.py#L688 + // [2] + // https://github.com/apache/arrow/blob/3eaa7dd0e8b3dabc5438203331f05e3e6c011e37/cpp/src/arrow/filesystem/test_util.cc#L767 static FileInfo ToFileInfo(const std::string& full_path, - const gcs::ObjectMetadata& meta) { - if (IsDirectory(meta)) { - return FileInfo(full_path, FileType::Directory); + const gcs::ObjectMetadata& meta, + bool normalize_directories = false) { + if (IsDirectory(meta) || (!full_path.empty() && full_path.back() == '/')) { + if (normalize_directories) { + auto normalized = std::string(internal::RemoveTrailingSlash(full_path)); + return FileInfo(std::move(normalized), FileType::Directory); + } else { + return FileInfo(full_path, FileType::Directory); + } } auto info = FileInfo(full_path, FileType::File); info.set_size(static_cast(meta.size())); @@ -621,33 +694,43 @@ class GcsFileSystem::Impl { gcs::Client client_; }; +GcsOptions::GcsOptions() { + this->credentials.holder_ = std::make_shared( + google::cloud::MakeGoogleDefaultCredentials()); + this->scheme = "https"; +} + bool GcsOptions::Equals(const GcsOptions& other) const { - return credentials == other.credentials && + return credentials.Equals(other.credentials) && endpoint_override == other.endpoint_override && scheme == other.scheme && - default_bucket_location == other.default_bucket_location; + default_bucket_location == other.default_bucket_location && + retry_limit_seconds == other.retry_limit_seconds; } GcsOptions GcsOptions::Defaults() { - GcsOptions options{}; - options.credentials = - std::make_shared(google::cloud::MakeGoogleDefaultCredentials()); - options.scheme = "https"; + GcsOptions options; return options; } GcsOptions GcsOptions::Anonymous() { GcsOptions options{}; - options.credentials = - std::make_shared(google::cloud::MakeInsecureCredentials()); + options.credentials.holder_ = + std::make_shared(google::cloud::MakeInsecureCredentials()); + options.credentials.anonymous_ = true; options.scheme = "http"; return options; } GcsOptions GcsOptions::FromAccessToken(const std::string& access_token, - std::chrono::system_clock::time_point expiration) { + TimePoint expiration) { GcsOptions options{}; - options.credentials = std::make_shared( - google::cloud::MakeAccessTokenCredentials(access_token, expiration)); + options.credentials.holder_ = + std::make_shared(google::cloud::MakeAccessTokenCredentials( + access_token, + std::chrono::time_point_cast( + expiration))); + options.credentials.access_token_ = access_token; + options.credentials.expiration_ = expiration; options.scheme = "https"; return options; } @@ -655,17 +738,20 @@ GcsOptions GcsOptions::FromAccessToken(const std::string& access_token, GcsOptions GcsOptions::FromImpersonatedServiceAccount( const GcsCredentials& base_credentials, const std::string& target_service_account) { GcsOptions options{}; - options.credentials = std::make_shared( + options.credentials = base_credentials; + options.credentials.holder_ = std::make_shared( google::cloud::MakeImpersonateServiceAccountCredentials( - base_credentials.credentials, target_service_account)); + base_credentials.holder_->credentials, target_service_account)); + options.credentials.target_service_account_ = target_service_account; options.scheme = 
"https"; return options; } GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_object) { GcsOptions options{}; - options.credentials = std::make_shared( + options.credentials.holder_ = std::make_shared( google::cloud::MakeServiceAccountCredentials(json_object)); + options.credentials.json_credentials_ = json_object; options.scheme = "https"; return options; } @@ -698,11 +784,16 @@ Result GcsOptions::FromUri(const arrow::internal::Uri& uri, options_map.emplace(kv.first, kv.second); } - if (!uri.password().empty() || !uri.username().empty()) { - return Status::Invalid("GCS does not accept username or password."); + const std::string& username = uri.username(); + bool anonymous = username == "anonymous"; + if (!username.empty() && !anonymous) { + return Status::Invalid("GCS URIs do not accept username except \"anonymous\"."); + } + if (!uri.password().empty()) { + return Status::Invalid("GCS URIs do not accept password."); } + auto options = anonymous ? GcsOptions::Anonymous() : GcsOptions::Defaults(); - auto options = GcsOptions::Defaults(); for (const auto& kv : options_map) { if (kv.first == "location") { options.default_bucket_location = kv.second; @@ -710,6 +801,13 @@ Result GcsOptions::FromUri(const arrow::internal::Uri& uri, options.scheme = kv.second; } else if (kv.first == "endpoint_override") { options.endpoint_override = kv.second; + } else if (kv.first == "retry_limit_seconds") { + double parsed_seconds = atof(kv.second.c_str()); + if (parsed_seconds <= 0.0) { + return Status::Invalid("retry_limit_seconds must be a positive integer, got '", + kv.second, "'"); + } + options.retry_limit_seconds = parsed_seconds; } else { return Status::Invalid("Unexpected query parameter in GCS URI: '", kv.first, "'"); } @@ -726,6 +824,7 @@ Result GcsOptions::FromUri(const std::string& uri_string, } std::string GcsFileSystem::type_name() const { return "gcs"; } +const GcsOptions& GcsFileSystem::options() const { return impl_->options(); } bool GcsFileSystem::Equals(const FileSystem& other) const { if (this == &other) { diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index c84374cdbb8..8458c7f2108 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -22,22 +22,56 @@ #include #include "arrow/filesystem/filesystem.h" +#include "arrow/util/optional.h" #include "arrow/util/uri.h" namespace arrow { namespace fs { -struct GcsCredentials; +// Opaque wrapper for GCS's library credentials to avoid exposing in Arrow headers. +struct GcsCredentialsHolder; +class GcsFileSystem; + +/// \brief Container for GCS Credentials and information necessary to recreate them. +class ARROW_EXPORT GcsCredentials { + public: + bool Equals(const GcsCredentials& other) const; + bool anonymous() const { return anonymous_; } + const std::string& access_token() const { return access_token_; } + TimePoint expiration() const { return expiration_; } + const std::string& target_service_account() const { return target_service_account_; } + const std::string& json_credentials() const { return json_credentials_; } + const std::shared_ptr& holder() const { return holder_; } + + private: + GcsCredentials() = default; + bool anonymous_ = false; + std::string access_token_; + TimePoint expiration_; + std::string target_service_account_; + std::string json_credentials_; + std::shared_ptr holder_; + friend class GcsFileSystem; + friend struct GcsOptions; +}; /// Options for the GcsFileSystem implementation. 
struct ARROW_EXPORT GcsOptions { - std::shared_ptr credentials; + /// \brief Equivalent to GcsOptions::Defaults(). + GcsOptions(); + GcsCredentials credentials; std::string endpoint_override; std::string scheme; /// \brief Location to use for creating buckets. std::string default_bucket_location; + /// \brief If set used to control total time allowed for retrying underlying + /// errors. + /// + /// The default policy is to retry for up to 15 minutes. + arrow::util::optional retry_limit_seconds; + /// \brief Default metadata for OpenOutputStream. /// /// This will be ignored if non-empty metadata is passed to OpenOutputStream. @@ -68,7 +102,7 @@ struct ARROW_EXPORT GcsOptions { /// tokens. Note that access tokens are time limited, you will need to manually refresh /// the tokens created by the out-of-band mechanism. static GcsOptions FromAccessToken(const std::string& access_token, - std::chrono::system_clock::time_point expiration); + TimePoint expiration); /// \brief Initialize with service account impersonation /// @@ -141,6 +175,7 @@ class ARROW_EXPORT GcsFileSystem : public FileSystem { ~GcsFileSystem() override = default; std::string type_name() const override; + const GcsOptions& options() const; bool Equals(const FileSystem& other) const override; diff --git a/cpp/src/arrow/filesystem/gcsfs_internal.cc b/cpp/src/arrow/filesystem/gcsfs_internal.cc index a75b51430f7..b8f0ab80b21 100644 --- a/cpp/src/arrow/filesystem/gcsfs_internal.cc +++ b/cpp/src/arrow/filesystem/gcsfs_internal.cc @@ -296,7 +296,10 @@ Result> FromObjectMetadata( } std::int64_t Depth(arrow::util::string_view path) { - return std::count(path.begin(), path.end(), fs::internal::kSep); + // The last slash is not counted towards depth because it represents a + // directory. + bool has_trailing_slash = !path.empty() && path.back() == '/'; + return std::count(path.begin(), path.end(), fs::internal::kSep) - has_trailing_slash; } } // namespace internal diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc b/cpp/src/arrow/filesystem/gcsfs_test.cc index 9eaacb0dc15..4d8f52ef48e 100644 --- a/cpp/src/arrow/filesystem/gcsfs_test.cc +++ b/cpp/src/arrow/filesystem/gcsfs_test.cc @@ -50,6 +50,7 @@ namespace bp = boost::process; namespace gc = google::cloud; namespace gcs = google::cloud::storage; +using ::testing::Eq; using ::testing::HasSubstr; using ::testing::IsEmpty; using ::testing::Not; @@ -99,6 +100,8 @@ class GcsTestbench : public ::testing::Environment { error_ = std::move(error); } + bool running() { return server_process_.running(); } + ~GcsTestbench() override { // Brutal shutdown, kill the full process group because the GCS testbench may launch // additional children. @@ -130,6 +133,7 @@ class GcsIntegrationTest : public ::testing::Test { void SetUp() override { ASSERT_THAT(Testbench(), NotNull()); ASSERT_THAT(Testbench()->error(), IsEmpty()); + ASSERT_TRUE(Testbench()->running()); // Initialize a PRNG with a small amount of entropy. 
generator_ = std::mt19937_64(std::random_device()()); @@ -140,7 +144,11 @@ class GcsIntegrationTest : public ::testing::Test { auto client = gcs::Client( google::cloud::Options{} .set("http://127.0.0.1:" + Testbench()->port()) - .set(gc::MakeInsecureCredentials())); + .set(gc::MakeInsecureCredentials()) + .set(std::chrono::seconds(5)) + .set( + gcs::LimitedTimeRetryPolicy(std::chrono::seconds(45)).clone())); + google::cloud::StatusOr bucket = client.CreateBucketForProject( PreexistingBucketName(), "ignored-by-testbench", gcs::BucketMetadata{}); ASSERT_TRUE(bucket.ok()) << "Failed to create bucket <" << PreexistingBucketName() @@ -167,6 +175,7 @@ class GcsIntegrationTest : public ::testing::Test { GcsOptions TestGcsOptions() { auto options = GcsOptions::Anonymous(); options.endpoint_override = "127.0.0.1:" + Testbench()->port(); + options.retry_limit_seconds = 60; return options; } @@ -291,15 +300,15 @@ TEST(GcsFileSystem, OptionsCompare) { TEST(GcsFileSystem, OptionsAnonymous) { GcsOptions a = GcsOptions::Anonymous(); - EXPECT_THAT(a.credentials, NotNull()); + EXPECT_THAT(a.credentials.holder(), NotNull()); + EXPECT_TRUE(a.credentials.anonymous()); EXPECT_EQ(a.scheme, "http"); } TEST(GcsFileSystem, OptionsFromUri) { std::string path; - GcsOptions options; - ASSERT_OK_AND_ASSIGN(options, GcsOptions::FromUri("gs://", &path)); + ASSERT_OK_AND_ASSIGN(GcsOptions options, GcsOptions::FromUri("gs://", &path)); EXPECT_EQ(options.default_bucket_location, ""); EXPECT_EQ(options.scheme, "https"); EXPECT_EQ(options.endpoint_override, ""); @@ -327,35 +336,48 @@ TEST(GcsFileSystem, OptionsFromUri) { ASSERT_OK_AND_ASSIGN( options, GcsOptions::FromUri("gs://mybucket/foo/bar/" - "?endpoint_override=localhost&scheme=http&location=us-west2", + "?endpoint_override=localhost&scheme=http&location=us-west2" + "&retry_limit_seconds=40.5", &path)); EXPECT_EQ(options.default_bucket_location, "us-west2"); EXPECT_EQ(options.scheme, "http"); EXPECT_EQ(options.endpoint_override, "localhost"); EXPECT_EQ(path, "mybucket/foo/bar"); + ASSERT_TRUE(options.retry_limit_seconds.has_value()); + EXPECT_EQ(*options.retry_limit_seconds, 40.5); // Missing bucket name ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs:///foo/bar/", &path)); // Invalid option ASSERT_RAISES(Invalid, GcsOptions::FromUri("gs://mybucket/?xxx=zzz", &path)); + + // Invalid retry limit + ASSERT_RAISES(Invalid, + GcsOptions::FromUri("gs://foo/bar/?retry_limit_seconds=0", &path)); + ASSERT_RAISES(Invalid, + GcsOptions::FromUri("gs://foo/bar/?retry_limit_seconds=-1", &path)); } TEST(GcsFileSystem, OptionsAccessToken) { - auto a = GcsOptions::FromAccessToken( - "invalid-access-token-test-only", - std::chrono::system_clock::now() + std::chrono::minutes(5)); - EXPECT_THAT(a.credentials, NotNull()); + TimePoint expiration = std::chrono::system_clock::now() + std::chrono::minutes(5); + auto a = GcsOptions::FromAccessToken(/*access_token=*/"accesst", expiration); + EXPECT_THAT(a.credentials.holder(), NotNull()); + EXPECT_THAT(a.credentials.access_token(), Eq("accesst")); + EXPECT_THAT(a.credentials.expiration(), Eq(expiration)); EXPECT_EQ(a.scheme, "https"); } TEST(GcsFileSystem, OptionsImpersonateServiceAccount) { - auto base = GcsOptions::FromAccessToken( - "invalid-access-token-test-only", - std::chrono::system_clock::now() + std::chrono::minutes(5)); - auto a = GcsOptions::FromImpersonatedServiceAccount( - *base.credentials, "invalid-sa-test-only@my-project.iam.gserviceaccount.com"); - EXPECT_THAT(a.credentials, NotNull()); + TimePoint expiration = 
std::chrono::system_clock::now() + std::chrono::minutes(5); + auto base = GcsOptions::FromAccessToken(/*access_token=*/"at", expiration); + std::string account = "invalid-sa-test-only@my-project.iam.gserviceaccount.com"; + auto a = GcsOptions::FromImpersonatedServiceAccount(base.credentials, account); + EXPECT_THAT(a.credentials.holder(), NotNull()); + EXPECT_THAT(a.credentials.access_token(), Eq("at")); + EXPECT_THAT(a.credentials.expiration(), Eq(expiration)); + EXPECT_THAT(a.credentials.target_service_account(), Eq(account)); + EXPECT_EQ(a.scheme, "https"); } @@ -378,7 +400,8 @@ TEST(GcsFileSystem, OptionsServiceAccountCredentials) { })"""; auto a = GcsOptions::FromServiceAccountCredentials(kJsonKeyfileContents); - EXPECT_THAT(a.credentials, NotNull()); + EXPECT_THAT(a.credentials.holder(), NotNull()); + EXPECT_THAT(a.credentials.json_credentials(), kJsonKeyfileContents); EXPECT_EQ(a.scheme, "https"); } @@ -568,7 +591,50 @@ TEST_F(GcsIntegrationTest, GetFileInfoBucket) { ASSERT_RAISES(Invalid, fs->GetFileInfo("gs://" + PreexistingBucketName())); } -TEST_F(GcsIntegrationTest, GetFileInfoObject) { +TEST_F(GcsIntegrationTest, GetFileInfoObjectWithNestedStructure) { + // Adds detailed tests to handle cases of different edge cases + // with directory naming conventions (e.g. with and without slashes). + auto fs = GcsFileSystem::Make(TestGcsOptions()); + constexpr auto kObjectName = "test-object-dir/some_other_dir/another_dir/foo"; + ASSERT_OK_AND_ASSIGN( + auto output, + fs->OpenOutputStream(PreexistingBucketPath() + kObjectName, /*metadata=*/{})); + const auto data = std::string(kLoremIpsum); + ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Close()); + + // 0 is immediately after "/" lexicographically, ensure that this doesn't + // cause unexpected issues. 
+ ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(PreexistingBucketPath() + + "test-object-dir/some_other_dir0", + /*metadata=*/{})); + ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Close()); + ASSERT_OK_AND_ASSIGN( + output, + fs->OpenOutputStream(PreexistingBucketPath() + kObjectName + "0", /*metadata=*/{})); + ASSERT_OK(output->Write(data.data(), data.size())); + ASSERT_OK(output->Close()); + + AssertFileInfo(fs.get(), PreexistingBucketPath() + kObjectName, FileType::File); + AssertFileInfo(fs.get(), PreexistingBucketPath() + kObjectName + "/", + FileType::NotFound); + AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir", + FileType::Directory); + AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/", + FileType::Directory); + AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/some_other_dir", + FileType::Directory); + AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/some_other_dir/", + FileType::Directory); + + AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-di", + FileType::NotFound); + AssertFileInfo(fs.get(), PreexistingBucketPath() + "test-object-dir/some_other_di", + FileType::NotFound); +} + +TEST_F(GcsIntegrationTest, GetFileInfoObjectNoExplicitObject) { auto fs = GcsFileSystem::Make(TestGcsOptions()); auto object = GcsClient().GetObjectMetadata(PreexistingBucketName(), PreexistingObjectName()); @@ -633,7 +699,7 @@ TEST_F(GcsIntegrationTest, GetFileInfoSelectorLimitedRecursion) { SCOPED_TRACE("Testing with max_recursion=" + std::to_string(max_recursion)); const auto max_depth = internal::Depth(internal::EnsureTrailingSlash(hierarchy.base_dir)) + - max_recursion; + max_recursion + 1; // Add 1 because files in a directory should be included std::vector expected; std::copy_if(hierarchy.contents.begin(), hierarchy.contents.end(), std::back_inserter(expected), [&](const arrow::fs::FileInfo& info) { @@ -727,6 +793,13 @@ TEST_F(GcsIntegrationTest, CreateDirUri) { ASSERT_RAISES(Invalid, fs->CreateDir("gs://" + RandomBucketName(), true)); } +TEST_F(GcsIntegrationTest, DeleteBucketDirSuccess) { + auto fs = GcsFileSystem::Make(TestGcsOptions()); + ASSERT_OK(fs->CreateDir("pyarrow-filesystem/", /*recursive=*/true)); + ASSERT_RAISES(Invalid, fs->CreateDir("/", false)); + ASSERT_OK(fs->DeleteDir("pyarrow-filesystem/")); +} + TEST_F(GcsIntegrationTest, DeleteDirSuccess) { auto fs = GcsFileSystem::Make(TestGcsOptions()); ASSERT_OK_AND_ASSIGN(auto hierarchy, CreateHierarchy(fs)); @@ -1257,6 +1330,16 @@ TEST_F(GcsIntegrationTest, OpenInputFileClosed) { ASSERT_RAISES(Invalid, stream->Seek(2)); } +TEST_F(GcsIntegrationTest, TestFileSystemFromUri) { + // Smoke test for FileSystemFromUri + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri(std::string("gs://anonymous@") + + PreexistingBucketPath())); + EXPECT_EQ(fs->type_name(), "gcs"); + ASSERT_OK_AND_ASSIGN(auto fs2, FileSystemFromUri(std::string("gcs://anonymous@") + + PreexistingBucketPath())); + EXPECT_EQ(fs2->type_name(), "gcs"); +} + } // namespace } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/util/config.h.cmake b/cpp/src/arrow/util/config.h.cmake index 55bc2d01005..bd6447a20e0 100644 --- a/cpp/src/arrow/util/config.h.cmake +++ b/cpp/src/arrow/util/config.h.cmake @@ -45,6 +45,7 @@ #cmakedefine ARROW_IPC #cmakedefine ARROW_JSON +#cmakedefine ARROW_GCS #cmakedefine ARROW_S3 #cmakedefine ARROW_USE_NATIVE_INT128 #cmakedefine ARROW_WITH_OPENTELEMETRY diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py 
index dcf15afafe1..8b4c38b42fa 100644 --- a/dev/archery/archery/cli.py +++ b/dev/archery/archery/cli.py @@ -162,6 +162,8 @@ def _apply_options(cmd, options): help="Build with Flight rpc support.") @click.option("--with-gandiva", default=None, type=BOOL, help="Build with Gandiva expression compiler support.") +@click.option("--with-gcs", default=None, type=BOOL, + help="Build Arrow with Google Cloud Storage (GCS) support.") @click.option("--with-hdfs", default=None, type=BOOL, help="Build the Arrow HDFS bridge.") @click.option("--with-hiveserver2", default=None, type=BOOL, diff --git a/dev/archery/archery/lang/cpp.py b/dev/archery/archery/lang/cpp.py index ac3b251f489..158cc48badd 100644 --- a/dev/archery/archery/lang/cpp.py +++ b/dev/archery/archery/lang/cpp.py @@ -52,7 +52,8 @@ def __init__(self, # Components with_compute=None, with_csv=None, with_cuda=None, with_dataset=None, with_filesystem=None, with_flight=None, - with_gandiva=None, with_hdfs=None, with_hiveserver2=None, + with_gandiva=None, with_gcs=None, with_hdfs=None, + with_hiveserver2=None, with_ipc=True, with_json=None, with_jni=None, with_mimalloc=None, with_jemalloc=None, with_parquet=None, with_plasma=None, with_python=True, @@ -95,6 +96,7 @@ def __init__(self, self.with_filesystem = with_filesystem self.with_flight = with_flight self.with_gandiva = with_gandiva + self.with_gcs = with_gcs self.with_hdfs = with_hdfs self.with_hiveserver2 = with_hiveserver2 self.with_ipc = with_ipc @@ -218,7 +220,7 @@ def _gen_defs(self): yield ("ARROW_FILESYSTEM", truthifier(self.with_filesystem)) yield ("ARROW_FLIGHT", truthifier(self.with_flight)) yield ("ARROW_GANDIVA", truthifier(self.with_gandiva)) - yield ("ARROW_PARQUET", truthifier(self.with_parquet)) + yield ("ARROW_GCS", truthifier(self.with_gcs)) yield ("ARROW_HDFS", truthifier(self.with_hdfs)) yield ("ARROW_HIVESERVER2", truthifier(self.with_hiveserver2)) yield ("ARROW_IPC", truthifier(self.with_ipc)) @@ -226,6 +228,7 @@ def _gen_defs(self): yield ("ARROW_JNI", truthifier(self.with_jni)) yield ("ARROW_MIMALLOC", truthifier(self.with_mimalloc)) yield ("ARROW_JEMALLOC", truthifier(self.with_jemalloc)) + yield ("ARROW_PARQUET", truthifier(self.with_parquet)) yield ("ARROW_PLASMA", truthifier(self.with_plasma)) yield ("ARROW_PYTHON", truthifier(self.with_python)) yield ("ARROW_S3", truthifier(self.with_s3)) diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh index a512449aea5..cbf3c9c51e3 100755 --- a/dev/release/verify-release-candidate.sh +++ b/dev/release/verify-release-candidate.sh @@ -662,6 +662,9 @@ test_python() { if [ "${ARROW_GANDIVA}" = "ON" ]; then export PYARROW_WITH_GANDIVA=1 fi + if [ "${ARROW_GCS}" = "ON" ]; then + export PYARROW_WITH_GCS=1 + fi if [ "${ARROW_PLASMA}" = "ON" ]; then export PYARROW_WITH_PLASMA=1 fi @@ -694,6 +697,9 @@ import pyarrow.parquet if [ "${ARROW_GANDIVA}" == "ON" ]; then python -c "import pyarrow.gandiva" fi + if [ "${ARROW_GCS}" == "ON" ]; then + python -c "import pyarrow._gcsfs" + fi if [ "${ARROW_PLASMA}" == "ON" ]; then python -c "import pyarrow.plasma" fi @@ -701,6 +707,7 @@ import pyarrow.parquet python -c "import pyarrow._s3fs" fi + # Install test dependencies pip install -r requirements-test.txt @@ -1000,6 +1007,7 @@ test_linux_wheels() { } test_macos_wheels() { + local check_gcs=ON local check_s3=ON local check_flight=ON @@ -1019,6 +1027,7 @@ test_macos_wheels() { for platform in ${platform_tags}; do show_header "Testing Python ${pyver} wheel for platform ${platform}" if [[ "$platform" == 
*"10_9"* ]]; then + check_gcs=OFF check_s3=OFF fi @@ -1026,7 +1035,7 @@ test_macos_wheels() { VENV_ENV=wheel-${pyver}-${platform} PYTHON_VERSION=${pyver} maybe_setup_virtualenv || continue pip install pyarrow-${VERSION}-cp${pyver/.}-cp${python/.}-${platform}.whl - INSTALL_PYARROW=OFF ARROW_FLIGHT=${check_flight} ARROW_S3=${check_s3} \ + INSTALL_PYARROW=OFF ARROW_FLIGHT=${check_flight} ARROW_GCS=${check_gcs} ARROW_S3=${check_s3} \ ${ARROW_SOURCE_DIR}/ci/scripts/python_wheel_unix_test.sh ${ARROW_SOURCE_DIR} done done @@ -1155,9 +1164,9 @@ fi : ${ARROW_CUDA:=OFF} : ${ARROW_FLIGHT:=ON} : ${ARROW_GANDIVA:=ON} +: ${ARROW_GCS:=OFF} : ${ARROW_PLASMA:=ON} : ${ARROW_S3:=OFF} -: ${ARROW_GCS:=OFF} TEST_SUCCESS=no diff --git a/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat b/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat index f0e26f0bc82..a03a37722fa 100644 --- a/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat +++ b/dev/tasks/conda-recipes/arrow-cpp/bld-pyarrow.bat @@ -17,6 +17,7 @@ pushd "%SRC_DIR%"\python SET ARROW_HOME=%LIBRARY_PREFIX% SET SETUPTOOLS_SCM_PRETEND_VERSION=%PKG_VERSION% SET PYARROW_BUILD_TYPE=release +SET PYARROW_WITH_GCS=1 SET PYARROW_WITH_S3=1 SET PYARROW_WITH_HDFS=1 SET PYARROW_WITH_DATASET=1 diff --git a/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh b/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh index 826942b62c7..6e23c5eed90 100644 --- a/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh +++ b/dev/tasks/conda-recipes/arrow-cpp/build-pyarrow.sh @@ -17,6 +17,7 @@ if [[ "${target_platform}" == "osx-arm64" ]]; then else export PYARROW_WITH_GANDIVA=1 fi +export PYARROW_WITH_GCS=1 export PYARROW_WITH_HDFS=1 export PYARROW_WITH_ORC=1 export PYARROW_WITH_PARQUET=1 diff --git a/dev/tasks/homebrew-formulae/apache-arrow.rb b/dev/tasks/homebrew-formulae/apache-arrow.rb index a22b62afc27..94a1a67a1a4 100644 --- a/dev/tasks/homebrew-formulae/apache-arrow.rb +++ b/dev/tasks/homebrew-formulae/apache-arrow.rb @@ -72,6 +72,7 @@ def install args = %W[ -DARROW_FLIGHT=ON -DARROW_GANDIVA=ON + -DARROW_GCS=ON -DARROW_INSTALL_NAME_RPATH=OFF -DARROW_JEMALLOC=ON -DARROW_MIMALLOC=ON @@ -91,7 +92,6 @@ def install -DCMAKE_FIND_PACKAGE_PREFER_CONFIG=TRUE -DPython3_EXECUTABLE=#{Formula["python@3.9"].bin/"python3"} ] - # Re-enable -DARROW_S3=ON and add back aws-sdk-cpp to depends_on in ARROW-6437 mkdir "build" do system "cmake", "../cpp", *std_cmake_args, *args diff --git a/dev/tasks/python-wheels/github.osx.amd64.yml b/dev/tasks/python-wheels/github.osx.amd64.yml index d0f834f40c1..c18e080ac21 100644 --- a/dev/tasks/python-wheels/github.osx.amd64.yml +++ b/dev/tasks/python-wheels/github.osx.amd64.yml @@ -93,7 +93,7 @@ jobs: $PYTHON -m venv build-env source build-env/bin/activate pip install --upgrade pip wheel - arrow/ci/scripts/python_wheel_macos_build.sh x86_64 $(pwd)/arrow $(pwd)/build + PYTHON=python arrow/ci/scripts/python_wheel_macos_build.sh x86_64 $(pwd)/arrow $(pwd)/build - name: Test Wheel shell: bash @@ -101,6 +101,8 @@ jobs: $PYTHON -m venv test-env source test-env/bin/activate pip install --upgrade pip wheel + pip install -r arrow/python/requirements-wheel-test.txt + PYTHON=python arrow/ci/scripts/install_gcs_testbench.sh default arrow/ci/scripts/python_wheel_unix_test.sh $(pwd)/arrow {{ macros.github_upload_releases("arrow/python/repaired_wheels/*.whl")|indent }} diff --git a/dev/tasks/python-wheels/github.osx.arm64.yml b/dev/tasks/python-wheels/github.osx.arm64.yml index 101d8c6ee8d..7198da7de47 100644 --- a/dev/tasks/python-wheels/github.osx.arm64.yml +++ 
b/dev/tasks/python-wheels/github.osx.arm64.yml @@ -134,6 +134,8 @@ jobs: $PYTHON -m venv test-arm64-env source test-arm64-env/bin/activate pip install --upgrade pip wheel + arch -arm64 pip install -r arrow/python/requirements-wheel-test.txt + PYTHON=python arch -arm64 arrow/ci/scripts/install_gcs_testbench.sh default arch -arm64 arrow/ci/scripts/python_wheel_unix_test.sh $(pwd)/arrow {% if arch == "universal2" %} @@ -145,6 +147,8 @@ jobs: $PYTHON -m venv test-amd64-env source test-amd64-env/bin/activate pip install --upgrade pip wheel + arch -x86_64 pip install -r arrow/python/requirements-wheel-test.txt + PYTHON=python arch -x86_64 arrow/ci/scripts/install_gcs_testbench.sh default arch -x86_64 arrow/ci/scripts/python_wheel_unix_test.sh $(pwd)/arrow {% endif %} diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml index 95d404932cc..1bdb4d28213 100644 --- a/dev/tasks/tasks.yml +++ b/dev/tasks/tasks.yml @@ -468,9 +468,9 @@ tasks: {############################## Wheel OSX ####################################} -# enable S3 support from macOS 10.13 so we don't need to bundle curl, crypt and ssl -{% for macos_version, macos_codename, arrow_s3 in [("10.9", "mavericks", "OFF"), - ("10.13", "high-sierra", "ON")] %} +# enable S3 and GCS support from macOS 10.13 so we don't need to bundle curl, crypt and ssl +{% for macos_version, macos_codename, arrow_s3, arrow_gcs in [("10.9", "mavericks", "OFF", "OFF"), + ("10.13", "high-sierra", "ON", "ON")] %} {% set platform_tag = "macosx_{}_x86_64".format(macos_version.replace('.', '_')) %} wheel-macos-{{ macos_codename }}-{{ python_tag }}-amd64: @@ -480,6 +480,7 @@ tasks: python_version: "{{ python_version }}" macos_deployment_target: "{{ macos_version }}" arrow_s3: "{{ arrow_s3 }}" + arrow_gcs: "{{ arrow_gcs }}" artifacts: - pyarrow-{no_rc_version}-{{ python_tag }}-{{ abi_tag }}-{{ platform_tag }}.whl diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 7386c256fa4..a657f56bb2d 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -412,6 +412,10 @@ set(CYTHON_EXTENSIONS set(LINK_LIBS arrow_shared arrow_python_shared) +if(PYARROW_BUILD_GCS) + set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _gcsfs) +endif() + if(PYARROW_BUILD_S3) set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _s3fs) endif() diff --git a/python/asv-build.sh b/python/asv-build.sh index 7de5ff4a2c2..188085f927a 100755 --- a/python/asv-build.sh +++ b/python/asv-build.sh @@ -49,6 +49,7 @@ cmake -GNinja \ -DARROW_CXXFLAGS=$CXXFLAGS \ -DARROW_USE_GLOG=off \ -DARROW_FLIGHT=on \ + -DARROW_GCS=on \ -DARROW_ORC=on \ -DARROW_PARQUET=on \ -DARROW_PYTHON=on \ @@ -66,6 +67,7 @@ export SETUPTOOLS_SCM_PRETEND_VERSION=0.0.1 export PYARROW_BUILD_TYPE=release export PYARROW_PARALLEL=8 export PYARROW_WITH_FLIGHT=1 +export PYARROW_WITH_GCS=1 export PYARROW_WITH_ORC=1 export PYARROW_WITH_PARQUET=1 export PYARROW_WITH_PLASMA=1 diff --git a/python/pyarrow/_fs.pyx b/python/pyarrow/_fs.pyx index 75ad0ccd9b3..af5bebe7d6a 100644 --- a/python/pyarrow/_fs.pyx +++ b/python/pyarrow/_fs.pyx @@ -367,6 +367,9 @@ cdef class FileSystem(_Weakrefable): elif typ == 's3': from pyarrow._s3fs import S3FileSystem self = S3FileSystem.__new__(S3FileSystem) + elif typ == 'gcs': + from pyarrow._gcsfs import GcsFileSystem + self = GcsFileSystem.__new__(GcsFileSystem) elif typ == 'hdfs': from pyarrow._hdfs import HadoopFileSystem self = HadoopFileSystem.__new__(HadoopFileSystem) diff --git a/python/pyarrow/_gcsfs.pyx b/python/pyarrow/_gcsfs.pyx new file mode 100644 index 00000000000..9cff12fb2ea --- /dev/null +++ 
b/python/pyarrow/_gcsfs.pyx @@ -0,0 +1,188 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +from pyarrow.lib cimport (check_status, pyarrow_wrap_metadata, + pyarrow_unwrap_metadata) +from pyarrow.lib import frombytes, tobytes, KeyValueMetadata, ensure_metadata +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_fs cimport * +from pyarrow._fs cimport FileSystem, TimePoint_to_ns, PyDateTime_to_TimePoint +from cython.operator cimport dereference as deref + +from datetime import datetime, timedelta, timezone + + +cdef class GcsFileSystem(FileSystem): + """ + Google Cloud Storage (GCS) backed FileSystem implementation + + By default uses the process described in https://google.aip.dev/auth/4110 + to resolve credentials. If not running on Google Cloud Platform (GCP), + this generally requires the environment variable + GOOGLE_APPLICATION_CREDENTIALS to point to a JSON file + containing credentials. + + Note: GCS buckets are special and the operations available on them may be + limited or more expensive than expected compared to local file systems. + + Note: When pickling a GcsFileSystem that uses default credentials, resolution + credentials are not stored in the serialized data. Therefore, when unpickling + it is assumed that the necessary credentials are in place for the target + process. + + Parameters + ---------- + anonymous : boolean, default False + Whether to connect anonymously. + If true, will not attempt to look up credentials using standard GCP + configuration methods. + access_token : str, default None + GCP access token. If provided, temporary credentials will be fetched by + assuming this role; also, a `credential_token_expiration` must be + specified as well. + target_service_account : str, default None + An optional service account to try to impersonate when accessing GCS. This + requires the specified credential user or service account to have the necessary + permissions. + credential_token_expiration : datetime, default None + Expiration for credential generated with an access token. Must be specified + if `access_token` is specified. + default_bucket_location : str, default 'US' + GCP region to create buckets in. + scheme : str, default 'https' + GCS connection transport scheme. + endpoint_override : str, default None + Override endpoint with a connect string such as "localhost:9000" + default_metadata : mapping or pyarrow.KeyValueMetadata, default None + Default metadata for `open_output_stream`. This will be ignored if + non-empty metadata is passed to `open_output_stream`. + retry_time_limit : timedelta, default None + Set the maximum amount of time the GCS client will attempt to retry + transient errors. Subsecond granularity is ignored. 
+ """ + + cdef: + CGcsFileSystem* gcsfs + + def __init__(self, *, bint anonymous=False, access_token=None, + target_service_account=None, credential_token_expiration=None, + default_bucket_location='US', + scheme=None, + endpoint_override=None, + default_metadata=None, + retry_time_limit=None): + cdef: + CGcsOptions options + shared_ptr[CGcsFileSystem] wrapped + double time_limit_seconds + + # Intentional use of truthiness because empty strings aren't valid and + # for reconstruction from pickling will give empty strings. + if anonymous and (target_service_account or access_token): + raise ValueError( + 'anonymous option is not compatible with target_service_account and ' + 'access_token' + ) + elif bool(access_token) != bool(credential_token_expiration): + raise ValueError( + 'access_token and credential_token_expiration must be ' + 'specified together' + ) + + elif anonymous: + options = CGcsOptions.Anonymous() + elif access_token: + if not isinstance(credential_token_expiration, datetime): + raise ValueError( + "credential_token_expiration must be a datetime") + options = CGcsOptions.FromAccessToken( + tobytes(access_token), + PyDateTime_to_TimePoint(credential_token_expiration)) + else: + options = CGcsOptions.Defaults() + + # Target service account requires base credentials so + # it is not part of the if/else chain above which only + # handles base credentials. + if target_service_account: + options = CGcsOptions.FromImpersonatedServiceAccount( + options.credentials, tobytes(target_service_account)) + + options.default_bucket_location = tobytes(default_bucket_location) + + if scheme is not None: + options.scheme = tobytes(scheme) + if endpoint_override is not None: + options.endpoint_override = tobytes(endpoint_override) + if default_metadata is not None: + options.default_metadata = pyarrow_unwrap_metadata( + ensure_metadata(default_metadata)) + if retry_time_limit is not None: + time_limit_seconds = retry_time_limit.total_seconds() + options.retry_limit_seconds = time_limit_seconds + + with nogil: + wrapped = GetResultValue(CGcsFileSystem.Make(options)) + + self.init( wrapped) + + cdef init(self, const shared_ptr[CFileSystem]& wrapped): + FileSystem.init(self, wrapped) + self.gcsfs = wrapped.get() + + @classmethod + def _reconstruct(cls, kwargs): + return cls(**kwargs) + + def _expiration_datetime_from_options(self): + expiration_ns = TimePoint_to_ns( + self.gcsfs.options().credentials.expiration()) + if expiration_ns == 0: + return None + return datetime.fromtimestamp(expiration_ns / 1.0e9, timezone.utc) + + def __reduce__(self): + cdef CGcsOptions opts = self.gcsfs.options() + service_account = frombytes(opts.credentials.target_service_account()) + expiration_dt = self._expiration_datetime_from_options() + retry_time_limit = None + if opts.retry_limit_seconds.has_value(): + retry_time_limit = timedelta( + seconds=opts.retry_limit_seconds.value()) + return ( + GcsFileSystem._reconstruct, (dict( + access_token=frombytes(opts.credentials.access_token()), + anonymous=opts.credentials.anonymous(), + credential_token_expiration=expiration_dt, + target_service_account=service_account, + scheme=frombytes(opts.scheme), + endpoint_override=frombytes(opts.endpoint_override), + default_bucket_location=frombytes( + opts.default_bucket_location), + default_metadata=pyarrow_wrap_metadata(opts.default_metadata), + retry_time_limit=retry_time_limit + ),)) + + @property + def default_bucket_location(self): + """ + The GCP location this filesystem will write to. 
+ """ + return frombytes(self.gcsfs.options().default_bucket_location) diff --git a/python/pyarrow/conftest.py b/python/pyarrow/conftest.py index b114f7d1c60..638dad8568a 100644 --- a/python/pyarrow/conftest.py +++ b/python/pyarrow/conftest.py @@ -26,6 +26,7 @@ 'hypothesis', 'fastparquet', 'gandiva', + 'gcs', 'gdb', 'gzip', 'hdfs', @@ -56,6 +57,7 @@ 'fastparquet': False, 'flight': False, 'gandiva': False, + 'gcs': False, 'gdb': True, 'gzip': Codec.is_available('gzip'), 'hdfs': False, @@ -145,6 +147,13 @@ except ImportError: pass +try: + from pyarrow.fs import GcsFileSystem # noqa + defaults['gcs'] = True +except ImportError: + pass + + try: from pyarrow.fs import S3FileSystem # noqa defaults['s3'] = True diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index f22eaf03041..932fc82789a 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -45,6 +45,11 @@ except ImportError: _not_imported.append("HadoopFileSystem") +try: + from pyarrow._gcsfs import GcsFileSystem # noqa +except ImportError: + _not_imported.append("GcsFileSystem") + try: from pyarrow._s3fs import ( # noqa S3FileSystem, S3LogLevel, initialize_s3, finalize_s3, diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index ba651af50b7..c55fd315b11 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -54,6 +54,13 @@ cdef extern from "arrow/util/decimal.h" namespace "arrow" nogil: cdef cppclass CDecimal256" arrow::Decimal256": c_string ToString(int32_t scale) const +cdef extern from "arrow/util/optional.h" namespace "arrow::util" nogil: + cdef cppclass c_optional"arrow::util::optional"[T]: + c_bool has_value() + T value() + c_optional(T&) + c_optional& operator=[U](U&) + cdef extern from "arrow/config.h" namespace "arrow" nogil: cdef cppclass CBuildInfo" arrow::BuildInfo": diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd index e491233e88f..d0e15010031 100644 --- a/python/pyarrow/includes/libarrow_fs.pxd +++ b/python/pyarrow/includes/libarrow_fs.pxd @@ -200,6 +200,40 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: cdef CResult[c_string] ResolveS3BucketRegion(const c_string& bucket) + cdef cppclass CGcsCredentials "arrow::fs::GcsCredentials": + c_bool anonymous() + CTimePoint expiration() + c_string access_token() + c_string target_service_account() + + cdef cppclass CGcsOptions "arrow::fs::GcsOptions": + CGcsCredentials credentials + c_string endpoint_override + c_string scheme + c_string default_bucket_location + c_optional[double] retry_limit_seconds + shared_ptr[const CKeyValueMetadata] default_metadata + c_bool Equals(const CS3Options& other) + + @staticmethod + CGcsOptions Defaults() + + @staticmethod + CGcsOptions Anonymous() + + @staticmethod + CGcsOptions FromAccessToken(const c_string& access_token, + CTimePoint expiration) + + @staticmethod + CGcsOptions FromImpersonatedServiceAccount(const CGcsCredentials& base_credentials, + c_string& target_service_account) + + cdef cppclass CGcsFileSystem "arrow::fs::GcsFileSystem": + @staticmethod + CResult[shared_ptr[CGcsFileSystem]] Make(const CGcsOptions& options) + CGcsOptions options() + cdef cppclass CHdfsOptions "arrow::fs::HdfsOptions": HdfsConnectionConfig connection_config int32_t buffer_size diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 0b7f1618b0e..a06ac92095b 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -18,6 +18,7 @@ import os 
import pathlib import subprocess +import sys from tempfile import TemporaryDirectory import pytest @@ -173,3 +174,23 @@ def s3_server(s3_connection): finally: if proc is not None: proc.kill() + + +@pytest.fixture(scope='session') +def gcs_server(): + port = find_free_port() + env = os.environ.copy() + args = [sys.executable, '-m', 'testbench', '--port', str(port)] + proc = None + try: + proc = subprocess.Popen(args, env=env) + except OSError as e: + pytest.skip(f"Command {args} failed to execute: {e}") + else: + yield { + 'connection': ('localhost', port), + 'process': proc, + } + finally: + if proc is not None: + proc.kill() diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 4fd72704a71..4bd532525ac 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -200,6 +200,34 @@ def subtree_localfs(request, tempdir, localfs): ) +@pytest.fixture +def gcsfs(request, gcs_server): + request.config.pyarrow.requires('gcs') + from pyarrow.fs import GcsFileSystem + + host, port = gcs_server['connection'] + bucket = 'pyarrow-filesystem/' + # Make sure the server is alive. + assert gcs_server['process'].poll() is None + + fs = GcsFileSystem( + endpoint_override=f'{host}:{port}', + scheme='http', + # Mock endpoint doesn't check credentials. + anonymous=True, + retry_time_limit=timedelta(seconds=45) + ) + fs.create_dir(bucket) + + yield dict( + fs=fs, + pathfn=bucket.__add__, + allow_move_dir=False, + allow_append_to_file=False, + ) + fs.delete_dir(bucket) + + @pytest.fixture def s3fs(request, s3_server): request.config.pyarrow.requires('s3') @@ -345,6 +373,11 @@ def py_fsspec_s3fs(request, s3_server): id='S3FileSystem', marks=pytest.mark.s3 ), + pytest.param( + pytest.lazy_fixture('gcsfs'), + id='GcsFileSystem', + marks=pytest.mark.gcs + ), pytest.param( pytest.lazy_fixture('hdfs'), id='HadoopFileSystem', @@ -869,6 +902,10 @@ def test_open_input_file(fs, pathfn): s.write(data) read_from = len(b'some data') * 512 + with fs.open_input_file(p) as f: + result = f.read() + assert result == data + with fs.open_input_file(p) as f: f.seek(read_from) result = f.read() @@ -951,7 +988,7 @@ def test_open_output_stream_metadata(fs, pathfn): assert f.read() == data got_metadata = f.metadata() - if fs.type_name == 's3' or 'mock' in fs.type_name: + if fs.type_name in ['s3', 'gcs'] or 'mock' in fs.type_name: for k, v in metadata.items(): assert got_metadata[k] == v.encode() else: @@ -1010,6 +1047,42 @@ def test_mockfs_mtime_roundtrip(mockfs): assert info.mtime == dt +@pytest.mark.gcs +def test_gcs_options(): + from pyarrow.fs import GcsFileSystem + dt = datetime.now() + fs = GcsFileSystem(access_token='abc', + target_service_account='service_account@apache', + credential_token_expiration=dt, + default_bucket_location='us-west2', + scheme='https', endpoint_override='localhost:8999') + assert isinstance(fs, GcsFileSystem) + assert fs.default_bucket_location == 'us-west2' + assert pickle.loads(pickle.dumps(fs)) == fs + + fs = GcsFileSystem() + assert isinstance(fs, GcsFileSystem) + assert pickle.loads(pickle.dumps(fs)) == fs + + fs = GcsFileSystem(anonymous=True) + assert isinstance(fs, GcsFileSystem) + assert pickle.loads(pickle.dumps(fs)) == fs + + fs = GcsFileSystem(default_metadata={"ACL": "authenticated-read", + "Content-Type": "text/plain"}) + assert isinstance(fs, GcsFileSystem) + assert pickle.loads(pickle.dumps(fs)) == fs + + with pytest.raises(ValueError): + GcsFileSystem(access_token='access') + with pytest.raises(ValueError): + 
GcsFileSystem(anonymous=True, access_token='secret') + with pytest.raises(ValueError): + GcsFileSystem(anonymous=True, target_service_account='acct') + with pytest.raises(ValueError): + GcsFileSystem(credential_token_expiration=datetime.now()) + + @pytest.mark.s3 def test_s3_options(): from pyarrow.fs import S3FileSystem @@ -1321,6 +1394,26 @@ def test_filesystem_from_uri_s3(s3_server): assert info.type == FileType.Directory +@pytest.mark.gcs +def test_filesystem_from_uri_gcs(gcs_server): + from pyarrow.fs import GcsFileSystem + + host, port = gcs_server['connection'] + + uri = ("gs://anonymous@" + + f"mybucket/foo/bar?scheme=http&endpoint_override={host}:{port}&" + + "retry_limit_seconds=5") + + fs, path = FileSystem.from_uri(uri) + assert isinstance(fs, GcsFileSystem) + assert path == "mybucket/foo/bar" + + fs.create_dir(path) + [info] = fs.get_file_info([path]) + assert info.path == path + assert info.type == FileType.Directory + + def test_py_filesystem(): handler = DummyHandler() fs = PyFileSystem(handler) diff --git a/python/pyarrow/tests/test_pandas.py b/python/pyarrow/tests/test_pandas.py index 078714df041..143bb0e33e0 100644 --- a/python/pyarrow/tests/test_pandas.py +++ b/python/pyarrow/tests/test_pandas.py @@ -226,7 +226,7 @@ def test_rangeindex_doesnt_warn(self): with pytest.warns(None) as record: _check_pandas_roundtrip(df, preserve_index=True) - assert len(record) == 0 + assert len(record) == 0, [r.message for r in record] def test_multiindex_columns(self): columns = pd.MultiIndex.from_arrays([ @@ -277,7 +277,7 @@ def test_multiindex_doesnt_warn(self): with pytest.warns(None) as record: _check_pandas_roundtrip(df, preserve_index=True) - assert len(record) == 0 + assert len(record) == 0, [r.message for r in record] def test_integer_index_column(self): df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')]) diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi index 0a54b401b1f..de9677f3d40 100644 --- a/python/pyarrow/types.pxi +++ b/python/pyarrow/types.pxi @@ -1145,7 +1145,7 @@ cdef class KeyValueMetadata(_Metadata, Mapping): return result -cdef KeyValueMetadata ensure_metadata(object meta, c_bool allow_none=False): +cpdef KeyValueMetadata ensure_metadata(object meta, c_bool allow_none=False): if allow_none and meta is None: return None elif isinstance(meta, KeyValueMetadata): diff --git a/python/setup.py b/python/setup.py index 2486a8959e6..b572be1cee4 100755 --- a/python/setup.py +++ b/python/setup.py @@ -113,6 +113,8 @@ def run(self): ('with-parquet', None, 'build the Parquet extension'), ('with-parquet-encryption', None, 'build the Parquet encryption extension'), + ('with-gcs', None, + 'build the Google Cloud Storage (GCS) extension'), ('with-s3', None, 'build the Amazon S3 extension'), ('with-static-parquet', None, 'link parquet statically'), ('with-static-boost', None, 'link boost statically'), @@ -155,6 +157,8 @@ def initialize_options(self): if not hasattr(sys, 'gettotalrefcount'): self.build_type = 'release' + self.with_gcs = strtobool( + os.environ.get('PYARROW_WITH_GCS', '0')) self.with_s3 = strtobool( os.environ.get('PYARROW_WITH_S3', '0')) self.with_hdfs = strtobool( @@ -216,6 +220,7 @@ def initialize_options(self): '_parquet_encryption', '_orc', '_plasma', + '_gcsfs', '_s3fs', '_substrait', '_hdfs', @@ -281,6 +286,7 @@ def append_cmake_bool(value, varname): append_cmake_bool(self.with_parquet_encryption, 'PYARROW_BUILD_PARQUET_ENCRYPTION') append_cmake_bool(self.with_plasma, 'PYARROW_BUILD_PLASMA') + append_cmake_bool(self.with_gcs, 'PYARROW_BUILD_GCS') 
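# [Editor's note: illustrative sketch, not part of the diff.] gs:// URIs
# resolve to GcsFileSystem, with query parameters mapping onto the constructor
# options, as exercised by test_filesystem_from_uri_gcs above. The bucket,
# host and port are placeholders; nothing is contacted until the filesystem is
# actually used.
from pyarrow.fs import FileSystem, GcsFileSystem

fs, path = FileSystem.from_uri(
    "gs://anonymous@my-bucket/some/dir"
    "?scheme=http&endpoint_override=localhost:9001&retry_limit_seconds=5")
assert isinstance(fs, GcsFileSystem)
assert path == "my-bucket/some/dir"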
append_cmake_bool(self.with_s3, 'PYARROW_BUILD_S3') append_cmake_bool(self.with_hdfs, 'PYARROW_BUILD_HDFS') append_cmake_bool(self.with_tensorflow, 'PYARROW_USE_TENSORFLOW') @@ -447,6 +453,8 @@ def _failure_permitted(self, name): return True if name == '_substrait' and not self.with_substrait: return True + if name == '_gcsfs' and not self.with_gcs: + return True if name == '_s3fs' and not self.with_s3: return True if name == '_hdfs' and not self.with_hdfs: