diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index bf02d1a2d2a..b4c42f72a78 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -249,6 +249,10 @@ static|shared (default shared)") "Build the plasma object store java client" OFF) + option(ARROW_PLASMA_S3 + "Build S3 external store bindings for the plasma object store" + OFF) + #---------------------------------------------------------------------- # Thirdparty toolchain options diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.cmake index 1591d864a1a..6fe7ed11f07 100644 --- a/cpp/cmake_modules/BuildUtils.cmake +++ b/cpp/cmake_modules/BuildUtils.cmake @@ -629,3 +629,22 @@ function(ARROW_ADD_PKG_CONFIG MODULE) FILES "${CMAKE_CURRENT_BINARY_DIR}/${MODULE}.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/") endfunction() + +# Collect the imported static targets needed to link against the AWS SDK +# MODULE_LIST: list of SDK module names such as "s3" (dashes are mapped to underscores) +# TARGETS: name of the caller-scope variable that receives the target list +function(AWSSDK_TARGETS MODULE_LIST TARGETS) + set(awssdk_targets "") + foreach(MODULE IN LISTS MODULE_LIST) + # "core" is appended below for every caller, so skip an explicit request for it + if (NOT "${MODULE}" STREQUAL "core") + # Target names use underscores, like ADD_THIRDPARTY_LIB(awssdk_${MODULE_UNDERSCORE} ...) + string(REPLACE "-" "_" MODULE_UNDERSCORE "${MODULE}") + list(APPEND awssdk_targets "awssdk_${MODULE_UNDERSCORE}_static") + endif() + endforeach() + # We will always link against these targets; they come last because static + # archives must follow the libraries that reference them on single-pass linkers + list(APPEND awssdk_targets "awssdk_core_static" "curl_static") + set(${TARGETS} ${awssdk_targets} PARENT_SCOPE) +endfunction() diff --git a/cpp/cmake_modules/FindAWSSDK.cmake b/cpp/cmake_modules/FindAWSSDK.cmake new file mode 100644 index 00000000000..eef0c15411a --- /dev/null +++ b/cpp/cmake_modules/FindAWSSDK.cmake @@ -0,0 +1,95 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Tries to find AWSSDK headers and libraries. 
+# +# Usage of this module as follows: +# +# find_package(AWSSDK COMPONENTS service1 service2 ...) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# AWSSDK_HOME - When set, this path is inspected instead of standard library +# locations as the root of the AWSSDK installation. +# The environment variable AWSSDK_HOME overrides this variable. +# +# This module defines +# AWSSDK_INCLUDE_DIR, directory containing headers +# AWSSDK_LIBRARY_DIR, directory containing libraries +# AWSSDK_STATIC_LIBS, static AWS SDK libraries +# AWSSDK_FOUND, whether AWS SDK has been found + +if( NOT "${AWSSDK_HOME}" STREQUAL "") + file( TO_CMAKE_PATH "${AWSSDK_HOME}" _native_path ) + list( APPEND _awssdk_roots ${_native_path} ) +elseif ( AWSSDK_HOME ) + list( APPEND _awssdk_roots ${AWSSDK_HOME} ) +endif() + +set(AWSSDK_MODULES ${AWSSDK_FIND_COMPONENTS}) +set(AWSSDK_STATIC_LIBS) + +find_path(AWSSDK_INCLUDE_DIR NAMES aws/core/Aws.h + PATHS ${_awssdk_roots} + NO_DEFAULT_PATH + PATH_SUFFIXES "include") + +foreach(MODULE ${AWSSDK_MODULES}) + string(TOUPPER ${MODULE} MODULE_UPPERCASE) + find_library( AWSSDK_STATIC_${MODULE_UPPERCASE}_LIB NAMES ${CMAKE_STATIC_LIBRARY_PREFIX}aws-cpp-sdk-${MODULE}${CMAKE_STATIC_LIBRARY_SUFFIX} + PATHS ${_awssdk_roots} + NO_DEFAULT_PATH + PATH_SUFFIXES "lib/${CMAKE_LIBRARY_ARCHITECTURE}" "lib" ) + # Fail if any module library is missing + if (NOT AWSSDK_STATIC_${MODULE_UPPERCASE}_LIB) + set(LIBRARY_NOT_FOUND "YES") + else() + list(APPEND AWSSDK_STATIC_LIBS ${AWSSDK_STATIC_${MODULE_UPPERCASE}_LIB}) + endif() +endforeach() + +if (AWSSDK_INCLUDE_DIR AND NOT LIBRARY_NOT_FOUND) + set(AWSSDK_FOUND TRUE) + get_filename_component(AWSSDK_LIBRARY_DIR ${AWSSDK_STATIC_CORE_LIB} PATH) +else () + set(AWSSDK_FOUND FALSE) +endif () + +if (AWSSDK_FOUND) + if (NOT AWSSDK_FIND_QUIETLY) + message(STATUS "Found the AWSSDK libraries: ${AWSSDK_STATIC_LIBS}") + endif () +else () + set(AWSSDK_STATIC_LIBS) + if 
(NOT AWSSDK_FIND_QUIETLY) + set(AWSSDK_ERR_MSG "Could not find the AWSSDK library. Looked") + if ( _awssdk_roots ) + set(AWSSDK_ERR_MSG "${AWSSDK_ERR_MSG} in ${_awssdk_roots}.") + else () + set(AWSSDK_ERR_MSG "${AWSSDK_ERR_MSG} nowhere because AWSSDK_HOME is not set.") + endif () + if (AWSSDK_FIND_REQUIRED) + message(FATAL_ERROR "${AWSSDK_ERR_MSG}") + else () + message(STATUS "${AWSSDK_ERR_MSG}") + endif () + endif () +endif () + +mark_as_advanced( + AWSSDK_INCLUDE_DIR + AWSSDK_LIBRARY_DIR + AWSSDK_STATIC_LIBS + AWSSDK_FOUND +) diff --git a/cpp/cmake_modules/FindCURL.cmake b/cpp/cmake_modules/FindCURL.cmake new file mode 100644 index 00000000000..3750250d13b --- /dev/null +++ b/cpp/cmake_modules/FindCURL.cmake @@ -0,0 +1,102 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Tries to find CURL headers and libraries. +# +# Usage of this module as follows: +# +# find_package(CURL) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# CURL_HOME - When set, this path is inspected instead of standard library +# locations as the root of the CURL installation. +# The environment variable CURL_HOME overrides this variable. 
+# +# - Find CURL (curl/curl.h, libcurl.a) +# This module defines +# CURL_INCLUDE_DIR, directory containing headers +# CURL_STATIC_LIB, path to curl's static library +# CURL_FOUND, whether curl has been found + +if( NOT "${CURL_HOME}" STREQUAL "") + file( TO_CMAKE_PATH "${CURL_HOME}" _native_path ) + list( APPEND _curl_roots ${_native_path} ) +elseif ( CURL_HOME ) + list( APPEND _curl_roots ${CURL_HOME} ) +endif() + +if (MSVC) + # curl uses curl.lib for Windows. NOTE(review): the name searched below is curllib, confirm which one is right + set(CURL_LIB_NAME curllib) +else () + # curl uses libcurl.a for non Windows, i.e. the static library prefix and suffix + set(CURL_LIB_NAME + ${CMAKE_STATIC_LIBRARY_PREFIX}curl${CMAKE_STATIC_LIBRARY_SUFFIX}) +endif () + +# Try the parameterized roots, if they exist +if (_curl_roots) + find_path(CURL_INCLUDE_DIR NAMES curl/curl.h + PATHS ${_curl_roots} NO_DEFAULT_PATH + PATH_SUFFIXES "include") + find_library(CURL_STATIC_LIB + NAMES ${CURL_LIB_NAME} + PATHS ${_curl_roots} NO_DEFAULT_PATH + PATH_SUFFIXES "lib") +else () + pkg_check_modules(PKG_CURL libcurl) + if (PKG_CURL_FOUND) + set(CURL_INCLUDE_DIR ${PKG_CURL_INCLUDEDIR}) + find_library(CURL_STATIC_LIB + NAMES ${CURL_LIB_NAME} + PATHS ${PKG_CURL_LIBDIR} NO_DEFAULT_PATH) + else () + find_path(CURL_INCLUDE_DIR NAMES curl/curl.h) + find_library(CURL_STATIC_LIB NAMES ${CURL_LIB_NAME}) + endif () +endif () + +if (CURL_INCLUDE_DIR AND CURL_STATIC_LIB) + set(CURL_FOUND TRUE) +else () + set(CURL_FOUND FALSE) +endif () + +if (CURL_FOUND) + if (NOT CURL_FIND_QUIETLY) + if (CURL_STATIC_LIB) + message(STATUS "Found the CURL static library: ${CURL_STATIC_LIB}") + endif () + endif () +else () + if (NOT CURL_FIND_QUIETLY) + set(CURL_ERR_MSG "Could not find the CURL library. 
Looked in") + if ( _curl_roots ) + set(CURL_ERR_MSG "${CURL_ERR_MSG} ${_curl_roots}.") + else () + set(CURL_ERR_MSG "${CURL_ERR_MSG} system search paths.") + endif () + if (CURL_FIND_REQUIRED) + message(FATAL_ERROR "${CURL_ERR_MSG}") + else () + message(STATUS "${CURL_ERR_MSG}") + endif () + endif () +endif () + +mark_as_advanced( + CURL_INCLUDE_DIR + CURL_STATIC_LIB +) diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index e8ec7394231..bef0444eca0 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -30,9 +30,11 @@ set(THIRDPARTY_DIR "${arrow_SOURCE_DIR}/thirdparty") if (NOT "$ENV{ARROW_BUILD_TOOLCHAIN}" STREQUAL "") + set(AWSSDK_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") set(BROTLI_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") set(BZ2_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") set(CARES_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") + set(CURL_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") set(DOUBLE_CONVERSION_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") set(FLATBUFFERS_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") set(GFLAGS_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") @@ -74,6 +76,9 @@ if (NOT "$ENV{ARROW_BUILD_TOOLCHAIN}" STREQUAL "") endif() # Home path for each third-party lib can be overriden with env vars +if (DEFINED ENV{AWSSDK_HOME}) + set(AWSSDK_HOME "$ENV{AWSSDK_HOME}") +endif() if (DEFINED ENV{BROTLI_HOME}) set(BROTLI_HOME "$ENV{BROTLI_HOME}") @@ -87,6 +92,10 @@ if (DEFINED ENV{CARES_HOME}) set(CARES_HOME "$ENV{CARES_HOME}") endif() +if (DEFINED ENV{CURL_HOME}) + set(CURL_HOME "$ENV{CURL_HOME}") +endif() + if (DEFINED ENV{DOUBLE_CONVERSION_HOME}) set(DOUBLE_CONVERSION_HOME "$ENV{DOUBLE_CONVERSION_HOME}") endif() @@ -176,6 +185,14 @@ if (ARROW_ORC OR ARROW_FLIGHT OR ARROW_GANDIVA) set(ARROW_WITH_PROTOBUF ON) endif() +if (ARROW_PLASMA_S3) + set(ARROW_WITH_AWSSDK ON) + set(ARROW_WITH_CURL ON) + set(ARROW_WITH_OPENSSL ON) + set(ARROW_WITH_ZLIB ON) + 
list(APPEND AWSSDK_MODULES "s3") +endif() + # ---------------------------------------------------------------------- # Versions and URLs for toolchain builds, which also can be used to configure # offline builds @@ -202,6 +219,12 @@ foreach(_VERSION_ENTRY ${TOOLCHAIN_VERSIONS_TXT}) set(${_LIB_NAME} "${_LIB_VERSION}") endforeach() +if (DEFINED ENV{ARROW_AWSSDK_URL}) + set(AWSSDK_SOURCE_URL "$ENV{ARROW_AWSSDK_URL}") +else() + set(AWSSDK_SOURCE_URL "https://github.com/aws/aws-sdk-cpp/archive/${AWSSDK_VERSION}.tar.gz") +endif() + if (DEFINED ENV{ARROW_BOOST_URL}) set(BOOST_SOURCE_URL "$ENV{ARROW_BOOST_URL}") else() @@ -222,6 +245,14 @@ else() set(CARES_SOURCE_URL "https://c-ares.haxx.se/download/c-ares-${CARES_VERSION}.tar.gz") endif() +if (DEFINED ENV{ARROW_CURL_URL}) + set(CURL_SOURCE_URL "$ENV{ARROW_CURL_URL}") +else() + string(REGEX REPLACE "\\." "_" CURL_VERSION_UNDERSCORES ${CURL_VERSION}) + set(CURL_SOURCE_URL + "https://github.com/curl/curl/releases/download/curl-${CURL_VERSION_UNDERSCORES}/curl-${CURL_VERSION}.tar.gz") +endif() + if (DEFINED ENV{ARROW_DOUBLE_CONVERSION_URL}) set(DOUBLE_CONVERSION_SOURCE_URL "$ENV{ARROW_DOUBLE_CONVERSION_URL}") else() @@ -1757,3 +1788,114 @@ if (ARROW_USE_GLOG) DEPS gflags_static) endif() endif() + +# ---------------------------------------------------------------------- +# libcurl + +if (ARROW_WITH_CURL) + + if("${CURL_HOME}" STREQUAL "") + find_package(CURL) + else() + # FindCURL.cmake reads CURL_HOME itself; a PATHS option would force Config mode + find_package(CURL REQUIRED) + endif() + + if (CURL_FOUND) + set(CURL_VENDORED 0) + else() + set(CURL_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/curl_ep/src/curl_ep") + set(CURL_HOME "${CURL_PREFIX}") + set(CURL_INCLUDE_DIR "${CURL_PREFIX}/include") + set(CURL_STATIC_LIB "${CURL_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}curl${CMAKE_STATIC_LIBRARY_SUFFIX}") + + # Use ./configure instead of CMake build for curl since curl's CMake build is unstable. 
+ ExternalProject_Add(curl_ep + ${EP_LOG_OPTIONS} + INSTALL_DIR ${CURL_PREFIX} + URL ${CURL_SOURCE_URL} + BUILD_IN_SOURCE 1 + CONFIGURE_COMMAND ./configure --prefix=${CURL_PREFIX} --without-ssl --without-libidn2 --without-zlib --disable-ldap --enable-shared=no --enable-static=yes CXX=${CMAKE_CXX_COMPILER} CC=${CMAKE_C_COMPILER} CFLAGS=${EP_C_FLAGS} CXXFLAGS=${EP_CXX_FLAGS} + BUILD_COMMAND "$(MAKE)" + INSTALL_COMMAND "$(MAKE)" install + BUILD_BYPRODUCTS ${CURL_STATIC_LIB}) + set(CURL_VENDORED 1) + endif() + + include_directories(SYSTEM ${CURL_INCLUDE_DIR}) + + ADD_THIRDPARTY_LIB(curl + STATIC_LIB ${CURL_STATIC_LIB}) + if (CURL_VENDORED) + add_dependencies(curl_static curl_ep) + endif() + + message(STATUS "CURL include dir: ${CURL_INCLUDE_DIR}") + message(STATUS "CURL static library: ${CURL_STATIC_LIB}") +endif() + +# ---------------------------------------------------------------------- +# AWS SDK (C++) + +if (ARROW_WITH_AWSSDK) + + # Add pre-requisites for any SDK module + set(AWSSDK_MODULES "core" ${AWSSDK_MODULES}) + list(REMOVE_DUPLICATES AWSSDK_MODULES) + + if("${AWSSDK_HOME}" STREQUAL "") + find_package(AWSSDK COMPONENTS ${AWSSDK_MODULES}) + else() + # FindAWSSDK.cmake reads AWSSDK_HOME itself; a PATHS option would force Config mode + find_package(AWSSDK REQUIRED + COMPONENTS ${AWSSDK_MODULES}) + endif() + + if (AWSSDK_FOUND) + set(AWSSDK_VENDORED 0) + else() + set(AWSSDK_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/awssdk_ep/src/awssdk_ep") + set(AWSSDK_HOME "${AWSSDK_PREFIX}") + set(AWSSDK_INCLUDE_DIR "${AWSSDK_PREFIX}/include") + + foreach(MODULE ${AWSSDK_MODULES}) + string(TOUPPER ${MODULE} MODULE_UPPERCASE) + set(AWSSDK_STATIC_${MODULE_UPPERCASE}_LIB "${AWSSDK_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}aws-cpp-sdk-${MODULE}${CMAKE_STATIC_LIBRARY_SUFFIX}") + list(APPEND AWSSDK_STATIC_LIBS ${AWSSDK_STATIC_${MODULE_UPPERCASE}_LIB}) + endforeach() + + set(AWSSDK_CMAKE_ARGS + ${EP_COMMON_CMAKE_ARGS} + "-DCMAKE_INSTALL_PREFIX=${AWSSDK_PREFIX}" + "-DENABLE_TESTING=OFF" + "-DMINIMIZE_SIZE=ON" + "-DBUILD_SHARED_LIBS=OFF" + 
"-DFORCE_CURL=ON" + "-DZLIB_ROOT=${ZLIB_HOME}" + "-DCMAKE_PREFIX_PATH=${CURL_HOME}" + "-DBUILD_ONLY=s3") + + ExternalProject_Add(awssdk_ep + ${EP_LOG_OPTIONS} + INSTALL_DIR ${AWSSDK_PREFIX} + URL ${AWSSDK_SOURCE_URL} + CMAKE_ARGS ${AWSSDK_CMAKE_ARGS} + DEPENDS curl_static zlib_static + BUILD_BYPRODUCTS ${AWSSDK_STATIC_LIBS}) + set(AWSSDK_VENDORED 1) + endif() + + include_directories(SYSTEM ${AWSSDK_INCLUDE_DIR}) + + message(STATUS "AWSSDK include dir: ${AWSSDK_INCLUDE_DIR}") + foreach(MODULE ${AWSSDK_MODULES}) + string(REPLACE "-" "_" MODULE_UNDERSCORE ${MODULE}) + string(TOUPPER ${MODULE_UNDERSCORE} MODULE_UPPERCASE) + ADD_THIRDPARTY_LIB(awssdk_${MODULE_UNDERSCORE} + STATIC_LIB ${AWSSDK_STATIC_${MODULE_UPPERCASE}_LIB}) + if (AWSSDK_VENDORED) + add_dependencies(awssdk_${MODULE_UNDERSCORE}_static awssdk_ep) + endif() + message(STATUS "AWSSDK static ${MODULE} library: ${AWSSDK_STATIC_${MODULE_UPPERCASE}_LIB}") + endforeach() +endif() diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index bb70e9e6417..2b64db551ca 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -123,6 +123,12 @@ endif() list(APPEND PLASMA_EXTERNAL_STORE_SOURCES "external_store.cc" "hash_table_store.cc") +if (ARROW_PLASMA_S3) + AWSSDK_TARGETS(s3 AWS_LINK_LIBS) + set(PLASMA_STATIC_LINK_LIBS ${PLASMA_STATIC_LINK_LIBS} ${AWS_LINK_LIBS}) + list(APPEND PLASMA_EXTERNAL_STORE_SOURCES "s3_store.cc") +endif() + # We use static libraries for the plasma_store_server executable so that it can # be copied around and used in different locations. add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} store.cc) diff --git a/cpp/src/plasma/s3_store.cc b/cpp/src/plasma/s3_store.cc new file mode 100644 index 00000000000..234ae267b68 --- /dev/null +++ b/cpp/src/plasma/s3_store.cc @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include + +#include "arrow/util/logging.h" +#include "plasma/s3_store.h" + +namespace plasma { + +static Aws::String GetRegion() { + auto region = std::getenv("AWS_REGION"); + if (region == nullptr) { + return Aws::String("us-east-1"); // Default region + } + return Aws::String(region); +} + +// Splits an endpoint of the form s3://bucket/key-prefix into bucket_name_ and key_prefix_ +Status S3Store::ExtractBucketAndKeyPrefix(const std::string& endpoint) { + std::string separator = ":/"; + std::size_t pos = endpoint.find(separator); + // Require "://" after the scheme so that the path below always starts with '/' + if (pos == std::string::npos || endpoint.compare(pos, 3, "://") != 0) { + return Status::KeyError("Malformed endpoint " + endpoint); + } + // Decompose endpoint into URI (s3) and path elements + std::string uri = endpoint.substr(0, pos); + std::size_t path_pos = pos + separator.length(); + std::size_t path_len = endpoint.length() - separator.length() - uri.length(); + std::string path = endpoint.substr(path_pos, path_len); + + // Decompose path into bucket and key-prefix elements + auto bucket_end = std::find(path.begin() + 1, path.end(), '/'); + bucket_name_ = Aws::String(path.begin() + 1, bucket_end); + key_prefix_ = bucket_end == path.end() ? 
"" : Aws::String(bucket_end + 1, path.end()); + return Status::OK(); +} + +S3Store::S3Store() { Aws::InitAPI(options_); } + +S3Store::~S3Store() { + // SDK objects must be destroyed before the SDK itself is shut down + client_.reset(); + Aws::ShutdownAPI(options_); +} + +Status S3Store::Connect(const std::string& endpoint) { + RETURN_NOT_OK(ExtractBucketAndKeyPrefix(endpoint)); + ARROW_LOG(INFO) << "Connecting to s3 bucket \"" << bucket_name_ + << "\" with key-prefix \"" << key_prefix_ << "\""; + config_.region = GetRegion(); + client_ = std::make_shared(config_); + return Status::OK(); +} + +// Uploads every data[i] under the key key_prefix_ followed by ids[i].hex(); all requests are started before any result is awaited +Status S3Store::Put(const std::vector& ids, + const std::vector>& data) { + std::vector put_callables; + for (size_t i = 0; i < ids.size(); ++i) { + Aws::S3::Model::PutObjectRequest request; + request.WithBucket(bucket_name_).WithKey(key_prefix_ + ids[i].hex().data()); + auto objectStream = Aws::MakeShared("DataStream"); + objectStream->write(reinterpret_cast(data[i]->data()), data[i]->size()); + objectStream->flush(); + request.SetBody(objectStream); + put_callables.push_back(client_->PutObjectCallable(request)); + } + + std::string err_msg; + for (auto& put_callable : put_callables) { + auto outcome = put_callable.get(); + if (!outcome.IsSuccess()) + err_msg += std::string(outcome.GetError().GetMessage().data()) + "\n"; + } + return err_msg.empty() ? 
Status::OK() : Status::IOError(err_msg); +} + +// Downloads the object for every id and copies its bytes into buffers[i]; error messages of failed requests are concatenated into the returned status +Status S3Store::Get(const std::vector& ids, + std::vector> buffers) { + buffers.resize(ids.size()); + std::vector get_callables; + for (const auto& id : ids) { + Aws::S3::Model::GetObjectRequest request; + request.WithBucket(bucket_name_).WithKey(key_prefix_ + id.hex().data()); + get_callables.push_back(client_->GetObjectCallable(request)); + } + + std::string err_msg; + for (size_t i = 0; i < get_callables.size(); ++i) { + auto outcome = get_callables[i].get(); + if (!outcome.IsSuccess()) { + err_msg += std::string(outcome.GetError().GetMessage().data()) + "\n"; + // A failed outcome carries no object body, so there is nothing to copy + continue; + } + auto in = std::make_shared(outcome.GetResult().GetBody().rdbuf()); + std::string object_data((std::istreambuf_iterator(*in)), + (std::istreambuf_iterator())); + std::memcpy(buffers[i]->mutable_data(), object_data.data(), object_data.size()); + } + return err_msg.empty() ? Status::OK() : Status::IOError(err_msg); +} + +REGISTER_EXTERNAL_STORE("s3", S3Store); + +} // namespace plasma diff --git a/cpp/src/plasma/s3_store.h b/cpp/src/plasma/s3_store.h new file mode 100644 index 00000000000..9e8a7208bbe --- /dev/null +++ b/cpp/src/plasma/s3_store.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PLASMA_S3_STORE_H +#define PLASMA_S3_STORE_H + +#include +#include +#include +#include +#include + +#include "plasma/external_store.h" + +namespace plasma { + +// This provides the implementation of S3 as an external store to plasma +// Note: Populate the AWS_REGION environment variable to set the region for the s3 +// bucket (the default is us-east-1). + +class S3Store : public ExternalStore { + public: + S3Store(); + ~S3Store() override; + Status Connect(const std::string& endpoint) override; + Status Put(const std::vector& ids, + const std::vector>& data) override; + Status Get(const std::vector& ids, + std::vector> buffers) override; + + private: + Status ExtractBucketAndKeyPrefix(const std::string& endpoint); + + Aws::String bucket_name_; + Aws::String key_prefix_; + std::shared_ptr client_; + Aws::SDKOptions options_; + Aws::Client::ClientConfiguration config_; +}; + +} // namespace plasma + +#endif // PLASMA_S3_STORE_H diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index e62a37b0824..64d4233de67 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -23,9 +23,11 @@ # `DEPENDENCIES` array (see the comment on top of the declaration for the # format). +AWSSDK_VERSION=1.6.53 BOOST_VERSION=1.67.0 BROTLI_VERSION=v0.6.0 CARES_VERSION=1.15.0 +CURL_VERSION=7.60.0 DOUBLE_CONVERSION_VERSION=v3.1.1 FLATBUFFERS_VERSION=v1.10.0 GBENCHMARK_VERSION=v1.4.1 @@ -49,9 +51,11 @@ ZSTD_VERSION=v1.3.7 # generated archive file. The third field is the url of the project for the # given version. 
DEPENDENCIES=( + "ARROW_AWSSDK_URL awssdk-${AWSSDK_VERSION}.tar.gz https://github.com/aws/aws-sdk-cpp/archive/${AWSSDK_VERSION}.tar.gz" "ARROW_BOOST_URL boost-${BOOST_VERSION}.tar.gz https://dl.bintray.com/boostorg/release/${BOOST_VERSION}/source/boost_${BOOST_VERSION//./_}.tar.gz" "ARROW_BROTLI_URL brotli-${BROTLI_VERSION}.tar.gz https://github.com/google/brotli/archive/${BROTLI_VERSION}.tar.gz" "ARROW_CARES_URL cares-${CARES_VERSION}.tar.gz https://c-ares.haxx.se/download/c-ares-$CARES_VERSION.tar.gz" + "ARROW_CURL_URL curl-${CURL_VERSION}.tar.gz https://github.com/curl/curl/releases/download/curl-${CURL_VERSION//./_}/curl-${CURL_VERSION}.tar.gz" "ARROW_DOUBLE_CONVERSION_URL double-conversion-${DOUBLE_CONVERSION_VERSION}.tar.gz https://github.com/google/double-conversion/archive/${DOUBLE_CONVERSION_VERSION}.tar.gz" "ARROW_FLATBUFFERS_URL flatbuffers-${FLATBUFFERS_VERSION}.tar.gz https://github.com/google/flatbuffers/archive/${FLATBUFFERS_VERSION}.tar.gz" "ARROW_GBENCHMARK_URL gbenchmark-${GBENCHMARK_VERSION}.tar.gz https://github.com/google/benchmark/archive/${GBENCHMARK_VERSION}.tar.gz"