From de1795ac6bcf5f75d2e6c0e78c25a2f53643b8f1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 27 Aug 2019 12:35:00 +0200 Subject: [PATCH] ARROW-453: [C++] Filesystem implementation for S3 Unit testing is done using the Minio standalone S3 server. --- .travis.yml | 1 + appveyor.yml | 3 + ci/appveyor-cpp-setup.bat | 5 + ci/conda_env_cpp.yml | 1 + ci/cpp-msvc-build-main.bat | 1 + ci/travis_before_script_cpp.sh | 4 + ci/travis_install_linux.sh | 8 + cpp/CMakeLists.txt | 4 + cpp/cmake_modules/DefineOptions.cmake | 2 + cpp/cmake_modules/ThirdpartyToolchain.cmake | 87 +- cpp/src/arrow/CMakeLists.txt | 4 + cpp/src/arrow/filesystem/CMakeLists.txt | 11 + cpp/src/arrow/filesystem/filesystem.cc | 21 + cpp/src/arrow/filesystem/filesystem.h | 3 + cpp/src/arrow/filesystem/filesystem_test.cc | 12 +- cpp/src/arrow/filesystem/path_util.cc | 7 + cpp/src/arrow/filesystem/path_util.h | 9 + cpp/src/arrow/filesystem/s3_internal.h | 150 ++ cpp/src/arrow/filesystem/s3fs.cc | 1204 +++++++++++++++++ cpp/src/arrow/filesystem/s3fs.h | 120 ++ .../arrow/filesystem/s3fs_narrative_test.cc | 229 ++++ cpp/src/arrow/filesystem/s3fs_test.cc | 727 ++++++++++ cpp/src/arrow/filesystem/test_util.cc | 167 ++- cpp/src/arrow/filesystem/test_util.h | 39 +- cpp/src/arrow/testing/util.cc | 7 + cpp/src/arrow/testing/util.h | 1 + cpp/thirdparty/versions.txt | 2 + 27 files changed, 2770 insertions(+), 59 deletions(-) create mode 100644 cpp/src/arrow/filesystem/s3_internal.h create mode 100644 cpp/src/arrow/filesystem/s3fs.cc create mode 100644 cpp/src/arrow/filesystem/s3fs.h create mode 100644 cpp/src/arrow/filesystem/s3fs_narrative_test.cc create mode 100644 cpp/src/arrow/filesystem/s3fs_test.cc diff --git a/.travis.yml b/.travis.yml index 6dd5489ab7b..1261f3877a5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -75,6 +75,7 @@ matrix: - ARROW_TRAVIS_ORC=1 - ARROW_TRAVIS_PARQUET=1 - ARROW_TRAVIS_PLASMA=1 + - ARROW_TRAVIS_S3=1 - ARROW_TRAVIS_USE_SYSTEM_JAVA=1 - ARROW_TRAVIS_USE_TOOLCHAIN=1 # TODO: This fails in re2 code diff --git a/appveyor.yml b/appveyor.yml index e84d78c362e..a508b0ee269 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -52,8 +52,10 @@ environment: CLCACHE_SERVER: 1 CLCACHE_COMPRESS: 1 CLCACHE_COMPRESSLEVEL: 6 + ARROW_BUILD_FLIGHT: "OFF" ARROW_BUILD_GANDIVA: "OFF" ARROW_LLVM_VERSION: "7.0.*" + ARROW_S3: "OFF" PYTHON: "3.6" ARCH: "64" @@ -67,6 +69,7 @@ environment: - JOB: "Toolchain" GENERATOR: Ninja CONFIGURATION: "Release" + ARROW_S3: "ON" ARROW_BUILD_FLIGHT: "ON" ARROW_BUILD_GANDIVA: "ON" # NOTE: Since ARROW-5403 we have disabled the static CRT build diff --git a/ci/appveyor-cpp-setup.bat b/ci/appveyor-cpp-setup.bat index 0d2139aa376..90b0e24572f 100644 --- a/ci/appveyor-cpp-setup.bat +++ b/ci/appveyor-cpp-setup.bat @@ -58,3 +58,8 @@ if "%USE_CLCACHE%" == "true" ( clcache -s powershell.exe -Command "Start-Process clcache-server" ) + +if "%ARROW_S3%" == "ON" ( + @rem Download Minio somewhere on PATH, for unit tests + appveyor DownloadFile https://dl.min.io/server/minio/release/windows-amd64/minio.exe -FileName C:\Windows\Minio.exe || exit /B +) diff --git a/ci/conda_env_cpp.yml b/ci/conda_env_cpp.yml index fd21ed8d3fa..2aff2cb72ff 100644 --- a/ci/conda_env_cpp.yml +++ b/ci/conda_env_cpp.yml @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
+aws-sdk-cpp benchmark=1.4.1 boost-cpp>=1.68.0 brotli diff --git a/ci/cpp-msvc-build-main.bat b/ci/cpp-msvc-build-main.bat index 8755a520071..62f6b5dfd6c 100644 --- a/ci/cpp-msvc-build-main.bat +++ b/ci/cpp-msvc-build-main.bat @@ -71,6 +71,7 @@ cmake -G "%GENERATOR%" %CMAKE_ARGS% ^ -DCMAKE_CXX_FLAGS_RELEASE="/MD %CMAKE_CXX_FLAGS_RELEASE%" ^ -DARROW_FLIGHT=%ARROW_BUILD_FLIGHT% ^ -DARROW_GANDIVA=%ARROW_BUILD_GANDIVA% ^ + -DARROW_S3=%ARROW_S3% ^ -DARROW_PARQUET=ON ^ -DPARQUET_BUILD_EXECUTABLES=ON ^ -DARROW_PYTHON=ON ^ diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index 5ee5eb79beb..1c5b6e652e1 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -120,6 +120,10 @@ if [ "$ARROW_TRAVIS_PYTHON" == "1" ]; then fi fi +if [ "$ARROW_TRAVIS_S3" == "1" ]; then + CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_S3=ON" +fi + if [ "$ARROW_TRAVIS_PARQUET" == "1" ]; then CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS \ -DARROW_PARQUET=ON \ diff --git a/ci/travis_install_linux.sh b/ci/travis_install_linux.sh index 441d50d9b16..a5283139a83 100755 --- a/ci/travis_install_linux.sh +++ b/ci/travis_install_linux.sh @@ -42,6 +42,14 @@ if [ "$ARROW_TRAVIS_GANDIVA" == "1" ]; then sudo apt-get install -y -qq llvm-$ARROW_LLVM_MAJOR_VERSION-dev fi +if [ "$ARROW_TRAVIS_S3" == "1" ]; then + # Download the Minio S3 server into PATH + S3FS_DIR=~/.local/bin/ + mkdir -p $S3FS_DIR + wget --directory-prefix $S3FS_DIR https://dl.min.io/server/minio/release/linux-amd64/minio + chmod +x $S3FS_DIR/minio +fi + if [ "$ARROW_TRAVIS_USE_SYSTEM" == "1" ]; then if [ "$DISTRO_CODENAME" == "xenial" ]; then # TODO(ARROW-4761): Install libzstd-dev once we support zstd<1 diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 758ac352f88..2d248890969 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -655,6 +655,10 @@ if(ARROW_USE_GLOG) add_definitions("-DARROW_USE_GLOG") endif() +if(ARROW_S3) + list(APPEND ARROW_LINK_LIBS ${AWSSDK_LINK_LIBRARIES}) +endif() + add_custom_target(arrow_dependencies) add_custom_target(arrow_benchmark_dependencies) add_custom_target(arrow_test_dependencies) diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 6d34bd28272..bc7528a3387 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -174,6 +174,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") define_option(ARROW_JSON "Build Arrow with JSON support (requires RapidJSON)" ON) + define_option(ARROW_S3 "Build Arrow with S3 support (requires the AWS SDK for C++)" OFF) + #---------------------------------------------------------------------- set_option_category("Thirdparty toolchain") diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 65ef4092a76..74267af71e7 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -51,6 +51,7 @@ endmacro() # Resolve the dependencies set(ARROW_THIRDPARTY_DEPENDENCIES + AWSSDK benchmark BOOST Brotli @@ -127,7 +128,9 @@ foreach(DEPENDENCY ${ARROW_THIRDPARTY_DEPENDENCIES}) endforeach() macro(build_dependency DEPENDENCY_NAME) - if("${DEPENDENCY_NAME}" STREQUAL "benchmark") + if("${DEPENDENCY_NAME}" STREQUAL "AWSSDK") + build_awssdk() + elseif("${DEPENDENCY_NAME}" STREQUAL "benchmark") build_benchmark() elseif("${DEPENDENCY_NAME}" STREQUAL "Brotli") build_brotli() @@ -255,6 +258,13 @@ foreach(_VERSION_ENTRY ${TOOLCHAIN_VERSIONS_TXT}) set(${_LIB_NAME} 
"${_LIB_VERSION}") endforeach() +if(DEFINED ENV{ARROW_AWSSDK_URL}) + set(AWSSDK_SOURCE_URL "$ENV{ARROW_AWSSDK_URL}") +else() + set(AWSSDK_SOURCE_URL + "https://github.com/aws/aws-sdk-cpp/archive/${AWSSDK_VERSION}.tar.gz") +endif() + if(DEFINED ENV{ARROW_BOOST_URL}) set(BOOST_SOURCE_URL "$ENV{ARROW_BOOST_URL}") else() @@ -910,7 +920,7 @@ if(BREW_BIN AND NOT OPENSSL_ROOT_DIR) set(OPENSSL_ROOT_DIR ${OPENSSL_BREW_PREFIX}) endif() endif() -if(PARQUET_REQUIRE_ENCRYPTION OR ARROW_FLIGHT) +if(PARQUET_REQUIRE_ENCRYPTION OR ARROW_FLIGHT OR ARROW_S3) # This must work find_package(OpenSSL ${ARROW_OPENSSL_REQUIRED_VERSION} REQUIRED) set(ARROW_USE_OPENSSL ON) @@ -2370,6 +2380,79 @@ if(ARROW_ORC) message(STATUS "Found ORC headers: ${ORC_INCLUDE_DIR}") endif() +# ---------------------------------------------------------------------- +# AWS SDK for C++ + +macro(build_awssdk) + message( + FATAL_ERROR "FIXME: Building AWS C++ SDK from source will link with wrong libcrypto") + message("Building AWS C++ SDK from source") + + set(AWSSDK_PREFIX "${THIRDPARTY_DIR}/awssdk_ep-install") + set(AWSSDK_INCLUDE_DIR "${AWSSDK_PREFIX}/include") + + if(WIN32) + # On Windows, need to match build types + set(AWSSDK_BUILD_TYPE ${CMAKE_BUILD_TYPE}) + else() + # Otherwise, always build in release mode. + # Especially with gcc, debug builds can fail with "asm constraint" errors: + # https://github.com/TileDB-Inc/TileDB/issues/1351 + set(AWSSDK_BUILD_TYPE Release) + endif() + + set(AWSSDK_CMAKE_ARGS + -DCMAKE_BUILD_TYPE=Release + -DCMAKE_INSTALL_LIBDIR=lib + -DBUILD_ONLY=s3;core;config + -DENABLE_UNITY_BUILD=on + -DENABLE_TESTING=off + "-DCMAKE_C_FLAGS=${EP_C_FLAGS}" + "-DCMAKE_INSTALL_PREFIX=${AWSSDK_PREFIX}") + + set( + AWSSDK_CORE_SHARED_LIB + "${AWSSDK_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}aws-cpp-sdk-core${CMAKE_SHARED_LIBRARY_SUFFIX}" + ) + set( + AWSSDK_S3_SHARED_LIB + "${AWSSDK_PREFIX}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}aws-cpp-sdk-s3${CMAKE_SHARED_LIBRARY_SUFFIX}" + ) + set(AWSSDK_SHARED_LIBS "${AWSSDK_CORE_SHARED_LIB}" "${AWSSDK_S3_SHARED_LIB}") + + externalproject_add(awssdk_ep + ${EP_LOG_OPTIONS} + URL ${AWSSDK_SOURCE_URL} + CMAKE_ARGS ${AWSSDK_CMAKE_ARGS} + BUILD_BYPRODUCTS ${AWSSDK_SHARED_LIBS}) + + file(MAKE_DIRECTORY ${AWSSDK_INCLUDE_DIR}) + + add_dependencies(toolchain awssdk_ep) + set(AWSSDK_LINK_LIBRARIES ${AWSSDK_SHARED_LIBS}) + set(AWSSDK_VENDORED TRUE) +endmacro() + +if(ARROW_S3) + # See https://aws.amazon.com/blogs/developer/developer-experience-of-the-aws-sdk-for-c-now-simplified-by-cmake/ + + # Need to customize the find_package() call, so cannot call resolve_dependency() + if(AWSSDK_SOURCE STREQUAL "AUTO") + find_package(AWSSDK COMPONENTS config s3 transfer) + if(NOT AWSSDK_FOUND) + build_awssdk() + endif() + elseif(AWSSDK_SOURCE STREQUAL "BUNDLED") + build_awssdk() + elseif(AWSSDK_SOURCE STREQUAL "SYSTEM") + find_package(AWSSDK REQUIRED COMPONENTS config s3 transfer) + endif() + + include_directories(SYSTEM ${AWSSDK_INCLUDE_DIR}) + message(STATUS "Found AWS SDK headers: ${AWSSDK_INCLUDE_DIR}") + message(STATUS "Found AWS SDK libraries: ${AWSSDK_LINK_LIBRARIES}") +endif() + # Write out the package configurations. 
configure_file("src/arrow/util/config.h.cmake" "src/arrow/util/config.h") diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 186a786e286..f3b9a37c3a6 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -157,6 +157,10 @@ if(ARROW_JSON) json/reader.cc) endif() +if(ARROW_S3) + set(ARROW_SRCS ${ARROW_SRCS} filesystem/s3fs.cc) +endif() + if(ARROW_WITH_URIPARSER) set(ARROW_SRCS ${ARROW_SRCS} util/uri.cc) endif() diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index d1bbad4321a..148fa748403 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -20,3 +20,14 @@ arrow_install_all_headers("arrow/filesystem") add_arrow_test(filesystem_test) add_arrow_test(localfs_test) + +if(ARROW_S3) + add_arrow_test(s3fs_test) + + if(ARROW_BUILD_TESTS) + add_executable(arrow-s3fs-narrative-test s3fs_narrative_test.cc) + target_link_libraries(arrow-s3fs-narrative-test ${ARROW_TEST_LINK_LIBS} + ${GFLAGS_LIBRARIES} GTest::GTest) + add_dependencies(arrow-tests arrow-s3fs-narrative-test) + endif() +endif() diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 189615a1ca6..12b1f7cb001 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -20,6 +20,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/filesystem/path_util.h" #include "arrow/util/logging.h" +#include "arrow/util/macros.h" namespace arrow { namespace fs { @@ -45,6 +46,26 @@ std::string ToString(FileType ftype) { } } +// For googletest +ARROW_EXPORT std::ostream& operator<<(std::ostream& os, FileType ftype) { +#define FILE_TYPE_CASE(value_name) \ + case FileType::value_name: \ + os << "FileType::" ARROW_STRINGIFY(value_name); \ + break; + + switch (ftype) { + FILE_TYPE_CASE(NonExistent) + FILE_TYPE_CASE(Unknown) + FILE_TYPE_CASE(File) + FILE_TYPE_CASE(Directory) + default: + ARROW_LOG(FATAL) << "Invalid FileType value: " << static_cast(ftype); + } + +#undef FILE_TYPE_CASE + return os; +} + std::string FileStats::base_name() const { return internal::GetAbstractPathParent(path_).second; } diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index a11b2a7cc04..0e9b4390503 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -70,6 +71,8 @@ enum class ARROW_EXPORT FileType { ARROW_EXPORT std::string ToString(FileType); +ARROW_EXPORT std::ostream& operator<<(std::ostream& os, FileType); + static const int64_t kNoSize = -1; static const TimePoint kNoTime = TimePoint(TimePoint::duration(-1)); diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc index e028498a1f0..47ddc16273f 100644 --- a/cpp/src/arrow/filesystem/filesystem_test.cc +++ b/cpp/src/arrow/filesystem/filesystem_test.cc @@ -542,17 +542,13 @@ TEST_F(TestSubTreeFileSystem, GetTargetStatsSingle) { FileStats st; ASSERT_OK(subfs_->CreateDir("AB/CD")); - ASSERT_OK(subfs_->GetTargetStats("AB", &st)); - AssertFileStats(st, "AB", FileType::Directory, time_); - ASSERT_OK(subfs_->GetTargetStats("AB/CD", &st)); - AssertFileStats(st, "AB/CD", FileType::Directory, time_); + AssertFileStats(subfs_.get(), "AB", FileType::Directory, time_); + AssertFileStats(subfs_.get(), "AB/CD", FileType::Directory, time_); CreateFile("ab", "data"); - ASSERT_OK(subfs_->GetTargetStats("ab", 
&st)); - AssertFileStats(st, "ab", FileType::File, time_, 4); + AssertFileStats(subfs_.get(), "ab", FileType::File, time_, 4); - ASSERT_OK(subfs_->GetTargetStats("non-existent", &st)); - AssertFileStats(st, "non-existent", FileType::NonExistent); + AssertFileStats(subfs_.get(), "non-existent", FileType::NonExistent); } TEST_F(TestSubTreeFileSystem, GetTargetStatsVector) { diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index bc0c8b27534..cfc59a6f8d4 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -114,6 +114,13 @@ std::string EnsureTrailingSlash(const std::string& s) { } } +util::string_view RemoveTrailingSlash(util::string_view key) { + if (!key.empty() && key.back() == kSep) { + key.remove_suffix(1); + } + return key; +} + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/path_util.h b/cpp/src/arrow/filesystem/path_util.h index cf0d0e3dda5..d90c3510e67 100644 --- a/cpp/src/arrow/filesystem/path_util.h +++ b/cpp/src/arrow/filesystem/path_util.h @@ -22,6 +22,7 @@ #include #include "arrow/status.h" +#include "arrow/util/string_view.h" namespace arrow { namespace fs { @@ -56,6 +57,9 @@ std::string ConcatAbstractPath(const std::string& base, const std::string& stem) ARROW_EXPORT std::string EnsureTrailingSlash(const std::string& s); +ARROW_EXPORT +util::string_view RemoveTrailingSlash(util::string_view s); + // Join the components of an abstract path. template std::string JoinAbstractPath(StringIt it, StringIt end) { @@ -69,6 +73,11 @@ std::string JoinAbstractPath(StringIt it, StringIt end) { return path; } +template +std::string JoinAbstractPath(const StringRange& range) { + return JoinAbstractPath(range.begin(), range.end()); +} + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/s3_internal.h b/cpp/src/arrow/filesystem/s3_internal.h new file mode 100644 index 00000000000..6f444aa55fb --- /dev/null +++ b/cpp/src/arrow/filesystem/s3_internal.h @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
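+
+// Internal helpers for the S3 filesystem implementation and its tests:
+// translation of AWS SDK errors and outcomes into arrow::Status, conversions
+// between Aws::String and std::string, and a retry strategy for transient
+// connection errors.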
+ +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" +#include "arrow/util/string_view.h" + +namespace arrow { +namespace fs { +namespace internal { + +#define ARROW_AWS_ASSIGN_OR_RAISE_IMPL(outcome_name, lhs, rexpr) \ + auto outcome_name = (rexpr); \ + if (!outcome_name.IsSuccess()) { \ + return ErrorToStatus(outcome_name.GetError()); \ + } \ + lhs = std::move(outcome_name).GetResultWithOwnership(); + +#define ARROW_AWS_ASSIGN_OR_RAISE_NAME(x, y) ARROW_CONCAT(x, y) + +#define ARROW_AWS_ASSIGN_OR_RAISE(lhs, rexpr) \ + ARROW_AWS_ASSIGN_OR_RAISE_IMPL( \ + ARROW_AWS_ASSIGN_OR_RAISE_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr); + +template +inline bool IsConnectError(const Aws::Client::AWSError& error) { + if (error.GetErrorType() == Aws::Client::CoreErrors::NETWORK_CONNECTION) { + return true; + } + // Sometimes Minio may fail with a 503 error + // (exception name: XMinioServerNotInitialized, + // message: "Server not initialized, please try again") + auto http_code = static_cast(error.GetResponseCode()); + switch (http_code) { + case 502: // Bad gateway + case 503: // Service unavailable + case 504: // Gateway timeout + return true; + default: + return false; + } +} + +inline bool IsNotFound(const Aws::Client::AWSError& error) { + const auto error_type = error.GetErrorType(); + return (error_type == Aws::S3::S3Errors::NO_SUCH_BUCKET || + error_type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND); +} + +inline bool IsAlreadyExists(const Aws::Client::AWSError& error) { + const auto error_type = error.GetErrorType(); + return (error_type == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS || + error_type == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU); +} + +template +inline Status ErrorToStatus(const Aws::Client::AWSError& error) { + // XXX Handle fine-grained error types + // See + // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371 + return Status::IOError("AWS Error [code ", static_cast(error.GetErrorType()), + "]: ", error.GetMessage()); +} + +template +inline Status OutcomeToStatus(const Aws::Utils::Outcome& outcome) { + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + return ErrorToStatus(outcome.GetError()); + } +} + +inline Aws::String ToAwsString(const std::string& s) { + // Direct construction of Aws::String from std::string doesn't work because + // it uses a specific Allocator class. + return Aws::String(s.begin(), s.end()); +} + +inline util::string_view FromAwsString(const Aws::String& s) { + return {s.data(), s.length()}; +} + +inline Aws::String ToURLEncodedAwsString(const std::string& s) { + return Aws::Utils::StringUtils::URLEncode(s.data()); +} + +inline TimePoint ToTimePoint(const Aws::Utils::DateTime& dt) { + return std::chrono::time_point_cast(dt.UnderlyingTimestamp()); +} + +// A connect retry strategy with a controlled max duration. 
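+// A connection-type error (or a 502/503/504 response, which Minio may return
+// while starting up) is retried every `retry_interval_` milliseconds until
+// `max_retry_duration_` has elapsed; any other error is not retried.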
+ +class ConnectRetryStrategy : public Aws::Client::RetryStrategy { + public: + static const int32_t kDefaultRetryInterval = 200; /* milliseconds */ + static const int32_t kDefaultMaxRetryDuration = 4000; /* milliseconds */ + + explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval, + int32_t max_retry_duration = kDefaultMaxRetryDuration) + : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {} + + bool ShouldRetry(const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT + if (!IsConnectError(error)) { + // Not a connect error, don't retry + return false; + } + return attempted_retries * retry_interval_ < max_retry_duration_; + } + + long CalculateDelayBeforeNextRetry( // NOLINT + const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT + return retry_interval_; + } + + protected: + int32_t retry_interval_; + int32_t max_retry_duration_; +}; + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc new file mode 100644 index 00000000000..c696d8c7841 --- /dev/null +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -0,0 +1,1204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
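+
+// S3-backed implementation of the Arrow FileSystem API. "Directories" are
+// emulated with zero-length objects whose keys end in '/', and output streams
+// are written through S3 multipart uploads.
+//
+// Rough usage sketch (bucket and object names are illustrative only):
+//
+//   S3GlobalOptions global_options;
+//   global_options.log_level = S3LogLevel::Fatal;
+//   RETURN_NOT_OK(InitializeS3(global_options));
+//   S3Options options;
+//   options.region = "us-east-1";
+//   std::shared_ptr<S3FileSystem> fs;
+//   RETURN_NOT_OK(S3FileSystem::Make(options, &fs));
+//   std::shared_ptr<io::InputStream> stream;
+//   RETURN_NOT_OK(fs->OpenInputStream("my-bucket/my-key", &stream));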
+ +#include "arrow/filesystem/s3fs.h" + +#include +#include +#include +#include +#include + +#ifdef _WIN32 +// Undefine preprocessor macros that interfere with AWS function / method names +#ifdef GetMessage +#undef GetMessage +#endif +#ifdef GetObject +#undef GetObject +#endif +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/s3_internal.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace fs { + +using ::Aws::Client::AWSError; +using ::Aws::S3::S3Errors; +namespace S3Model = Aws::S3::Model; + +using ::arrow::fs::internal::ConnectRetryStrategy; +using ::arrow::fs::internal::ErrorToStatus; +using ::arrow::fs::internal::FromAwsString; +using ::arrow::fs::internal::IsAlreadyExists; +using ::arrow::fs::internal::IsNotFound; +using ::arrow::fs::internal::OutcomeToStatus; +using ::arrow::fs::internal::ToAwsString; +using ::arrow::fs::internal::ToTimePoint; +using ::arrow::fs::internal::ToURLEncodedAwsString; + +const char* kS3DefaultRegion = "us-east-1"; + +static const char kSep = '/'; + +static std::mutex aws_init_lock; +static Aws::SDKOptions aws_options; +static std::atomic aws_initialized(false); + +Status InitializeS3(const S3GlobalOptions& options) { + std::lock_guard lock(aws_init_lock); + Aws::Utils::Logging::LogLevel aws_log_level; + +#define LOG_LEVEL_CASE(level_name) \ + case S3LogLevel::level_name: \ + aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \ + break; + + switch (options.log_level) { + LOG_LEVEL_CASE(Fatal) + LOG_LEVEL_CASE(Error) + LOG_LEVEL_CASE(Warn) + LOG_LEVEL_CASE(Info) + LOG_LEVEL_CASE(Debug) + LOG_LEVEL_CASE(Trace) + default: + aws_log_level = Aws::Utils::Logging::LogLevel::Off; + } + +#undef LOG_LEVEL_CASE + + aws_options.loggingOptions.logLevel = aws_log_level; + // By default the AWS SDK logs to files, log to console instead + aws_options.loggingOptions.logger_create_fn = [] { + return std::make_shared( + aws_options.loggingOptions.logLevel); + }; + Aws::InitAPI(aws_options); + aws_initialized.store(true); + return Status::OK(); +} + +Status FinalizeS3() { + std::lock_guard lock(aws_init_lock); + Aws::ShutdownAPI(aws_options); + aws_initialized.store(false); + return Status::OK(); +} + +namespace { + +Status CheckS3Initialized() { + if (!aws_initialized.load()) { + return Status::Invalid( + "S3 subsystem not initialized; please call InitializeS3() " + "before carrying out any S3-related operation"); + } + return Status::OK(); +} + +// XXX Sanitize paths by removing leading slash? 
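+
+// A parsed S3 path: the first component of "bucket/key/parts" is the bucket
+// name, the remainder is the object key, which is also kept split into its
+// individual components.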
+ +struct S3Path { + std::string full_path; + std::string bucket; + std::string key; + std::vector key_parts; + + static Status FromString(const std::string& s, S3Path* out) { + auto first_sep = s.find_first_of(kSep); + if (first_sep == 0) { + return Status::Invalid("Path cannot start with a separator ('", s, "')"); + } + if (first_sep == std::string::npos) { + *out = {s, s, "", {}}; + return Status::OK(); + } + out->full_path = s; + out->bucket = s.substr(0, first_sep); + out->key = s.substr(first_sep + 1); + out->key_parts = internal::SplitAbstractPath(out->key); + return internal::ValidateAbstractPathParts(out->key_parts); + } + + Aws::String ToURLEncodedAwsString() const { + // URL-encode individual parts, not the '/' separator + Aws::String res; + res += internal::ToURLEncodedAwsString(bucket); + for (const auto& part : key_parts) { + res += kSep; + res += internal::ToURLEncodedAwsString(part); + } + return res; + } + + S3Path parent() const { + DCHECK(!key_parts.empty()); + auto parent = S3Path{"", bucket, "", key_parts}; + parent.key_parts.pop_back(); + parent.key = internal::JoinAbstractPath(parent.key_parts); + parent.full_path = parent.bucket + kSep + parent.key; + return parent; + } + + bool has_parent() const { return !key.empty(); } + + bool empty() const { return bucket.empty() && key.empty(); } + + bool operator==(const S3Path& other) const { + return bucket == other.bucket && key == other.key; + } +}; + +// XXX return in OutcomeToStatus instead? +Status PathNotFound(const S3Path& path) { + return Status::IOError("Path does not exist '", path.full_path, "'"); +} + +Status PathNotFound(const std::string& bucket, const std::string& key) { + return Status::IOError("Path does not exist '", bucket, kSep, key, "'"); +} + +Status NotAFile(const S3Path& path) { + return Status::IOError("Not a regular file: '", path.full_path, "'"); +} + +Status ValidateFilePath(const S3Path& path) { + if (path.bucket.empty() || path.key.empty()) { + return NotAFile(path); + } + return Status::OK(); +} + +std::string FormatRange(int64_t start, int64_t length) { + // Format a HTTP range header value + std::stringstream ss; + ss << "bytes=" << start << "-" << start + length - 1; + return ss.str(); +} + +Status GetObjectRange(Aws::S3::S3Client* client, const S3Path& path, int64_t start, + int64_t length, S3Model::GetObjectResult* out) { + S3Model::GetObjectRequest req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetKey(ToAwsString(path.key)); + req.SetRange(ToAwsString(FormatRange(start, length))); + ARROW_AWS_ASSIGN_OR_RAISE(*out, client->GetObject(req)); + return Status::OK(); +} + +// A RandomAccessFile that reads from a S3 object +class ObjectInputFile : public io::RandomAccessFile { + public: + ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path) + : client_(client), path_(path) {} + + Status Init() { + // Issue a HEAD Object to get the content-length and ensure any + // errors (e.g. file not found) don't wait until the first Read() call. 
+ S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + + auto outcome = client_->HeadObject(req); + if (!outcome.IsSuccess()) { + if (IsNotFound(outcome.GetError())) { + return PathNotFound(path_); + } else { + return ErrorToStatus(outcome.GetError()); + } + } + content_length_ = outcome.GetResult().GetContentLength(); + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + + Status CheckClosed() const { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return Status::OK(); + } + + Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return Status::IOError("Cannot ", action, " past end of file"); + } + return Status::OK(); + } + + // RandomAccessFile APIs + + Status Close() override { + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Status Tell(int64_t* position) const override { + RETURN_NOT_OK(CheckClosed()); + + *position = pos_; + return Status::OK(); + } + + Status GetSize(int64_t* size) override { + RETURN_NOT_OK(CheckClosed()); + + *size = content_length_; + return Status::OK(); + } + + Status Seek(int64_t position) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return Status::OK(); + } + + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + void* out) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // Read the desired range of bytes + S3Model::GetObjectResult result; + RETURN_NOT_OK(GetObjectRange(client_, path_, position, nbytes, &result)); + + auto& stream = result.GetBody(); + stream.read(reinterpret_cast(out), nbytes); + // NOTE: the stream is a stringstream by default, there is no actual error + // to check for. However, stream.fail() may return true if EOF is reached. + *bytes_read = stream.gcount(); + return Status::OK(); + } + + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + std::shared_ptr buf; + int64_t bytes_read; + RETURN_NOT_OK(AllocateResizableBuffer(nbytes, &buf)); + if (nbytes > 0) { + RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buf->mutable_data())); + DCHECK_LE(bytes_read, nbytes); + RETURN_NOT_OK(buf->Resize(bytes_read)); + } + *out = std::move(buf); + return Status::OK(); + } + + Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override { + RETURN_NOT_OK(ReadAt(pos_, nbytes, bytes_read, out)); + pos_ += *bytes_read; + return Status::OK(); + } + + Status Read(int64_t nbytes, std::shared_ptr* out) override { + RETURN_NOT_OK(ReadAt(pos_, nbytes, out)); + pos_ += (*out)->size(); + return Status::OK(); + } + + protected: + Aws::S3::S3Client* client_; + S3Path path_; + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = -1; +}; + +// A non-copying istream. 
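+// Wraps a caller-provided buffer in a std::iostream so the AWS SDK can stream
+// an upload body without copying it.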
+// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out +// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory + +class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream { + public: + StringViewStream(const void* data, int64_t nbytes) + : Aws::Utils::Stream::PreallocatedStreamBuf( + reinterpret_cast(const_cast(data)), + static_cast(nbytes)), + std::iostream(this) {} +}; + +// Minimum size for each part of a multipart upload, except for the last part. +// AWS doc says "5 MB" but it's not clear whether those are MB or MiB, +// so I chose the safer value. +// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html) +static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024; + +// An OutputStream that writes to a S3 object +class ObjectOutputStream : public io::OutputStream { + public: + ObjectOutputStream(Aws::S3::S3Client* client, const S3Path& path) + : client_(client), path_(path) {} + + ~ObjectOutputStream() override { + if (!closed_) { + auto st = Abort(); + if (!st.ok()) { + ARROW_LOG(ERROR) << "Could not abort multipart upload: " << st; + } + } + } + + Status Init() { + // Initiate the multi-part upload + S3Model::CreateMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + + auto outcome = client_->CreateMultipartUpload(req); + if (!outcome.IsSuccess()) { + return ErrorToStatus(outcome.GetError()); + } + upload_id_ = outcome.GetResult().GetUploadId(); + closed_ = false; + return Status::OK(); + } + + Status Abort() { + S3Model::AbortMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(upload_id_); + + auto outcome = client_->AbortMultipartUpload(req); + if (!outcome.IsSuccess()) { + return ErrorToStatus(outcome.GetError()); + } + current_part_.reset(); + closed_ = true; + return Status::OK(); + } + + // OutputStream interface + + Status Close() override { + if (closed_) { + return Status::OK(); + } + + if (current_part_) { + // Upload last part + RETURN_NOT_OK(CommitCurrentPart()); + } + + // S3 mandates at least one part, upload an empty one if necessary + if (completed_upload_.GetParts().empty()) { + RETURN_NOT_OK(UploadPart("", 0)); + } + DCHECK(completed_upload_.PartsHasBeenSet()); + + S3Model::CompleteMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(upload_id_); + req.SetMultipartUpload(completed_upload_); + + auto outcome = client_->CompleteMultipartUpload(req); + if (!outcome.IsSuccess()) { + return ErrorToStatus(outcome.GetError()); + } + + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Status Tell(int64_t* position) const override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + *position = pos_; + return Status::OK(); + } + + Status Write(const void* data, int64_t nbytes) override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + + // With up to 10000 parts in an upload (S3 limit), a stream writing chunks + // of exactly 5MB would be limited to 50GB total. To avoid that, we bump + // the upload threshold every 100 parts. So the pattern is: + // - part 1 to 99: 5MB threshold + // - part 100 to 199: 10MB threshold + // - part 200 to 299: 15MB threshold + // ... 
+ // - part 9900 to 9999: 500MB threshold + // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable + // chunk sizes and avoiding too much buffering in the common case of a small-ish + // stream. If the limit's not enough, we can revisit. + if (part_number_ % 100 == 0) { + part_upload_threshold_ += kMinimumPartUpload; + } + + if (!current_part_ && nbytes >= part_upload_threshold_) { + // No current part and data large enough, upload it directly without copying + RETURN_NOT_OK(UploadPart(data, nbytes)); + pos_ += nbytes; + return Status::OK(); + } + // Can't upload data on its own, need to buffer it + if (!current_part_) { + RETURN_NOT_OK(io::BufferOutputStream::Create( + part_upload_threshold_, default_memory_pool(), ¤t_part_)); + current_part_size_ = 0; + } + RETURN_NOT_OK(current_part_->Write(data, nbytes)); + pos_ += nbytes; + current_part_size_ += nbytes; + + if (current_part_size_ >= part_upload_threshold_) { + // Current part large enough, upload it + RETURN_NOT_OK(CommitCurrentPart()); + } + + return Status::OK(); + } + + Status Flush() override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return Status::OK(); + } + + Status CommitCurrentPart() { + std::shared_ptr buf; + RETURN_NOT_OK(current_part_->Finish(&buf)); + current_part_.reset(); + current_part_size_ = 0; + return UploadPart(buf->data(), buf->size()); + } + + Status UploadPart(const void* data, int64_t nbytes) { + S3Model::UploadPartRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(upload_id_); + req.SetPartNumber(part_number_); + req.SetContentLength(nbytes); + req.SetBody(std::make_shared(data, nbytes)); + + auto outcome = client_->UploadPart(req); + if (!outcome.IsSuccess()) { + return ErrorToStatus(outcome.GetError()); + } + // Append ETag and part number for this uploaded part + // (will be needed for upload completion in Close()) + S3Model::CompletedPart part; + part.SetPartNumber(part_number_); + part.SetETag(outcome.GetResult().GetETag()); + completed_upload_.AddParts(std::move(part)); + ++part_number_; + return Status::OK(); + } + + protected: + Aws::S3::S3Client* client_; + S3Path path_; + Aws::String upload_id_; + S3Model::CompletedMultipartUpload completed_upload_; + bool closed_ = true; + int64_t pos_ = 0; + int32_t part_number_ = 1; + std::shared_ptr current_part_; + int64_t current_part_size_ = 0; + int64_t part_upload_threshold_ = kMinimumPartUpload; +}; + +// This function assumes st->path() is already set +Status FileObjectToStats(const S3Model::HeadObjectResult& obj, FileStats* st) { + st->set_type(FileType::File); + st->set_size(static_cast(obj.GetContentLength())); + st->set_mtime(ToTimePoint(obj.GetLastModified())); + return Status::OK(); +} + +Status FileObjectToStats(const S3Model::Object& obj, FileStats* st) { + st->set_type(FileType::File); + st->set_size(static_cast(obj.GetSize())); + st->set_mtime(ToTimePoint(obj.GetLastModified())); + return Status::OK(); +} + +} // namespace + +class S3FileSystem::Impl { + public: + S3Options options_; + Aws::Client::ClientConfiguration client_config_; + Aws::Auth::AWSCredentials credentials_; + std::unique_ptr client_; + + const int32_t kListObjectsMaxKeys = 1000; + // At most 1000 keys per multiple-delete request + const int32_t kMultipleDeleteMaxKeys = 1000; + // Limit recursing depth, since a recursion bomb can be created + const int32_t kMaxNestingDepth = 100; + + explicit Impl(S3Options options) : options_(std::move(options)) {} + + 
Status Init() { + credentials_ = {ToAwsString(options_.access_key), ToAwsString(options_.secret_key)}; + client_config_.region = ToAwsString(options_.region); + client_config_.endpointOverride = ToAwsString(options_.endpoint_override); + if (options_.scheme == "http") { + client_config_.scheme = Aws::Http::Scheme::HTTP; + } else if (options_.scheme == "https") { + client_config_.scheme = Aws::Http::Scheme::HTTPS; + } else { + return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'"); + } + client_config_.retryStrategy = std::make_shared(); + bool use_virtual_addressing = options_.endpoint_override.empty(); + client_.reset( + new Aws::S3::S3Client(credentials_, client_config_, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + use_virtual_addressing)); + return Status::OK(); + } + + // Create a bucket. Successful if bucket already exists. + Status CreateBucket(const std::string& bucket) { + S3Model::CreateBucketConfiguration config; + S3Model::CreateBucketRequest req; + config.SetLocationConstraint( + S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName( + ToAwsString(options_.region))); + req.SetBucket(ToAwsString(bucket)); + req.SetCreateBucketConfiguration(config); + + auto outcome = client_->CreateBucket(req); + if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) { + return ErrorToStatus(outcome.GetError()); + } + return Status::OK(); + } + + // Create an object with empty contents. Successful if object already exists. + Status CreateEmptyObject(const std::string& bucket, const std::string& key) { + S3Model::PutObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key)); + return OutcomeToStatus(client_->PutObject(req)); + } + + Status CreateEmptyDir(const std::string& bucket, const std::string& key) { + DCHECK(!key.empty()); + return CreateEmptyObject(bucket, key + kSep); + } + + Status DeleteObject(const std::string& bucket, const std::string& key) { + S3Model::DeleteObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key)); + return OutcomeToStatus(client_->DeleteObject(req)); + } + + Status CopyObject(const S3Path& src_path, const S3Path& dest_path) { + S3Model::CopyObjectRequest req; + req.SetBucket(ToAwsString(dest_path.bucket)); + req.SetKey(ToAwsString(dest_path.key)); + // Copy source "Must be URL-encoded" according to AWS SDK docs. + req.SetCopySource(src_path.ToURLEncodedAwsString()); + return OutcomeToStatus(client_->CopyObject(req)); + } + + // On Minio, an empty "directory" doesn't satisfy the same API requests as + // a non-empty "directory". This is a Minio-specific quirk, but we need + // to handle it for unit testing. 
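+  // An empty "directory" is materialized as a zero-length object whose key
+  // ends with "/" (see CreateEmptyDir), so we probe for it with a HEAD
+  // request on key + "/".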
+ + Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) { + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key) + kSep); + + auto outcome = client_->HeadObject(req); + if (outcome.IsSuccess()) { + *out = true; + return Status::OK(); + } + if (IsNotFound(outcome.GetError())) { + *out = false; + return Status::OK(); + } + return ErrorToStatus(outcome.GetError()); + } + + Status IsEmptyDirectory(const S3Path& path, bool* out) { + return IsEmptyDirectory(path.bucket, path.key, out); + } + + Status IsNonEmptyDirectory(const S3Path& path, bool* out) { + S3Model::ListObjectsV2Request req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetPrefix(ToAwsString(path.key) + kSep); + req.SetDelimiter(Aws::String() + kSep); + req.SetMaxKeys(1); + auto outcome = client_->ListObjectsV2(req); + if (outcome.IsSuccess()) { + *out = outcome.GetResult().GetKeyCount() > 0; + return Status::OK(); + } + if (IsNotFound(outcome.GetError())) { + *out = false; + return Status::OK(); + } + return ErrorToStatus(outcome.GetError()); + } + + // List objects under a given prefix, issuing continuation requests if necessary + template + Status ListObjectsV2(const std::string& bucket, const std::string& prefix, + ResultCallable&& result_callable, ErrorCallable&& error_callable) { + S3Model::ListObjectsV2Request req; + req.SetBucket(ToAwsString(bucket)); + if (!prefix.empty()) { + req.SetPrefix(ToAwsString(prefix) + kSep); + } + req.SetDelimiter(Aws::String() + kSep); + req.SetMaxKeys(kListObjectsMaxKeys); + + while (true) { + auto outcome = client_->ListObjectsV2(req); + if (!outcome.IsSuccess()) { + return error_callable(outcome.GetError()); + } + const auto& result = outcome.GetResult(); + RETURN_NOT_OK(result_callable(result)); + // Was the result limited by max-keys? If so, use the continuation token + // to fetch further results. + if (!result.GetIsTruncated()) { + break; + } + DCHECK(!result.GetNextContinuationToken().empty()); + req.SetContinuationToken(result.GetNextContinuationToken()); + } + return Status::OK(); + } + + // Recursive workhorse for GetTargetStats(Selector...) 
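+  // Objects listed under the prefix become File entries and common prefixes
+  // become Directory entries; directories are recursed into when
+  // select.recursive is true.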
+ Status Walk(const Selector& select, const std::string& bucket, const std::string& key, + std::vector* out) { + int32_t nesting_depth = 0; + return Walk(select, bucket, key, nesting_depth, out); + } + + Status Walk(const Selector& select, const std::string& bucket, const std::string& key, + int32_t nesting_depth, std::vector* out) { + if (nesting_depth >= kMaxNestingDepth) { + return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (", + kMaxNestingDepth, ")"); + } + + bool is_empty = true; + std::vector child_keys; + + auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status { + // Walk "files" + for (const auto& obj : result.GetContents()) { + is_empty = false; + FileStats st; + const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey())); + if (child_key == util::string_view(key)) { + // Amazon can return the "directory" key itself as part of the results, skip + continue; + } + std::stringstream child_path; + child_path << bucket << kSep << child_key; + st.set_path(child_path.str()); + RETURN_NOT_OK(FileObjectToStats(obj, &st)); + out->push_back(std::move(st)); + } + // Walk "directories" + for (const auto& prefix : result.GetCommonPrefixes()) { + is_empty = false; + const auto child_key = + internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix())); + std::stringstream ss; + ss << bucket << kSep << child_key; + FileStats st; + st.set_path(ss.str()); + st.set_type(FileType::Directory); + out->push_back(std::move(st)); + if (select.recursive) { + child_keys.emplace_back(child_key); + } + } + return Status::OK(); + }; + + auto handle_error = [&](const AWSError& error) -> Status { + if (select.allow_non_existent && IsNotFound(error)) { + return Status::OK(); + } + return ErrorToStatus(error); + }; + + RETURN_NOT_OK( + ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error))); + + // Recurse + for (const auto& child_key : child_keys) { + RETURN_NOT_OK(Walk(select, bucket, child_key, nesting_depth + 1, out)); + } + + // If no contents were found, perhaps it's an empty "directory", + // or perhaps it's a non-existent entry. Check. 
+ if (is_empty && !select.allow_non_existent) { + RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &is_empty)); + if (!is_empty) { + return PathNotFound(bucket, key); + } + } + return Status::OK(); + } + + Status WalkForDeleteDir(const std::string& bucket, const std::string& key, + std::vector* file_keys, + std::vector* dir_keys) { + int32_t nesting_depth = 0; + return WalkForDeleteDir(bucket, key, nesting_depth, file_keys, dir_keys); + } + + Status WalkForDeleteDir(const std::string& bucket, const std::string& key, + int32_t nesting_depth, std::vector* file_keys, + std::vector* dir_keys) { + if (nesting_depth >= kMaxNestingDepth) { + return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (", + kMaxNestingDepth, ")"); + } + + std::vector child_keys; + + auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status { + // Walk "files" + for (const auto& obj : result.GetContents()) { + file_keys->emplace_back(FromAwsString(obj.GetKey())); + } + // Walk "directories" + for (const auto& prefix : result.GetCommonPrefixes()) { + auto child_key = FromAwsString(prefix.GetPrefix()); + dir_keys->emplace_back(child_key); + child_keys.emplace_back(internal::RemoveTrailingSlash(child_key)); + } + return Status::OK(); + }; + + auto handle_error = [&](const AWSError& error) -> Status { + return ErrorToStatus(error); + }; + + RETURN_NOT_OK( + ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error))); + + // Recurse + for (const auto& child_key : child_keys) { + RETURN_NOT_OK( + WalkForDeleteDir(bucket, child_key, nesting_depth + 1, file_keys, dir_keys)); + } + return Status::OK(); + } + + // Delete multiple objects at once + Status DeleteObjects(const std::string& bucket, const std::vector& keys) { + const auto chunk_size = static_cast(kMultipleDeleteMaxKeys); + for (size_t start = 0; start < keys.size(); start += chunk_size) { + S3Model::DeleteObjectsRequest req; + S3Model::Delete del; + for (size_t i = start; i < std::min(keys.size(), chunk_size); ++i) { + del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i]))); + } + req.SetBucket(ToAwsString(bucket)); + req.SetDelete(std::move(del)); + auto outcome = client_->DeleteObjects(req); + if (!outcome.IsSuccess()) { + return ErrorToStatus(outcome.GetError()); + } + // Also need to check per-key errors, even on successful outcome + // See + // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html + const auto& errors = outcome.GetResult().GetErrors(); + if (!errors.empty()) { + std::stringstream ss; + ss << "Got the following " << errors.size() + << " errors when deleting objects in S3 bucket '" << bucket << "':\n"; + for (const auto& error : errors) { + ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n"; + } + return Status::IOError(ss.str()); + } + } + return Status::OK(); + } + + Status DeleteDir(const std::string& bucket, const std::string& key) { + std::vector file_keys; + std::vector dir_keys; + RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys)); + if (file_keys.empty() && dir_keys.empty() && !key.empty()) { + // No contents found, is it an empty directory? 
+ bool exists = false; + RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists)); + if (!exists) { + return PathNotFound(bucket, key); + } + } + // First delete all "files", then delete all child "directories" + RETURN_NOT_OK(DeleteObjects(bucket, file_keys)); + // XXX This doesn't seem necessary on Minio + RETURN_NOT_OK(DeleteObjects(bucket, dir_keys)); + // Finally, delete the base dir itself + if (!key.empty()) { + RETURN_NOT_OK(DeleteObject(bucket, key + kSep)); + } + return Status::OK(); + } + + Status EnsureParentExists(const S3Path& path) { + if (path.has_parent()) { + // Parent may be implicitly deleted if it became empty, recreate it + S3Path parent = path.parent(); + if (!parent.key.empty()) { + return CreateEmptyDir(parent.bucket, parent.key); + } + } + return Status::OK(); + } + + Status ListBuckets(std::vector* out) { + out->clear(); + auto outcome = client_->ListBuckets(); + if (!outcome.IsSuccess()) { + return ErrorToStatus(outcome.GetError()); + } + for (const auto& bucket : outcome.GetResult().GetBuckets()) { + out->emplace_back(FromAwsString(bucket.GetName())); + } + return Status::OK(); + } +}; + +S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {} + +S3FileSystem::~S3FileSystem() {} + +Status S3FileSystem::Make(const S3Options& options, std::shared_ptr* out) { + RETURN_NOT_OK(CheckS3Initialized()); + + std::shared_ptr ptr(new S3FileSystem(options)); + RETURN_NOT_OK(ptr->impl_->Init()); + *out = std::move(ptr); + return Status::OK(); +} + +Status S3FileSystem::GetTargetStats(const std::string& s, FileStats* out) { + S3Path path; + RETURN_NOT_OK(S3Path::FromString(s, &path)); + FileStats st; + st.set_path(s); + + if (path.empty()) { + // It's the root path "" + st.set_type(FileType::Directory); + *out = st; + return Status::OK(); + } else if (path.key.empty()) { + // It's a bucket + S3Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(path.bucket)); + + auto outcome = impl_->client_->HeadBucket(req); + if (!outcome.IsSuccess()) { + if (!IsNotFound(outcome.GetError())) { + return ErrorToStatus(outcome.GetError()); + } + st.set_type(FileType::NonExistent); + *out = st; + return Status::OK(); + } + // NOTE: S3 doesn't have a bucket modification time. Only a creation + // time is available, and you have to list all buckets to get it. 
+ st.set_type(FileType::Directory); + *out = st; + return Status::OK(); + } else { + // It's an object + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetKey(ToAwsString(path.key)); + + auto outcome = impl_->client_->HeadObject(req); + if (outcome.IsSuccess()) { + // "File" object found + *out = st; + return FileObjectToStats(outcome.GetResult(), out); + } + if (!IsNotFound(outcome.GetError())) { + return ErrorToStatus(outcome.GetError()); + } + // Not found => perhaps it's an empty "directory" + bool is_dir = false; + RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir)); + if (is_dir) { + st.set_type(FileType::Directory); + *out = st; + return Status::OK(); + } + // Not found => perhaps it's a non-empty "directory" + RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir)); + if (is_dir) { + st.set_type(FileType::Directory); + } else { + st.set_type(FileType::NonExistent); + } + *out = st; + return Status::OK(); + } +} + +Status S3FileSystem::GetTargetStats(const Selector& select, std::vector* out) { + S3Path base_path; + RETURN_NOT_OK(S3Path::FromString(select.base_dir, &base_path)); + out->clear(); + + if (base_path.empty()) { + // List all buckets + std::vector buckets; + RETURN_NOT_OK(impl_->ListBuckets(&buckets)); + for (const auto& bucket : buckets) { + FileStats st; + st.set_path(bucket); + st.set_type(FileType::Directory); + out->push_back(std::move(st)); + if (select.recursive) { + RETURN_NOT_OK(impl_->Walk(select, bucket, "", out)); + } + } + return Status::OK(); + } + + // Nominal case -> walk a single bucket + return impl_->Walk(select, base_path.bucket, base_path.key, out); +} + +Status S3FileSystem::CreateDir(const std::string& s, bool recursive) { + S3Path path; + RETURN_NOT_OK(S3Path::FromString(s, &path)); + + if (path.key.empty()) { + // Create bucket + return impl_->CreateBucket(path.bucket); + } + + // Create object + if (recursive) { + // Ensure bucket exists + RETURN_NOT_OK(impl_->CreateBucket(path.bucket)); + // Ensure that all parents exist, then the directory itself + std::string parent_key; + for (const auto& part : path.key_parts) { + parent_key += part; + parent_key += kSep; + RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key)); + } + return Status::OK(); + } else { + // Check parent dir exists + if (path.has_parent()) { + S3Path parent_path = path.parent(); + bool exists; + RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists)); + if (!exists) { + RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists)); + } + if (!exists) { + return Status::IOError("Cannot create directory '", path.full_path, + "': parent directory does not exist"); + } + } + + // XXX Should we check that no non-directory entry exists? + // Minio does it for us, not sure about other S3 implementations. 
+ return impl_->CreateEmptyDir(path.bucket, path.key); + } +} + +Status S3FileSystem::DeleteDir(const std::string& s) { + S3Path path; + RETURN_NOT_OK(S3Path::FromString(s, &path)); + + if (path.empty()) { + return Status::NotImplemented("Cannot delete all S3 buckets"); + } + RETURN_NOT_OK(impl_->DeleteDir(path.bucket, path.key)); + if (path.key.empty()) { + // Also delete bucket + S3Model::DeleteBucketRequest req; + req.SetBucket(ToAwsString(path.bucket)); + return OutcomeToStatus(impl_->client_->DeleteBucket(req)); + } + // Parent may be implicitly deleted if it became empty, recreate it + return impl_->EnsureParentExists(path); +} + +Status S3FileSystem::DeleteFile(const std::string& s) { + S3Path path; + RETURN_NOT_OK(S3Path::FromString(s, &path)); + RETURN_NOT_OK(ValidateFilePath(path)); + + // Check the object exists + S3Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetKey(ToAwsString(path.key)); + + auto outcome = impl_->client_->HeadObject(req); + if (!outcome.IsSuccess()) { + if (IsNotFound(outcome.GetError())) { + return PathNotFound(path); + } else { + return ErrorToStatus(outcome.GetError()); + } + } + // Object found, delete it + RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key)); + // Parent may be implicitly deleted if it became empty, recreate it + return impl_->EnsureParentExists(path); +} + +Status S3FileSystem::Move(const std::string& src, const std::string& dest) { + // XXX We don't implement moving directories as it would be too expensive: + // one must copy all directory contents one by one (including object data), + // then delete the original contents. + + S3Path src_path, dest_path; + RETURN_NOT_OK(S3Path::FromString(src, &src_path)); + RETURN_NOT_OK(ValidateFilePath(src_path)); + RETURN_NOT_OK(S3Path::FromString(dest, &dest_path)); + RETURN_NOT_OK(ValidateFilePath(dest_path)); + + if (src_path == dest_path) { + return Status::OK(); + } + RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path)); + RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key)); + // Source parent may be implicitly deleted if it became empty, recreate it + return impl_->EnsureParentExists(src_path); +} + +Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) { + S3Path src_path, dest_path; + RETURN_NOT_OK(S3Path::FromString(src, &src_path)); + RETURN_NOT_OK(ValidateFilePath(src_path)); + RETURN_NOT_OK(S3Path::FromString(dest, &dest_path)); + RETURN_NOT_OK(ValidateFilePath(dest_path)); + + if (src_path == dest_path) { + return Status::OK(); + } + return impl_->CopyObject(src_path, dest_path); +} + +Status S3FileSystem::OpenInputStream(const std::string& s, + std::shared_ptr* out) { + S3Path path; + RETURN_NOT_OK(S3Path::FromString(s, &path)); + RETURN_NOT_OK(ValidateFilePath(path)); + + auto ptr = std::make_shared(impl_->client_.get(), path); + RETURN_NOT_OK(ptr->Init()); + *out = std::move(ptr); + return Status::OK(); +} + +Status S3FileSystem::OpenInputFile(const std::string& s, + std::shared_ptr* out) { + S3Path path; + RETURN_NOT_OK(S3Path::FromString(s, &path)); + RETURN_NOT_OK(ValidateFilePath(path)); + + auto ptr = std::make_shared(impl_->client_.get(), path); + RETURN_NOT_OK(ptr->Init()); + *out = std::move(ptr); + return Status::OK(); +} + +Status S3FileSystem::OpenOutputStream(const std::string& s, + std::shared_ptr* out) { + S3Path path; + RETURN_NOT_OK(S3Path::FromString(s, &path)); + RETURN_NOT_OK(ValidateFilePath(path)); + + auto ptr = std::make_shared(impl_->client_.get(), path); + 
RETURN_NOT_OK(ptr->Init()); + *out = std::move(ptr); + return Status::OK(); +} + +Status S3FileSystem::OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + // XXX Investigate UploadPartCopy? Does it work with source == destination? + // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html + // (but would need to fall back to GET if the current data is < 5 MB) + return Status::NotImplemented("It is not possible to append efficiently to S3 objects"); +} + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h new file mode 100644 index 00000000000..17cf62575c1 --- /dev/null +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/util/macros.h" + +namespace arrow { +namespace fs { + +extern ARROW_EXPORT const char* kS3DefaultRegion; + +struct ARROW_EXPORT S3Options { + // AWS region to connect to (default "us-east-1") + std::string region = kS3DefaultRegion; + + // XXX perhaps instead take a URL like "http://localhost:9000"? + // If non-empty, override region with a connect string such as "localhost:9000" + std::string endpoint_override; + // Default "https" + std::string scheme = "https"; + + std::string access_key; + std::string secret_key; +}; + +/// S3-backed FileSystem implementation. +/// +/// Some implementation notes: +/// - buckets are special and the operations available on them may be limited +/// or more expensive than desired. +class ARROW_EXPORT S3FileSystem : public FileSystem { + public: + ~S3FileSystem() override; + + using FileSystem::GetTargetStats; + Status GetTargetStats(const std::string& path, FileStats* out) override; + Status GetTargetStats(const Selector& select, std::vector* out) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + /// Create a sequential input stream for reading from a S3 object. + /// + /// NOTE: Reads from the stream will be synchronous and unbuffered. + /// You way want to wrap the stream in a BufferedInputStream or use + /// a custom readahead strategy to avoid idle waits. + Status OpenInputStream(const std::string& path, + std::shared_ptr* out) override; + + /// Create a random access file for reading from a S3 object. + /// + /// See OpenInputStream for performance notes. 
+ Status OpenInputFile(const std::string& path, + std::shared_ptr* out) override; + + /// Create a sequential output stream for writing to a S3 object. + /// + /// NOTE: Writes to the stream will be buffered but synchronous (i.e. + /// when a buffer is implicitly flushed, it waits for the upload to + /// complete and the server to respond). You may want to issue writes + /// in the background to avoid idle waits. + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) override; + + static Status Make(const S3Options& options, std::shared_ptr* out); + + protected: + explicit S3FileSystem(const S3Options& options); + + class Impl; + std::unique_ptr impl_; +}; + +enum class S3LogLevel { Off, Fatal, Error, Warn, Info, Debug, Trace }; + +struct ARROW_EXPORT S3GlobalOptions { + S3LogLevel log_level; +}; + +/// Initialize the S3 APIs. It is required to call this function at least once +/// before using S3FileSystem. +ARROW_EXPORT +Status InitializeS3(const S3GlobalOptions& options); + +/// Shutdown the S3 APIs. +ARROW_EXPORT +Status FinalizeS3(); + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc new file mode 100644 index 00000000000..9432a1f1985 --- /dev/null +++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// "Narrative" test for S3. This must be run manually against a S3 endpoint. +// The test bucket must exist and be empty (you can use --clear to delete its +// contents). + +#include +#include +#include + +#include + +#include "arrow/filesystem/s3fs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/io/interfaces.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/logging.h" + +DEFINE_bool(clear, false, "delete all bucket contents"); +DEFINE_bool(test, false, "run narrative test against bucket"); + +DEFINE_string(access_key, "", "S3 access key"); +DEFINE_string(secret_key, "", "S3 secret key"); + +DEFINE_string(bucket, "", "bucket name"); +DEFINE_string(region, arrow::fs::kS3DefaultRegion, "AWS region"); +DEFINE_string(endpoint, "", "Endpoint override (e.g. 
'127.0.0.1:9000')"); +DEFINE_string(scheme, "https", "Connection scheme"); + +namespace arrow { +namespace fs { + +std::shared_ptr MakeFileSystem() { + std::shared_ptr s3fs; + S3Options options; + options.access_key = FLAGS_access_key; + options.secret_key = FLAGS_secret_key; + options.endpoint_override = FLAGS_endpoint; + options.scheme = FLAGS_scheme; + options.region = FLAGS_region; + ABORT_NOT_OK(S3FileSystem::Make(options, &s3fs)); + return std::make_shared(FLAGS_bucket, s3fs); +} + +void ClearBucket(int argc, char** argv) { + auto fs = MakeFileSystem(); + + // TODO(ARROW-6358): delete in one shot without deleting the bucket itself + std::vector stats; + Selector select; + select.base_dir = ""; + select.allow_non_existent = false; + select.recursive = false; + ASSERT_OK(fs->GetTargetStats(select, &stats)); + for (const auto& st : stats) { + switch (st.type()) { + case FileType::File: + ASSERT_OK(fs->DeleteFile(st.path())); + break; + case FileType::Directory: + ASSERT_OK(fs->DeleteDir(st.path())); + break; + default: + FAIL() << "Unexpected file type: " << st.type(); + } + } +} + +void TestBucket(int argc, char** argv) { + auto fs = MakeFileSystem(); + FileStats st; + std::vector stats; + Selector select; + std::shared_ptr is; + std::shared_ptr file; + std::shared_ptr buf; + int64_t pos; + + // Check bucket exists and is empty + select.base_dir = ""; + select.allow_non_existent = false; + select.recursive = false; + ASSERT_OK(fs->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0) << "Bucket should be empty, perhaps use --clear?"; + + // Create directory structure + ASSERT_OK(fs->CreateDir("EmptyDir", /*recursive=*/false)); + ASSERT_OK(fs->CreateDir("Dir1", /*recursive=*/false)); + ASSERT_OK(fs->CreateDir("Dir1/Subdir", /*recursive=*/false)); + ASSERT_RAISES(IOError, fs->CreateDir("Dir2/Subdir", /*recursive=*/false)); + ASSERT_OK(fs->CreateDir("Dir2/Subdir", /*recursive=*/true)); + CreateFile(fs.get(), "File1", "first data"); + CreateFile(fs.get(), "Dir1/File2", "second data"); + CreateFile(fs.get(), "Dir2/Subdir/File3", "third data"); + + // GetTargetStats(Selector) + select.base_dir = ""; + ASSERT_OK(fs->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 4); + SortStats(&stats); + AssertFileStats(stats[0], "Dir1", FileType::Directory); + AssertFileStats(stats[1], "Dir2", FileType::Directory); + AssertFileStats(stats[2], "EmptyDir", FileType::Directory); + AssertFileStats(stats[3], "File1", FileType::File, 10); + + select.base_dir = "zzzz"; + ASSERT_RAISES(IOError, fs->GetTargetStats(select, &stats)); + select.allow_non_existent = true; + ASSERT_OK(fs->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0); + + select.base_dir = "Dir1"; + select.allow_non_existent = false; + ASSERT_OK(fs->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 2); + AssertFileStats(stats[0], "Dir1/File2", FileType::File, 11); + AssertFileStats(stats[1], "Dir1/Subdir", FileType::Directory); + + select.base_dir = "Dir2"; + select.recursive = true; + ASSERT_OK(fs->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 2); + AssertFileStats(stats[0], "Dir2/Subdir", FileType::Directory); + AssertFileStats(stats[1], "Dir2/Subdir/File3", FileType::File, 10); + + // Read a file + ASSERT_RAISES(IOError, fs->OpenInputStream("zzz", &is)); + ASSERT_OK(fs->OpenInputStream("File1", &is)); + ASSERT_OK(is->Read(5, &buf)); + AssertBufferEqual(*buf, "first"); + ASSERT_OK(is->Read(10, &buf)); + AssertBufferEqual(*buf, " data"); + ASSERT_OK(is->Read(10, &buf)); + AssertBufferEqual(*buf, 
""); + ASSERT_OK(is->Close()); + + ASSERT_OK(fs->OpenInputFile("Dir1/File2", &file)); + ASSERT_OK(file->Tell(&pos)); + ASSERT_EQ(pos, 0); + ASSERT_OK(file->Seek(7)); + ASSERT_OK(file->Tell(&pos)); + ASSERT_EQ(pos, 7); + ASSERT_OK(file->Read(2, &buf)); + AssertBufferEqual(*buf, "da"); + ASSERT_OK(file->Tell(&pos)); + ASSERT_EQ(pos, 9); + ASSERT_OK(file->ReadAt(2, 4, &buf)); + AssertBufferEqual(*buf, "cond"); + ASSERT_OK(file->Close()); + + // Copy a file + ASSERT_OK(fs->CopyFile("File1", "Dir2/File4")); + AssertFileStats(fs.get(), "File1", FileType::File, 10); + AssertFileStats(fs.get(), "Dir2/File4", FileType::File, 10); + AssertFileContents(fs.get(), "Dir2/File4", "first data"); + + // Copy a file over itself + ASSERT_OK(fs->CopyFile("File1", "File1")); + AssertFileStats(fs.get(), "File1", FileType::File, 10); + AssertFileContents(fs.get(), "File1", "first data"); + + // Move a file + ASSERT_OK(fs->Move("Dir2/File4", "File5")); + AssertFileStats(fs.get(), "Dir2/File4", FileType::NonExistent); + AssertFileStats(fs.get(), "File5", FileType::File, 10); + AssertFileContents(fs.get(), "File5", "first data"); + + // Move a file over itself + ASSERT_OK(fs->Move("File5", "File5")); + AssertFileStats(fs.get(), "File5", FileType::File, 10); + AssertFileContents(fs.get(), "File5", "first data"); +} + +void TestMain(int argc, char** argv) { + S3GlobalOptions options; + options.log_level = S3LogLevel::Fatal; + ASSERT_OK(InitializeS3(options)); + + if (FLAGS_clear) { + ClearBucket(argc, argv); + } else if (FLAGS_test) { + TestBucket(argc, argv); + } + + ASSERT_OK(FinalizeS3()); +} + +} // namespace fs +} // namespace arrow + +int main(int argc, char** argv) { + std::stringstream ss; + ss << "Narrative test for S3. Needs an initialized empty bucket.\n"; + ss << "Usage: " << argv[0]; + gflags::SetUsageMessage(ss.str()); + gflags::ParseCommandLineFlags(&argc, &argv, true); + + if (FLAGS_clear + FLAGS_test != 1) { + ARROW_LOG(ERROR) << "Need exactly one of --test and --clear"; + return 2; + } + if (FLAGS_bucket.empty()) { + ARROW_LOG(ERROR) << "--bucket is mandatory"; + return 2; + } + + arrow::fs::TestMain(argc, argv); + if (::testing::Test::HasFatalFailure() || ::testing::Test::HasNonfatalFailure()) { + return 1; + } else { + std::cout << "Ok" << std::endl; + return 0; + } +} diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc new file mode 100644 index 00000000000..c4da4e845ed --- /dev/null +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -0,0 +1,727 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include + +#include + +#ifdef _WIN32 +// Undefine preprocessor macros that interfere with AWS function / method names +#ifdef GetMessage +#undef GetMessage +#endif +#ifdef GetObject +#undef GetObject +#endif +#endif + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/s3_internal.h" +#include "arrow/filesystem/s3fs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" + +namespace arrow { +namespace fs { + +using ::arrow::internal::PlatformFilename; +using ::arrow::internal::TemporaryDir; + +using ::arrow::fs::internal::ConnectRetryStrategy; +using ::arrow::fs::internal::ErrorToStatus; +using ::arrow::fs::internal::OutcomeToStatus; +using ::arrow::fs::internal::ToAwsString; + +namespace bp = boost::process; + +// NOTE: Connecting in Python: +// >>> fs = s3fs.S3FileSystem(key='minio', secret='miniopass', +// client_kwargs=dict(endpoint_url='http://127.0.0.1:9000')) +// >>> fs.ls('') +// ['bucket'] +// or: +// >>> from fs_s3fs import S3FS +// >>> fs = S3FS('bucket', endpoint_url='http://127.0.0.1:9000', +// aws_access_key_id='minio', aws_secret_access_key='miniopass') + +#define ARROW_AWS_ASSIGN_OR_FAIL_IMPL(outcome_name, lhs, rexpr) \ + auto outcome_name = (rexpr); \ + if (!outcome_name.IsSuccess()) { \ + FAIL() << "'" ARROW_STRINGIFY(rexpr) "' failed with " \ + << outcome_name.GetError().GetMessage(); \ + } \ + lhs = std::move(outcome_name).GetResultWithOwnership(); + +#define ARROW_AWS_ASSIGN_OR_FAIL_NAME(x, y) ARROW_CONCAT(x, y) + +#define ARROW_AWS_ASSIGN_OR_FAIL(lhs, rexpr) \ + ARROW_AWS_ASSIGN_OR_FAIL_IMPL( \ + ARROW_AWS_ASSIGN_OR_FAIL_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr); + +// TODO: allocate an ephemeral port +static const char* kMinioExecutableName = "minio"; +static const char* kMinioConnectString = "127.0.0.1:9123"; +static const char* kMinioAccessKey = "minio"; +static const char* kMinioSecretKey = "miniopass"; + +// A minio test server, managed as a child process + +class MinioTestServer { + public: + Status Start(); + + Status Stop(); + + std::string connect_string() const { return connect_string_; } + + std::string access_key() const { return kMinioAccessKey; } + + std::string secret_key() const { return kMinioSecretKey; } + + private: + std::unique_ptr temp_dir_; + std::string connect_string_; + std::shared_ptr<::boost::process::child> server_process_; +}; + +Status MinioTestServer::Start() { + RETURN_NOT_OK(TemporaryDir::Make("s3fs-test-", &temp_dir_)); + + // Get a copy of the current environment. 
+ // (NOTE: using "auto" would return a native_environment that mutates + // the current environment) + bp::environment env = boost::this_process::environment(); + env["MINIO_ACCESS_KEY"] = kMinioAccessKey; + env["MINIO_SECRET_KEY"] = kMinioSecretKey; + + connect_string_ = kMinioConnectString; + + auto exe_path = bp::search_path(kMinioExecutableName); + if (exe_path.empty()) { + return Status::IOError("Failed to find minio executable ('", kMinioExecutableName, + "') in PATH"); + } + + try { + // NOTE: --quiet makes startup faster by suppressing remote version check + server_process_ = std::make_shared( + env, exe_path, "server", "--quiet", "--compat", "--address", connect_string_, + temp_dir_->path().ToString()); + } catch (const std::exception& e) { + return Status::IOError("Failed to launch Minio server: ", e.what()); + } + return Status::OK(); +} + +Status MinioTestServer::Stop() { + if (server_process_ && server_process_->valid()) { + // Brutal shutdown + server_process_->terminate(); + server_process_->wait(); + } + return Status::OK(); +} + +// A global test "environment", to ensure that the S3 API is initialized before +// running unit tests. + +class S3Environment : public ::testing::Environment { + public: + virtual ~S3Environment() {} + + void SetUp() override { + // Change this to increase logging during tests + S3GlobalOptions options; + options.log_level = S3LogLevel::Fatal; + ASSERT_OK(InitializeS3(options)); + } + + void TearDown() override { ASSERT_OK(FinalizeS3()); } + + protected: + Aws::SDKOptions options_; +}; + +::testing::Environment* s3_env = ::testing::AddGlobalTestEnvironment(new S3Environment); + +class S3TestMixin : public ::testing::Test { + public: + void SetUp() override { + ASSERT_OK(minio_.Start()); + + client_config_.endpointOverride = ToAwsString(minio_.connect_string()); + client_config_.scheme = Aws::Http::Scheme::HTTP; + client_config_.retryStrategy = std::make_shared(); + credentials_ = {ToAwsString(minio_.access_key()), ToAwsString(minio_.secret_key())}; + bool use_virtual_addressing = false; + client_.reset( + new Aws::S3::S3Client(credentials_, client_config_, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, + use_virtual_addressing)); + } + + void TearDown() override { ASSERT_OK(minio_.Stop()); } + + protected: + MinioTestServer minio_; + Aws::Client::ClientConfiguration client_config_; + Aws::Auth::AWSCredentials credentials_; + std::unique_ptr client_; +}; + +void AssertGetObject(Aws::S3::Model::GetObjectResult& result, + const std::string& expected) { + auto length = static_cast(expected.length()); + ASSERT_EQ(result.GetContentLength(), length); + auto& stream = result.GetBody(); + std::string actual; + actual.resize(length + 1); + stream.read(&actual[0], length + 1); + ASSERT_EQ(stream.gcount(), length); // EOF was reached before length + 1 + actual.resize(length); + ASSERT_EQ(actual, expected); +} + +void AssertObjectContents(Aws::S3::S3Client* client, const std::string& bucket, + const std::string& key, const std::string& expected) { + Aws::S3::Model::GetObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key)); + ARROW_AWS_ASSIGN_OR_FAIL(auto result, client->GetObject(req)); + AssertGetObject(result, expected); +} + +//////////////////////////////////////////////////////////////////////////// +// Basic test for the Minio test server. 
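The ARROW_AWS_ASSIGN_OR_FAIL macro defined a few lines up is what lets these tests call the outcome-returning AWS SDK APIs without repeating error plumbing. For clarity, an invocation such as the one in the connectivity test just below expands to roughly the following (the temporary's name is generated with __COUNTER__; the one shown here is only illustrative):

    // Rough expansion of ARROW_AWS_ASSIGN_OR_FAIL(auto bucket_list, client_->ListBuckets())
    auto _aws_error_or_value0 = client_->ListBuckets();
    if (!_aws_error_or_value0.IsSuccess()) {
      // FAIL() is gtest's fatal-failure macro, so the test stops here on error.
      FAIL() << "'client_->ListBuckets()' failed with "
             << _aws_error_or_value0.GetError().GetMessage();
    }
    auto bucket_list = std::move(_aws_error_or_value0).GetResultWithOwnership();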
+ +class TestMinioServer : public S3TestMixin { + public: + void SetUp() override { S3TestMixin::SetUp(); } + + protected: +}; + +TEST_F(TestMinioServer, Connect) { + // Just a dummy connection test. Check that we can list buckets, + // and that there are none (the server is launched in an empty temp dir). + ARROW_AWS_ASSIGN_OR_FAIL(auto bucket_list, client_->ListBuckets()); + ASSERT_EQ(bucket_list.GetBuckets().size(), 0); +} + +//////////////////////////////////////////////////////////////////////////// +// Concrete S3 tests + +class TestS3FS : public S3TestMixin { + public: + void SetUp() override { + S3TestMixin::SetUp(); + options_.access_key = minio_.access_key(); + options_.secret_key = minio_.secret_key(); + options_.scheme = "http"; + options_.endpoint_override = minio_.connect_string(); + ASSERT_OK(S3FileSystem::Make(options_, &fs_)); + + // Set up test bucket + { + Aws::S3::Model::CreateBucketRequest req; + req.SetBucket(ToAwsString("bucket")); + ASSERT_OK(OutcomeToStatus(client_->CreateBucket(req))); + req.SetBucket(ToAwsString("empty-bucket")); + ASSERT_OK(OutcomeToStatus(client_->CreateBucket(req))); + } + { + Aws::S3::Model::PutObjectRequest req; + req.SetBucket(ToAwsString("bucket")); + req.SetKey(ToAwsString("emptydir/")); + ASSERT_OK(OutcomeToStatus(client_->PutObject(req))); + // NOTE: no need to create intermediate "directories" somedir/ and + // somedir/subdir/ + req.SetKey(ToAwsString("somedir/subdir/subfile")); + req.SetBody(std::make_shared("sub data")); + ASSERT_OK(OutcomeToStatus(client_->PutObject(req))); + req.SetKey(ToAwsString("somefile")); + req.SetBody(std::make_shared("some data")); + ASSERT_OK(OutcomeToStatus(client_->PutObject(req))); + } + } + + protected: + S3Options options_; + std::shared_ptr fs_; +}; + +TEST_F(TestS3FS, GetTargetStatsRoot) { + FileStats st; + AssertFileStats(fs_.get(), "", FileType::Directory); +} + +TEST_F(TestS3FS, GetTargetStatsBucket) { + FileStats st; + AssertFileStats(fs_.get(), "bucket", FileType::Directory); + AssertFileStats(fs_.get(), "empty-bucket", FileType::Directory); + AssertFileStats(fs_.get(), "non-existent-bucket", FileType::NonExistent); +} + +TEST_F(TestS3FS, GetTargetStatsObject) { + FileStats st; + + // "Directories" + AssertFileStats(fs_.get(), "bucket/emptydir", FileType::Directory, kNoSize); + AssertFileStats(fs_.get(), "bucket/somedir", FileType::Directory, kNoSize); + AssertFileStats(fs_.get(), "bucket/somedir/subdir", FileType::Directory, kNoSize); + + // "Files" + AssertFileStats(fs_.get(), "bucket/somefile", FileType::File, 9); + AssertFileStats(fs_.get(), "bucket/somedir/subdir/subfile", FileType::File, 8); + + // Non-existent + AssertFileStats(fs_.get(), "bucket/emptyd", FileType::NonExistent); + AssertFileStats(fs_.get(), "bucket/somed", FileType::NonExistent); + AssertFileStats(fs_.get(), "non-existent-bucket/somed", FileType::NonExistent); +} + +TEST_F(TestS3FS, GetTargetStatsSelector) { + Selector select; + std::vector stats; + + // Root dir + select.base_dir = ""; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 2); + SortStats(&stats); + AssertFileStats(stats[0], "bucket", FileType::Directory); + AssertFileStats(stats[1], "empty-bucket", FileType::Directory); + + // Empty bucket + select.base_dir = "empty-bucket"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0); + // Non-existent bucket + select.base_dir = "non-existent-bucket"; + ASSERT_RAISES(IOError, fs_->GetTargetStats(select, &stats)); + select.allow_non_existent = true; + 
ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0); + select.allow_non_existent = false; + // Non-empty bucket + select.base_dir = "bucket"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + SortStats(&stats); + ASSERT_EQ(stats.size(), 3); + AssertFileStats(stats[0], "bucket/emptydir", FileType::Directory); + AssertFileStats(stats[1], "bucket/somedir", FileType::Directory); + AssertFileStats(stats[2], "bucket/somefile", FileType::File, 9); + + // Empty "directory" + select.base_dir = "bucket/emptydir"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0); + // Non-empty "directories" + select.base_dir = "bucket/somedir"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 1); + AssertFileStats(stats[0], "bucket/somedir/subdir", FileType::Directory); + select.base_dir = "bucket/somedir/subdir"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 1); + AssertFileStats(stats[0], "bucket/somedir/subdir/subfile", FileType::File, 8); + // Non-existent + select.base_dir = "bucket/non-existent"; + ASSERT_RAISES(IOError, fs_->GetTargetStats(select, &stats)); + select.allow_non_existent = true; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0); + select.allow_non_existent = false; +} + +TEST_F(TestS3FS, GetTargetStatsSelectorRecursive) { + Selector select; + std::vector stats; + select.recursive = true; + + // Root dir + select.base_dir = ""; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 7); + SortStats(&stats); + AssertFileStats(stats[0], "bucket", FileType::Directory); + AssertFileStats(stats[1], "bucket/emptydir", FileType::Directory); + AssertFileStats(stats[2], "bucket/somedir", FileType::Directory); + AssertFileStats(stats[3], "bucket/somedir/subdir", FileType::Directory); + AssertFileStats(stats[4], "bucket/somedir/subdir/subfile", FileType::File, 8); + AssertFileStats(stats[5], "bucket/somefile", FileType::File, 9); + AssertFileStats(stats[6], "empty-bucket", FileType::Directory); + + // Empty bucket + select.base_dir = "empty-bucket"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0); + + // Non-empty bucket + select.base_dir = "bucket"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + SortStats(&stats); + ASSERT_EQ(stats.size(), 5); + AssertFileStats(stats[0], "bucket/emptydir", FileType::Directory); + AssertFileStats(stats[1], "bucket/somedir", FileType::Directory); + AssertFileStats(stats[2], "bucket/somedir/subdir", FileType::Directory); + AssertFileStats(stats[3], "bucket/somedir/subdir/subfile", FileType::File, 8); + AssertFileStats(stats[4], "bucket/somefile", FileType::File, 9); + + // Empty "directory" + select.base_dir = "bucket/emptydir"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 0); + + // Non-empty "directories" + select.base_dir = "bucket/somedir"; + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + SortStats(&stats); + ASSERT_EQ(stats.size(), 2); + AssertFileStats(stats[0], "bucket/somedir/subdir", FileType::Directory); + AssertFileStats(stats[1], "bucket/somedir/subdir/subfile", FileType::File, 8); +} + +TEST_F(TestS3FS, CreateDir) { + FileStats st; + + // Existing bucket + ASSERT_OK(fs_->CreateDir("bucket")); + AssertFileStats(fs_.get(), "bucket", FileType::Directory); + + // New bucket + AssertFileStats(fs_.get(), "new-bucket", FileType::NonExistent); + ASSERT_OK(fs_->CreateDir("new-bucket")); + AssertFileStats(fs_.get(), "new-bucket", 
FileType::Directory); + + // Existing "directory" + AssertFileStats(fs_.get(), "bucket/somedir", FileType::Directory); + ASSERT_OK(fs_->CreateDir("bucket/somedir")); + AssertFileStats(fs_.get(), "bucket/somedir", FileType::Directory); + + AssertFileStats(fs_.get(), "bucket/emptydir", FileType::Directory); + ASSERT_OK(fs_->CreateDir("bucket/emptydir")); + AssertFileStats(fs_.get(), "bucket/emptydir", FileType::Directory); + + // New "directory" + AssertFileStats(fs_.get(), "bucket/newdir", FileType::NonExistent); + ASSERT_OK(fs_->CreateDir("bucket/newdir")); + AssertFileStats(fs_.get(), "bucket/newdir", FileType::Directory); + + // New "directory", recursive + ASSERT_OK(fs_->CreateDir("bucket/newdir/newsub/newsubsub", /*recursive=*/true)); + AssertFileStats(fs_.get(), "bucket/newdir/newsub", FileType::Directory); + AssertFileStats(fs_.get(), "bucket/newdir/newsub/newsubsub", FileType::Directory); + + // Existing "file", should fail + ASSERT_RAISES(IOError, fs_->CreateDir("bucket/somefile")); +} + +TEST_F(TestS3FS, DeleteFile) { + FileStats st; + + // Bucket + ASSERT_RAISES(IOError, fs_->DeleteFile("bucket")); + ASSERT_RAISES(IOError, fs_->DeleteFile("empty-bucket")); + ASSERT_RAISES(IOError, fs_->DeleteFile("non-existent-bucket")); + + // "File" + ASSERT_OK(fs_->DeleteFile("bucket/somefile")); + AssertFileStats(fs_.get(), "bucket/somefile", FileType::NonExistent); + ASSERT_RAISES(IOError, fs_->DeleteFile("bucket/somefile")); + ASSERT_RAISES(IOError, fs_->DeleteFile("bucket/non-existent")); + + // "Directory" + ASSERT_RAISES(IOError, fs_->DeleteFile("bucket/somedir")); + AssertFileStats(fs_.get(), "bucket/somedir", FileType::Directory); +} + +TEST_F(TestS3FS, DeleteDir) { + Selector select; + select.base_dir = "bucket"; + std::vector stats; + FileStats st; + + // Empty "directory" + ASSERT_OK(fs_->DeleteDir("bucket/emptydir")); + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 2); + SortStats(&stats); + AssertFileStats(stats[0], "bucket/somedir", FileType::Directory); + AssertFileStats(stats[1], "bucket/somefile", FileType::File); + + // Non-empty "directory" + ASSERT_OK(fs_->DeleteDir("bucket/somedir")); + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 1); + AssertFileStats(stats[0], "bucket/somefile", FileType::File); + + // Leaving parent "directory" empty + ASSERT_OK(fs_->CreateDir("bucket/newdir/newsub/newsubsub")); + ASSERT_OK(fs_->DeleteDir("bucket/newdir/newsub")); + ASSERT_OK(fs_->GetTargetStats(select, &stats)); + ASSERT_EQ(stats.size(), 2); + SortStats(&stats); + AssertFileStats(stats[0], "bucket/newdir", FileType::Directory); // still exists + AssertFileStats(stats[1], "bucket/somefile", FileType::File); + + // Bucket + ASSERT_OK(fs_->DeleteDir("bucket")); + ASSERT_OK(fs_->GetTargetStats("bucket", &st)); + AssertFileStats(fs_.get(), "bucket", FileType::NonExistent); +} + +TEST_F(TestS3FS, CopyFile) { + FileStats st; + + // "File" + ASSERT_OK(fs_->CopyFile("bucket/somefile", "bucket/newfile")); + AssertFileStats(fs_.get(), "bucket/newfile", FileType::File, 9); + AssertObjectContents(client_.get(), "bucket", "newfile", "some data"); + AssertFileStats(fs_.get(), "bucket/somefile", FileType::File, 9); // still exists + // Overwrite + ASSERT_OK(fs_->CopyFile("bucket/somedir/subdir/subfile", "bucket/newfile")); + AssertFileStats(fs_.get(), "bucket/newfile", FileType::File, 8); + AssertObjectContents(client_.get(), "bucket", "newfile", "sub data"); + + // Non-existent + ASSERT_RAISES(IOError, fs_->CopyFile("bucket/non-existent", 
"bucket/newfile2")); + ASSERT_RAISES(IOError, + fs_->CopyFile("non-existent-bucket/somefile", "bucket/newfile2")); + ASSERT_RAISES(IOError, + fs_->CopyFile("bucket/somefile", "non-existent-bucket/newfile2")); + AssertFileStats(fs_.get(), "bucket/newfile2", FileType::NonExistent); +} + +TEST_F(TestS3FS, Move) { + FileStats st; + + // "File" + ASSERT_OK(fs_->Move("bucket/somefile", "bucket/newfile")); + AssertFileStats(fs_.get(), "bucket/newfile", FileType::File, 9); + AssertObjectContents(client_.get(), "bucket", "newfile", "some data"); + // Source was deleted + AssertFileStats(fs_.get(), "bucket/somefile", FileType::NonExistent); + + // Overwrite + ASSERT_OK(fs_->Move("bucket/somedir/subdir/subfile", "bucket/newfile")); + AssertFileStats(fs_.get(), "bucket/newfile", FileType::File, 8); + AssertObjectContents(client_.get(), "bucket", "newfile", "sub data"); + // Source was deleted + AssertFileStats(fs_.get(), "bucket/somedir/subdir/subfile", FileType::NonExistent); + + // Non-existent + ASSERT_RAISES(IOError, fs_->Move("bucket/non-existent", "bucket/newfile2")); + ASSERT_RAISES(IOError, fs_->Move("non-existent-bucket/somefile", "bucket/newfile2")); + ASSERT_RAISES(IOError, fs_->Move("bucket/somefile", "non-existent-bucket/newfile2")); + AssertFileStats(fs_.get(), "bucket/newfile2", FileType::NonExistent); +} + +TEST_F(TestS3FS, OpenInputStream) { + std::shared_ptr stream; + std::shared_ptr buf; + + // Non-existent + ASSERT_RAISES(IOError, fs_->OpenInputStream("non-existent-bucket/somefile", &stream)); + ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/zzzt", &stream)); + + // "Files" + ASSERT_OK(fs_->OpenInputStream("bucket/somefile", &stream)); + ASSERT_OK(stream->Read(2, &buf)); + AssertBufferEqual(*buf, "so"); + ASSERT_OK(stream->Read(5, &buf)); + AssertBufferEqual(*buf, "me da"); + ASSERT_OK(stream->Read(5, &buf)); + AssertBufferEqual(*buf, "ta"); + ASSERT_OK(stream->Read(5, &buf)); + AssertBufferEqual(*buf, ""); + + ASSERT_OK(fs_->OpenInputStream("bucket/somedir/subdir/subfile", &stream)); + ASSERT_OK(stream->Read(100, &buf)); + AssertBufferEqual(*buf, "sub data"); + ASSERT_OK(stream->Read(100, &buf)); + AssertBufferEqual(*buf, ""); + + // "Directories" + ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/emptydir", &stream)); + ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/somedir", &stream)); + ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket", &stream)); +} + +TEST_F(TestS3FS, OpenInputFile) { + std::shared_ptr file; + std::shared_ptr buf; + int64_t nbytes = -1, pos = -1; + + // Non-existent + ASSERT_RAISES(IOError, fs_->OpenInputFile("non-existent-bucket/somefile", &file)); + ASSERT_RAISES(IOError, fs_->OpenInputFile("bucket/zzzt", &file)); + + // "Files" + ASSERT_OK(fs_->OpenInputFile("bucket/somefile", &file)); + ASSERT_OK(file->GetSize(&nbytes)); + ASSERT_EQ(nbytes, 9); + ASSERT_OK(file->Read(4, &buf)); + AssertBufferEqual(*buf, "some"); + ASSERT_OK(file->GetSize(&nbytes)); + ASSERT_EQ(nbytes, 9); + ASSERT_OK(file->Tell(&pos)); + ASSERT_EQ(pos, 4); + + ASSERT_OK(file->ReadAt(2, 5, &buf)); + AssertBufferEqual(*buf, "me da"); + ASSERT_OK(file->Tell(&pos)); + ASSERT_EQ(pos, 4); + ASSERT_OK(file->ReadAt(5, 20, &buf)); + AssertBufferEqual(*buf, "data"); + ASSERT_OK(file->ReadAt(9, 20, &buf)); + AssertBufferEqual(*buf, ""); + // Reading past end of file + ASSERT_RAISES(IOError, file->ReadAt(10, 20, &buf)); + + ASSERT_OK(file->Seek(5)); + ASSERT_OK(file->Read(2, &buf)); + AssertBufferEqual(*buf, "da"); + ASSERT_OK(file->Seek(9)); + ASSERT_OK(file->Read(2, &buf)); + 
AssertBufferEqual(*buf, ""); + // Seeking past end of file + ASSERT_RAISES(IOError, file->Seek(10)); +} + +TEST_F(TestS3FS, OpenOutputStream) { + std::shared_ptr stream; + + // Non-existent + ASSERT_RAISES(IOError, fs_->OpenOutputStream("non-existent-bucket/somefile", &stream)); + + // Create new empty file + ASSERT_OK(fs_->OpenOutputStream("bucket/newfile1", &stream)); + ASSERT_OK(stream->Close()); + AssertObjectContents(client_.get(), "bucket", "newfile1", ""); + + // Create new file with 1 small write + ASSERT_OK(fs_->OpenOutputStream("bucket/newfile2", &stream)); + ASSERT_OK(stream->Write("some data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(client_.get(), "bucket", "newfile2", "some data"); + + // Create new file with 3 small writes + ASSERT_OK(fs_->OpenOutputStream("bucket/newfile3", &stream)); + ASSERT_OK(stream->Write("some ")); + ASSERT_OK(stream->Write("")); + ASSERT_OK(stream->Write("new data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(client_.get(), "bucket", "newfile3", "some new data"); + + // Create new file with some large writes + std::string s1, s2, s3, s4, s5; + s1 = random_string(6000000, /*seed =*/42); // More than the 5 MB minimum part upload + s2 = "xxx"; + s3 = random_string(6000000, 43); + s4 = "zzz"; + s5 = random_string(600000, 44); + ASSERT_OK(fs_->OpenOutputStream("bucket/newfile4", &stream)); + ASSERT_OK(stream->Write(s1)); + ASSERT_OK(stream->Write(s2)); + ASSERT_OK(stream->Write(s3)); + ASSERT_OK(stream->Write(s4)); + ASSERT_OK(stream->Write(s5)); + ASSERT_OK(stream->Close()); + AssertObjectContents(client_.get(), "bucket", "newfile4", s1 + s2 + s3 + s4 + s5); + + // Overwrite + ASSERT_OK(fs_->OpenOutputStream("bucket/newfile1", &stream)); + ASSERT_OK(stream->Write("overwritten data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(client_.get(), "bucket", "newfile1", "overwritten data"); + + // Overwrite and make empty + ASSERT_OK(fs_->OpenOutputStream("bucket/newfile1", &stream)); + ASSERT_OK(stream->Close()); + AssertObjectContents(client_.get(), "bucket", "newfile1", ""); +} + +TEST_F(TestS3FS, OpenOutputStreamAbort) { + std::shared_ptr stream; + ASSERT_OK(fs_->OpenOutputStream("bucket/somefile", &stream)); + ASSERT_OK(stream->Write("new data")); + // Destructor implicitly aborts stream and the underlying multipart upload. 
+ stream.reset(); + AssertObjectContents(client_.get(), "bucket", "somefile", "some data"); +} + +//////////////////////////////////////////////////////////////////////////// +// Generic S3 tests + +class TestS3FSGeneric : public S3TestMixin, public GenericFileSystemTest { + public: + void SetUp() override { + S3TestMixin::SetUp(); + // Set up test bucket + { + Aws::S3::Model::CreateBucketRequest req; + req.SetBucket(ToAwsString("s3fs-test-bucket")); + ASSERT_OK(OutcomeToStatus(client_->CreateBucket(req))); + } + + options_.access_key = minio_.access_key(); + options_.secret_key = minio_.secret_key(); + options_.scheme = "http"; + options_.endpoint_override = minio_.connect_string(); + ASSERT_OK(S3FileSystem::Make(options_, &s3fs_)); + fs_ = std::make_shared("s3fs-test-bucket", s3fs_); + } + + protected: + std::shared_ptr GetEmptyFileSystem() override { return fs_; } + + bool have_implicit_directories() const override { return true; } + bool allow_write_file_over_dir() const override { return true; } + bool allow_move_dir() const override { return false; } + bool allow_append_to_file() const override { return false; } + bool have_directory_mtimes() const override { return false; } + + S3Options options_; + std::shared_ptr s3fs_; + std::shared_ptr fs_; +}; + +GENERIC_FS_TEST_FUNCTIONS(TestS3FSGeneric); + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/test_util.cc b/cpp/src/arrow/filesystem/test_util.cc index f38c98582a5..e9169f1900f 100644 --- a/cpp/src/arrow/filesystem/test_util.cc +++ b/cpp/src/arrow/filesystem/test_util.cc @@ -78,6 +78,10 @@ Status WriteString(io::OutputStream* stream, const std::string& s) { return stream->Write(s.data(), static_cast(s.length())); } +void ValidateTimePoint(TimePoint tp) { ASSERT_GE(tp.time_since_epoch().count(), 0); } + +}; // namespace + void AssertFileContents(FileSystem* fs, const std::string& path, const std::string& expected_data) { FileStats st; @@ -98,10 +102,6 @@ void AssertFileContents(FileSystem* fs, const std::string& path, ASSERT_OK(stream->Close()); } -void ValidateTimePoint(TimePoint tp) { ASSERT_GE(tp.time_since_epoch().count(), 0); } - -}; // namespace - void CreateFile(FileSystem* fs, const std::string& path, const std::string& data) { std::shared_ptr stream; ASSERT_OK(fs->OpenOutputStream(path, &stream)); @@ -118,32 +118,52 @@ void SortStats(std::vector* stats) { void AssertFileStats(const FileStats& st, const std::string& path, FileType type) { ASSERT_EQ(st.path(), path); - ASSERT_EQ(st.type(), type); + ASSERT_EQ(st.type(), type) << "For path '" << st.path() << "'"; } void AssertFileStats(const FileStats& st, const std::string& path, FileType type, TimePoint mtime) { AssertFileStats(st, path, type); - ASSERT_EQ(st.mtime(), mtime); + ASSERT_EQ(st.mtime(), mtime) << "For path '" << st.path() << "'"; } void AssertFileStats(const FileStats& st, const std::string& path, FileType type, TimePoint mtime, int64_t size) { AssertFileStats(st, path, type, mtime); - ASSERT_EQ(st.size(), size); + ASSERT_EQ(st.size(), size) << "For path '" << st.path() << "'"; } void AssertFileStats(const FileStats& st, const std::string& path, FileType type, int64_t size) { AssertFileStats(st, path, type); - ASSERT_EQ(st.size(), size); + ASSERT_EQ(st.size(), size) << "For path '" << st.path() << "'"; +} + +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type) { + FileStats st; + ASSERT_OK(fs->GetTargetStats(path, &st)); + AssertFileStats(st, path, type); } -template -void AssertFileStats(FileSystem* fs, const 
std::string& path, Args&&... args) { +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type, + TimePoint mtime) { + FileStats st; + ASSERT_OK(fs->GetTargetStats(path, &st)); + AssertFileStats(st, path, type, mtime); +} + +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type, + TimePoint mtime, int64_t size) { FileStats st; ASSERT_OK(fs->GetTargetStats(path, &st)); - AssertFileStats(st, path, std::forward(args)...); + AssertFileStats(st, path, type, mtime, size); +} + +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type, + int64_t size) { + FileStats st; + ASSERT_OK(fs->GetTargetStats(path, &st)); + AssertFileStats(st, path, type, size); } //////////////////////////////////////////////////////////////////////////// @@ -163,8 +183,10 @@ void GenericFileSystemTest::TestEmpty(FileSystem* fs) { void GenericFileSystemTest::TestCreateDir(FileSystem* fs) { ASSERT_OK(fs->CreateDir("AB")); ASSERT_OK(fs->CreateDir("AB/CD/EF")); // Recursive - // Non-recursive, parent doesn't exist - ASSERT_RAISES(IOError, fs->CreateDir("AB/GH/IJ", false /* recursive */)); + if (!have_implicit_directories()) { + // Non-recursive, parent doesn't exist + ASSERT_RAISES(IOError, fs->CreateDir("AB/GH/IJ", false /* recursive */)); + } ASSERT_OK(fs->CreateDir("AB/GH", false /* recursive */)); ASSERT_OK(fs->CreateDir("AB/GH/IJ", false /* recursive */)); // Idempotency @@ -319,18 +341,26 @@ void GenericFileSystemTest::TestMoveFile(FileSystem* fs) { // Source doesn't exist ASSERT_RAISES(IOError, fs->Move("abc", "def")); - // Parent destination doesn't exist - ASSERT_RAISES(IOError, fs->Move("AB/pqr", "XX/mno")); + if (!have_implicit_directories()) { + // Parent destination doesn't exist + ASSERT_RAISES(IOError, fs->Move("AB/pqr", "XX/mno")); + } // Parent destination is not a directory CreateFile(fs, "xxx", ""); ASSERT_RAISES(IOError, fs->Move("AB/pqr", "xxx/mno")); - // Destination is a directory - ASSERT_RAISES(IOError, fs->Move("AB/pqr", "EF")); + if (!allow_write_file_over_dir()) { + // Destination is a directory + ASSERT_RAISES(IOError, fs->Move("AB/pqr", "EF")); + } AssertAllDirs(fs, all_dirs); AssertAllFiles(fs, {"AB/pqr", "xxx"}); } void GenericFileSystemTest::TestMoveDir(FileSystem* fs) { + if (!allow_move_dir()) { + // XXX skip + return; + } ASSERT_OK(fs->CreateDir("AB/CD")); ASSERT_OK(fs->CreateDir("EF")); CreateFile(fs, "AB/abc", "abc data"); @@ -420,14 +450,18 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) { AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"}); AssertFileContents(fs, "def", "other data"); - // Destination is a non-empty directory - ASSERT_RAISES(IOError, fs->CopyFile("def", "AB")); // Source doesn't exist ASSERT_RAISES(IOError, fs->CopyFile("abc", "xxx")); - // Parent destination doesn't exist - ASSERT_RAISES(IOError, fs->Move("AB/abc", "XX/mno")); + if (!allow_write_file_over_dir()) { + // Destination is a non-empty directory + ASSERT_RAISES(IOError, fs->CopyFile("def", "AB")); + } + if (!have_implicit_directories()) { + // Parent destination doesn't exist + ASSERT_RAISES(IOError, fs->CopyFile("AB/abc", "XX/mno")); + } // Parent destination is not a directory - ASSERT_RAISES(IOError, fs->Move("AB/abc", "def/mno")); + ASSERT_RAISES(IOError, fs->CopyFile("AB/abc", "def/mno")); AssertAllDirs(fs, all_dirs); AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"}); } @@ -435,15 +469,19 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) { void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) { 
ASSERT_OK(fs->CreateDir("AB/CD/EF")); CreateFile(fs, "AB/CD/ghi", "some data"); + CreateFile(fs, "AB/CD/jkl", "some other data"); FileStats st; - TimePoint first_time; + TimePoint first_dir_time, first_file_time; + ASSERT_OK(fs->GetTargetStats("AB", &st)); AssertFileStats(st, "AB", FileType::Directory); ASSERT_EQ(st.base_name(), "AB"); ASSERT_EQ(st.size(), kNoSize); - first_time = st.mtime(); - ValidateTimePoint(first_time); + first_dir_time = st.mtime(); + if (have_directory_mtimes()) { + ValidateTimePoint(first_dir_time); + } ASSERT_OK(fs->GetTargetStats("AB/CD/EF", &st)); AssertFileStats(st, "AB/CD/EF", FileType::Directory); @@ -451,12 +489,23 @@ void GenericFileSystemTest::TestGetTargetStatsSingle(FileSystem* fs) { ASSERT_EQ(st.size(), kNoSize); // AB/CD's creation can impact AB's modification time, however, AB/CD/EF's // creation doesn't, so AB/CD/EF's mtime should be after AB's. - AssertDurationBetween(st.mtime() - first_time, 0.0, kTimeSlack); + if (have_directory_mtimes()) { + AssertDurationBetween(st.mtime() - first_dir_time, 0.0, kTimeSlack); + } ASSERT_OK(fs->GetTargetStats("AB/CD/ghi", &st)); AssertFileStats(st, "AB/CD/ghi", FileType::File, 9); ASSERT_EQ(st.base_name(), "ghi"); - AssertDurationBetween(st.mtime() - first_time, 0.0, kTimeSlack); + first_file_time = st.mtime(); + // AB/CD/ghi's creation doesn't impact AB's modification time, + // so AB/CD/ghi's mtime should be after AB's. + if (have_directory_mtimes()) { + AssertDurationBetween(first_file_time - first_dir_time, 0.0, kTimeSlack); + } + ASSERT_OK(fs->GetTargetStats("AB/CD/jkl", &st)); + AssertFileStats(st, "AB/CD/jkl", FileType::File, 15); + // This file was created after the one above + AssertDurationBetween(st.mtime() - first_file_time, 0.0, kTimeSlack); ASSERT_OK(fs->GetTargetStats("zz", &st)); AssertFileStats(st, "zz", FileType::NonExistent); @@ -470,13 +519,15 @@ void GenericFileSystemTest::TestGetTargetStatsVector(FileSystem* fs) { CreateFile(fs, "AB/CD/ghi", "some data"); std::vector stats; - TimePoint first_time; + TimePoint dir_time, file_time; ASSERT_OK( fs->GetTargetStats({"AB", "AB/CD", "AB/zz", "zz", "XX/zz", "AB/CD/ghi"}, &stats)); ASSERT_EQ(stats.size(), 6); AssertFileStats(stats[0], "AB", FileType::Directory); - first_time = stats[0].mtime(); - ValidateTimePoint(first_time); + dir_time = stats[0].mtime(); + if (have_directory_mtimes()) { + ValidateTimePoint(dir_time); + } AssertFileStats(stats[1], "AB/CD", FileType::Directory); AssertFileStats(stats[2], "AB/zz", FileType::NonExistent); AssertFileStats(stats[3], "zz", FileType::NonExistent); @@ -484,13 +535,23 @@ void GenericFileSystemTest::TestGetTargetStatsVector(FileSystem* fs) { ASSERT_EQ(stats[4].size(), kNoSize); ASSERT_EQ(stats[4].mtime(), kNoTime); AssertFileStats(stats[5], "AB/CD/ghi", FileType::File, 9); - AssertDurationBetween(stats[5].mtime() - first_time, 0.0, kTimeSlack); + file_time = stats[5].mtime(); + if (have_directory_mtimes()) { + AssertDurationBetween(file_time - dir_time, 0.0, kTimeSlack); + } else { + ValidateTimePoint(file_time); + } // Check the mtime is the same from one call to the other FileStats st; - ASSERT_OK(fs->GetTargetStats("AB", &st)); - AssertFileStats(st, "AB", FileType::Directory); - ASSERT_EQ(st.mtime(), first_time); + if (have_directory_mtimes()) { + ASSERT_OK(fs->GetTargetStats("AB", &st)); + AssertFileStats(st, "AB", FileType::Directory); + ASSERT_EQ(st.mtime(), dir_time); + } + ASSERT_OK(fs->GetTargetStats("AB/CD/ghi", &st)); + AssertFileStats(st, "AB/CD/ghi", FileType::File, 9); + ASSERT_EQ(st.mtime(), 
file_time); } void GenericFileSystemTest::TestGetTargetStatsSelector(FileSystem* fs) { @@ -500,7 +561,7 @@ void GenericFileSystemTest::TestGetTargetStatsSelector(FileSystem* fs) { CreateFile(fs, "AB/CD/ghi", "some other data"); CreateFile(fs, "AB/CD/jkl", "yet other data"); - TimePoint first_time; + TimePoint first_dir_time, first_file_time; Selector s; s.base_dir = ""; std::vector stats; @@ -509,8 +570,10 @@ void GenericFileSystemTest::TestGetTargetStatsSelector(FileSystem* fs) { SortStats(&stats); ASSERT_EQ(stats.size(), 2); AssertFileStats(stats[0], "AB", FileType::Directory); - first_time = stats[0].mtime(); - ValidateTimePoint(first_time); + first_dir_time = stats[0].mtime(); + if (have_directory_mtimes()) { + ValidateTimePoint(first_dir_time); + } AssertFileStats(stats[1], "abc", FileType::File, 4); s.base_dir = "AB"; @@ -526,7 +589,11 @@ void GenericFileSystemTest::TestGetTargetStatsSelector(FileSystem* fs) { ASSERT_EQ(stats.size(), 2); AssertFileStats(stats[0], "AB/CD/ghi", FileType::File, 15); AssertFileStats(stats[1], "AB/CD/jkl", FileType::File, 14); - AssertDurationBetween(stats[1].mtime() - first_time, 0.0, kTimeSlack); + first_file_time = stats[0].mtime(); + if (have_directory_mtimes()) { + AssertDurationBetween(first_file_time - first_dir_time, 0.0, kTimeSlack); + } + AssertDurationBetween(stats[1].mtime() - first_file_time, 0.0, kTimeSlack); // Recursive s.base_dir = "AB"; @@ -535,15 +602,16 @@ void GenericFileSystemTest::TestGetTargetStatsSelector(FileSystem* fs) { SortStats(&stats); ASSERT_EQ(stats.size(), 4); AssertFileStats(stats[0], "AB/CD", FileType::Directory); - AssertFileStats(stats[1], "AB/CD/ghi", FileType::File, 15); + AssertFileStats(stats[1], "AB/CD/ghi", FileType::File, first_file_time, 15); AssertFileStats(stats[2], "AB/CD/jkl", FileType::File, 14); AssertFileStats(stats[3], "AB/def", FileType::File, 9); // Check the mtime is the same from one call to the other FileStats st; ASSERT_OK(fs->GetTargetStats("AB", &st)); - AssertFileStats(st, "AB", FileType::Directory); - ASSERT_EQ(st.mtime(), first_time); + AssertFileStats(st, "AB", FileType::Directory, first_dir_time); + ASSERT_OK(fs->GetTargetStats("AB/CD/ghi", &st)); + AssertFileStats(st, "AB/CD/ghi", FileType::File, first_file_time, 15); // Doesn't exist s.base_dir = "XX"; @@ -573,8 +641,9 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) { AssertFileContents(fs, "abc", ""); // Parent does not exist - ASSERT_RAISES(IOError, fs->OpenOutputStream("abc/def", &stream)); - ASSERT_RAISES(IOError, fs->OpenOutputStream("ghi/jkl", &stream)); + if (!have_implicit_directories()) { + ASSERT_RAISES(IOError, fs->OpenOutputStream("AB/def", &stream)); + } AssertAllDirs(fs, {}); AssertAllFiles(fs, {"abc"}); @@ -600,12 +669,18 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) { ASSERT_RAISES(Invalid, WriteString(stream.get(), "x")); // Stream is closed - // Cannot turn dir into file - ASSERT_RAISES(IOError, fs->OpenOutputStream("CD", &stream)); - AssertAllDirs(fs, {"CD"}); + if (!allow_write_file_over_dir()) { + // Cannot turn dir into file + ASSERT_RAISES(IOError, fs->OpenOutputStream("CD", &stream)); + AssertAllDirs(fs, {"CD"}); + } } void GenericFileSystemTest::TestOpenAppendStream(FileSystem* fs) { + if (!allow_append_to_file()) { + // XXX skip + return; + } std::shared_ptr stream; int64_t position = -1; diff --git a/cpp/src/arrow/filesystem/test_util.h b/cpp/src/arrow/filesystem/test_util.h index ee2e67dd5b8..9b130ac7b9f 100644 --- a/cpp/src/arrow/filesystem/test_util.h +++ 
b/cpp/src/arrow/filesystem/test_util.h @@ -29,6 +29,13 @@ namespace fs { static constexpr double kTimeSlack = 2.0; // In seconds +ARROW_EXPORT +void CreateFile(FileSystem* fs, const std::string& path, const std::string& data); + +// Sort a vector of FileStats by lexicographic path order +ARROW_EXPORT +void SortStats(std::vector* stats); + ARROW_EXPORT void AssertFileStats(const FileStats& st, const std::string& path, FileType type); @@ -45,11 +52,23 @@ void AssertFileStats(const FileStats& st, const std::string& path, FileType type int64_t size); ARROW_EXPORT -void CreateFile(FileSystem* fs, const std::string& path, const std::string& data); +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type); -// Sort of vector of FileStats by lexicographic path order ARROW_EXPORT -void SortStats(std::vector* stats); +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type, + TimePoint mtime); + +ARROW_EXPORT +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type, + TimePoint mtime, int64_t size); + +ARROW_EXPORT +void AssertFileStats(FileSystem* fs, const std::string& path, FileType type, + int64_t size); + +ARROW_EXPORT +void AssertFileContents(FileSystem* fs, const std::string& path, + const std::string& expected_data); template void AssertDurationBetween(Duration d, double min_secs, double max_secs) { @@ -83,8 +102,22 @@ class ARROW_EXPORT GenericFileSystemTest { void TestOpenInputFile(); protected: + // This function should return the filesystem under test. virtual std::shared_ptr GetEmptyFileSystem() = 0; + // Override the following functions to specify deviations from expected + // filesystem semantics. + // - Whether the filesystem may "implicitly" create intermediate directories + virtual bool have_implicit_directories() const { return false; } + // - Whether the filesystem may allow writing a file "over" a directory + virtual bool allow_write_file_over_dir() const { return false; } + // - Whether the filesystem allows moving a directory + virtual bool allow_move_dir() const { return true; } + // - Whether the filesystem allows appending to a file + virtual bool allow_append_to_file() const { return true; } + // - Whether the filesystem supports directory modification times + virtual bool have_directory_mtimes() const { return true; } + void TestEmpty(FileSystem* fs); void TestCreateDir(FileSystem* fs); void TestDeleteDir(FileSystem* fs); diff --git a/cpp/src/arrow/testing/util.cc b/cpp/src/arrow/testing/util.cc index f30f704a8b9..f967482b6f1 100644 --- a/cpp/src/arrow/testing/util.cc +++ b/cpp/src/arrow/testing/util.cc @@ -59,6 +59,13 @@ void random_bytes(int64_t n, uint32_t seed, uint8_t* out) { std::generate(out, out + n, [&d, &gen] { return static_cast(d(gen)); }); } +std::string random_string(int64_t n, uint32_t seed) { + std::string s; + s.resize(static_cast(n)); + random_bytes(n, seed, reinterpret_cast(&s[0])); + return s; +} + int32_t DecimalSize(int32_t precision) { DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got " << precision; diff --git a/cpp/src/arrow/testing/util.h b/cpp/src/arrow/testing/util.h index d5f4dcaaa03..43b73e518cd 100644 --- a/cpp/src/arrow/testing/util.h +++ b/cpp/src/arrow/testing/util.h @@ -69,6 +69,7 @@ ARROW_EXPORT void random_null_bytes(int64_t n, double pct_null, uint8_t* null_by ARROW_EXPORT void random_is_valid(int64_t n, double pct_null, std::vector* is_valid, int random_seed = 0); ARROW_EXPORT void random_bytes(int64_t n, uint32_t seed, uint8_t* out); 
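The protected virtuals added to GenericFileSystemTest above are the hook that lets S3 (and any future backend) opt out of behaviours it cannot provide. A sketch of how another backend's test suite would plug in, mirroring TestS3FSGeneric earlier in this patch; MyFileSystem and the fixture name are hypothetical:

    #include <memory>

    #include <gtest/gtest.h>

    #include "arrow/filesystem/filesystem.h"
    #include "arrow/filesystem/test_util.h"

    namespace arrow {
    namespace fs {

    class TestMyFSGeneric : public ::testing::Test, public GenericFileSystemTest {
     protected:
      void SetUp() override {
        // fs_ = ... create an empty MyFileSystem instance here (hypothetical) ...
      }

      std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; }

      // Declare deviations from the default filesystem semantics:
      bool have_implicit_directories() const override { return true; }
      bool allow_move_dir() const override { return false; }
      bool allow_append_to_file() const override { return false; }
      bool have_directory_mtimes() const override { return false; }

      std::shared_ptr<FileSystem> fs_;
    };

    // Hooks the generic test cases (TestEmpty, TestCreateDir, ...) into gtest,
    // as done for TestS3FSGeneric above.
    GENERIC_FS_TEST_FUNCTIONS(TestMyFSGeneric);

    }  // namespace fs
    }  // namespace arrow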
+ARROW_EXPORT std::string random_string(int64_t n, uint32_t seed); ARROW_EXPORT int32_t DecimalSize(int32_t precision); ARROW_EXPORT void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out); diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index d960cb0d007..8ffc69a7ba7 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -23,6 +23,7 @@ # `DEPENDENCIES` array (see the comment on top of the declaration for the # format). +AWSSDK_VERSION=1.7.160 BOOST_VERSION=1.67.0 BROTLI_VERSION=v1.0.7 BZIP2_VERSION=1.0.6 @@ -55,6 +56,7 @@ ZSTD_VERSION=v1.4.0 # generated archive file. The third field is the url of the project for the # given version. DEPENDENCIES=( + "ARROW_AWSSDK_URL aws-sdk-cpp-${AWSSDK_VERSION}.tar.gz https://github.com/aws/aws-sdk-cpp/archive/${AWSSDK_VERSION}.tar.gz" "ARROW_BOOST_URL boost-${BOOST_VERSION}.tar.gz https://dl.bintray.com/boostorg/release/${BOOST_VERSION}/source/boost_${BOOST_VERSION//./_}.tar.gz" "ARROW_BROTLI_URL brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/archive/${BROTLI_VERSION}.tar.gz" "ARROW_CARES_URL cares-${CARES_VERSION}.tar.gz https://c-ares.haxx.se/download/c-ares-$CARES_VERSION.tar.gz"