diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index 602561cba13..d926560cefa 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -129,6 +129,7 @@ jobs: if: ${{ !contains(github.event.pull_request.title, 'WIP') }} timeout-minutes: 75 env: + ARROW_AZURE: ON ARROW_BUILD_TESTS: ON ARROW_DATASET: ON ARROW_FLIGHT: ON @@ -168,6 +169,9 @@ jobs: ci/scripts/install_minio.sh latest /usr/local - name: Install Google Cloud Storage Testbench run: ci/scripts/install_gcs_testbench.sh default + - name: Install Azurite Storage Emulator + shell: bash + run: ci/scripts/install_azurite.sh - name: Setup ccache run: | ci/scripts/ccache_setup.sh @@ -305,6 +309,7 @@ jobs: - 32 - 64 env: + ARROW_AZURE: OFF ARROW_BUILD_SHARED: ON ARROW_BUILD_STATIC: OFF ARROW_BUILD_TESTS: ON diff --git a/ci/appveyor-cpp-build.bat b/ci/appveyor-cpp-build.bat index 538caaa5b5f..223b8832881 100644 --- a/ci/appveyor-cpp-build.bat +++ b/ci/appveyor-cpp-build.bat @@ -56,6 +56,7 @@ pushd cpp\build @rem and enable runtime assertions. 
cmake -G "%GENERATOR%" %CMAKE_ARGS% ^ + -DARROW_AZURE=OFF ^ -DARROW_BOOST_USE_SHARED=ON ^ -DARROW_BUILD_EXAMPLES=ON ^ -DARROW_BUILD_STATIC=OFF ^ diff --git a/ci/docker/ubuntu-18.04-cpp.dockerfile b/ci/docker/ubuntu-18.04-cpp.dockerfile index 2a056eda539..72490ee3c46 100644 --- a/ci/docker/ubuntu-18.04-cpp.dockerfile +++ b/ci/docker/ubuntu-18.04-cpp.dockerfile @@ -80,6 +80,7 @@ RUN apt-get update -y -q && \ libre2-dev \ libsnappy-dev \ libssl-dev \ + libxml2-dev \ ninja-build \ pkg-config \ protobuf-compiler \ @@ -105,7 +106,8 @@ RUN apt-get update -y -q && \ COPY ci/scripts/install_sccache.sh /arrow/ci/scripts/ RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin -ENV ARROW_BUILD_STATIC=ON \ +ENV ARROW_AZURE=ON \ + ARROW_BUILD_STATIC=ON \ ARROW_BUILD_TESTS=ON \ ARROW_DATASET=ON \ ARROW_DEPENDENCY_SOURCE=SYSTEM \ diff --git a/ci/docker/ubuntu-20.04-cpp.dockerfile b/ci/docker/ubuntu-20.04-cpp.dockerfile index 1cd0581aa44..f8934da216e 100644 --- a/ci/docker/ubuntu-20.04-cpp.dockerfile +++ b/ci/docker/ubuntu-20.04-cpp.dockerfile @@ -91,10 +91,12 @@ RUN apt-get update -y -q && \ libssl-dev \ libthrift-dev \ libutf8proc-dev \ + libxml2-dev \ libzstd-dev \ make \ ninja-build \ nlohmann-json3-dev \ + npm \ pkg-config \ protobuf-compiler \ python3-dev \ @@ -114,6 +116,9 @@ RUN /arrow/ci/scripts/install_minio.sh latest /usr/local COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts/ RUN /arrow/ci/scripts/install_gcs_testbench.sh default +COPY ci/scripts/install_azurite.sh /arrow/ci/scripts/ +RUN /arrow/ci/scripts/install_azurite.sh + COPY ci/scripts/install_ceph.sh /arrow/ci/scripts/ RUN /arrow/ci/scripts/install_ceph.sh @@ -131,6 +136,7 @@ RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin # ARROW-17051: this build uses static Protobuf, so we must also use # static Arrow to run Flight/Flight SQL tests ENV absl_SOURCE=BUNDLED \ + ARROW_AZURE=ON \ ARROW_BUILD_STATIC=ON \ ARROW_BUILD_TESTS=ON \ ARROW_DEPENDENCY_SOURCE=SYSTEM 
\ diff --git a/ci/docker/ubuntu-22.04-cpp.dockerfile b/ci/docker/ubuntu-22.04-cpp.dockerfile index 4bbb5c2b317..0b177e35e0e 100644 --- a/ci/docker/ubuntu-22.04-cpp.dockerfile +++ b/ci/docker/ubuntu-22.04-cpp.dockerfile @@ -90,10 +90,12 @@ RUN apt-get update -y -q && \ libsqlite3-dev \ libthrift-dev \ libutf8proc-dev \ + libxml2-dev \ libzstd-dev \ make \ ninja-build \ nlohmann-json3-dev \ + npm \ pkg-config \ protobuf-compiler \ protobuf-compiler-grpc \ @@ -147,6 +149,9 @@ RUN /arrow/ci/scripts/install_gcs_testbench.sh default COPY ci/scripts/install_sccache.sh /arrow/ci/scripts/ RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin +COPY ci/scripts/install_azurite.sh /arrow/ci/scripts/ +RUN /arrow/ci/scripts/install_azurite.sh + # Prioritize system packages and local installation # The following dependencies will be downloaded due to missing/invalid packages # provided by the distribution: @@ -158,6 +163,7 @@ RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin # ARROW-17051: this build uses static Protobuf, so we must also use # static Arrow to run Flight/Flight SQL tests ENV absl_SOURCE=BUNDLED \ + ARROW_AZURE=ON \ ARROW_BUILD_STATIC=ON \ ARROW_BUILD_TESTS=ON \ ARROW_DEPENDENCY_SOURCE=SYSTEM \ diff --git a/ci/scripts/cpp_build.sh b/ci/scripts/cpp_build.sh index b3d9e0d3ec1..a7792b73fb6 100755 --- a/ci/scripts/cpp_build.sh +++ b/ci/scripts/cpp_build.sh @@ -70,6 +70,7 @@ pushd ${build_dir} cmake \ -Dabsl_SOURCE=${absl_SOURCE:-} \ + -DARROW_AZURE=${ARROW_AZURE:-OFF} \ -DARROW_BOOST_USE_SHARED=${ARROW_BOOST_USE_SHARED:-ON} \ -DARROW_BUILD_BENCHMARKS_REFERENCE=${ARROW_BUILD_BENCHMARKS:-OFF} \ -DARROW_BUILD_BENCHMARKS=${ARROW_BUILD_BENCHMARKS:-OFF} \ diff --git a/ci/scripts/install_azurite.sh b/ci/scripts/install_azurite.sh new file mode 100755 index 00000000000..2e7008360fd --- /dev/null +++ b/ci/scripts/install_azurite.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one 
+# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +case "$(uname)" in + Darwin) + npm install -g azurite + which azurite + ;; + MINGW*) + choco install nodejs.install + npm install -g azurite + ;; + Linux) + npm install -g azurite + which azurite + ;; +esac +echo "node version = $(node --version)" +echo "azurite version = $(azurite --version)" \ No newline at end of file diff --git a/cpp/Brewfile b/cpp/Brewfile index 61fb619dc66..7184805ee73 100644 --- a/cpp/Brewfile +++ b/cpp/Brewfile @@ -32,6 +32,7 @@ brew "llvm" brew "llvm@12" brew "lz4" brew "ninja" +brew "node" brew "numpy" brew "openssl@1.1" brew "protobuf" diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 01d461d14a2..aaac71c5fb1 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -773,6 +773,11 @@ if(ARROW_WITH_OPENTELEMETRY) list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS CURL::libcurl) endif() +if(ARROW_AZURE) + list(APPEND ARROW_SHARED_LINK_LIBS ${AZURESDK_LINK_LIBRARIES}) + list(APPEND ARROW_STATIC_LINK_LIBS ${AZURESDK_LINK_LIBRARIES}) +endif() + if(ARROW_WITH_UTF8PROC) list(APPEND ARROW_SHARED_LINK_LIBS utf8proc::utf8proc) list(APPEND ARROW_STATIC_LINK_LIBS utf8proc::utf8proc) diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index b56918e602c..8d864bb1600 100644 
--- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -303,6 +303,9 @@ takes precedence over ccache if a storage backend is configured" ON) #---------------------------------------------------------------------- set_option_category("Project component") + define_option(ARROW_AZURE + "Build Arrow with Azure support (requires the Azure SDK for C++)" OFF) + define_option(ARROW_BUILD_UTILITIES "Build Arrow commandline utilities" OFF) define_option(ARROW_COMPUTE "Build the Arrow Compute Modules" OFF) diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index d281075cf45..0f79b23d211 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -463,6 +463,47 @@ else() "${THIRDPARTY_MIRROR_URL}/aws-sdk-cpp-${ARROW_AWSSDK_BUILD_VERSION}.tar.gz") endif() +if(DEFINED ENV{ARROW_AZURE_CORE_URL}) + set(AZURE_CORE_SOURCE_URL "$ENV{ARROW_AZURE_CORE_URL}") +else() + set_urls(AZURE_CORE_SOURCE_URL + "https://github.com/Azure/azure-sdk-for-cpp/archive/azure-core_${ARROW_AZURE_CORE_BUILD_VERSION}.tar.gz" + ) +endif() + +if(DEFINED ENV{ARROW_AZURE_IDENTITY_URL}) + set(AZURE_IDENTITY_SOURCE_URL "$ENV{ARROW_AZURE_IDENTITY_URL}") +else() + set_urls(AZURE_IDENTITY_SOURCE_URL + "https://github.com/Azure/azure-sdk-for-cpp/archive/azure-identity_${ARROW_AZURE_IDENTITY_BUILD_VERSION}.tar.gz" + ) +endif() + +if(DEFINED ENV{ARROW_AZURE_STORAGE_BLOBS_URL}) + set(AZURE_STORAGE_BLOBS_SOURCE_URL "$ENV{ARROW_AZURE_STORAGE_BLOBS_URL}") +else() + set_urls(AZURE_STORAGE_BLOBS_SOURCE_URL + "https://github.com/Azure/azure-sdk-for-cpp/archive/azure-storage-blobs_${ARROW_AZURE_STORAGE_BLOBS_BUILD_VERSION}.tar.gz" + ) +endif() + +if(DEFINED ENV{ARROW_AZURE_STORAGE_COMMON_URL}) + set(AZURE_STORAGE_COMMON_SOURCE_URL "$ENV{ARROW_AZURE_STORAGE_COMMON_URL}") +else() + set_urls(AZURE_STORAGE_COMMON_SOURCE_URL + 
"https://github.com/Azure/azure-sdk-for-cpp/archive/azure-storage-common_${ARROW_AZURE_STORAGE_COMMON_BUILD_VERSION}.tar.gz" + ) +endif() + +if(DEFINED ENV{ARROW_AZURE_STORAGE_FILES_DATALAKE_URL}) + set(AZURE_STORAGE_FILES_DATALAKE_SOURCE_URL + "$ENV{ARROW_AZURE_STORAGE_FILES_DATALAKE_URL}") +else() + set_urls(AZURE_STORAGE_FILES_DATALAKE_SOURCE_URL + "https://github.com/Azure/azure-sdk-for-cpp/archive/azure-storage-files-datalake_${ARROW_AZURE_STORAGE_FILES_DATALAKE_BUILD_VERSION}.tar.gz" + ) +endif() + if(DEFINED ENV{ARROW_BOOST_URL}) set(BOOST_SOURCE_URL "$ENV{ARROW_BOOST_URL}") else() @@ -4763,6 +4804,162 @@ if(ARROW_S3) endif() endif() +macro(build_azuresdk) + message(STATUS "Building Azure C++ SDK from source") + + find_curl() + find_package(LibXml2 REQUIRED) + find_package(OpenSSL ${ARROW_OPENSSL_REQUIRED_VERSION} REQUIRED) + + set(AZURESDK_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/azuresdk_ep-install") + set(AZURESDK_INCLUDE_DIR "${AZURESDK_PREFIX}/include") + set(AZURESDK_LIB_DIR "lib") + + # provide hint for Azure SDK to link with the already located openssl + get_filename_component(OPENSSL_ROOT_HINT "${OPENSSL_INCLUDE_DIR}" DIRECTORY) + + set(AZURESDK_COMMON_CMAKE_ARGS + ${EP_COMMON_CMAKE_ARGS} + "-DCMAKE_INSTALL_PREFIX=${AZURESDK_PREFIX}" + "-DCMAKE_PREFIX_PATH=${AZURESDK_PREFIX}" + -DBUILD_SHARED_LIBS=OFF + -DCMAKE_INSTALL_LIBDIR=${AZURESDK_LIB_DIR} + -DDISABLE_AZURE_CORE_OPENTELEMETRY=ON + -DENABLE_TESTING=OFF + -DENABLE_UNITY_BUILD=ON + -DOPENSSL_ROOT_DIR=${OPENSSL_ROOT_HINT} + -DWARNINGS_AS_ERRORS=OFF) + + file(MAKE_DIRECTORY ${AZURESDK_INCLUDE_DIR}) + + set(AZURE_CORE_STATIC_LIBRARY + "${AZURESDK_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}azure-core${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + externalproject_add(azure_core_ep + ${EP_LOG_OPTIONS} + INSTALL_DIR ${AZURESDK_PREFIX} + URL ${AZURE_CORE_SOURCE_URL} + URL_HASH "SHA256=${ARROW_AZURE_CORE_BUILD_SHA256_CHECKSUM}" + CMAKE_ARGS ${AZURESDK_COMMON_CMAKE_ARGS} + BUILD_BYPRODUCTS ${AZURE_CORE_STATIC_LIBRARY}) + 
add_library(Azure::azure-core STATIC IMPORTED) + set_target_properties(Azure::azure-core + PROPERTIES IMPORTED_LOCATION "${AZURE_CORE_STATIC_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES + "${AZURESDK_INCLUDE_DIR}") + target_link_libraries(Azure::azure-core INTERFACE CURL::libcurl LibXml2::LibXml2) + add_dependencies(Azure::azure-core azure_core_ep) + + set(AZURE_IDENTITY_STATIC_LIBRARY + "${AZURESDK_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}azure-identity${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + externalproject_add(azure_identity_ep + ${EP_LOG_OPTIONS} + INSTALL_DIR ${AZURESDK_PREFIX} + URL ${AZURE_IDENTITY_SOURCE_URL} + URL_HASH "SHA256=${ARROW_AZURE_IDENTITY_BUILD_SHA256_CHECKSUM}" + CMAKE_ARGS ${AZURESDK_COMMON_CMAKE_ARGS} + BUILD_BYPRODUCTS ${AZURE_IDENTITY_STATIC_LIBRARY}) + add_library(Azure::azure-identity STATIC IMPORTED) + set_target_properties(Azure::azure-identity + PROPERTIES IMPORTED_LOCATION "${AZURE_IDENTITY_STATIC_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES + "${AZURESDK_INCLUDE_DIR}") + target_link_libraries(Azure::azure-identity INTERFACE CURL::libcurl LibXml2::LibXml2) + add_dependencies(Azure::azure-identity azure_identity_ep) + + set(AZURE_STORAGE_BLOBS_STATIC_LIBRARY + "${AZURESDK_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}azure-storage-blobs${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + externalproject_add(azure_storage_blobs_ep + ${EP_LOG_OPTIONS} + INSTALL_DIR ${AZURESDK_PREFIX} + URL ${AZURE_STORAGE_BLOBS_SOURCE_URL} + URL_HASH "SHA256=${ARROW_AZURE_STORAGE_BLOBS_BUILD_SHA256_CHECKSUM}" + CMAKE_ARGS ${AZURESDK_COMMON_CMAKE_ARGS} + BUILD_BYPRODUCTS ${AZURE_STORAGE_BLOBS_STATIC_LIBRARY}) + add_library(Azure::azure-storage-blobs STATIC IMPORTED) + set_target_properties(Azure::azure-storage-blobs + PROPERTIES IMPORTED_LOCATION + "${AZURE_STORAGE_BLOBS_STATIC_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES + "${AZURESDK_INCLUDE_DIR}") + target_link_libraries(Azure::azure-storage-blobs + INTERFACE Azure::azure-core CURL::libcurl LibXml2::LibXml2) + 
add_dependencies(Azure::azure-storage-blobs azure_storage_blobs_ep) + + set(AZURE_STORAGE_COMMON_STATIC_LIBRARY + "${AZURESDK_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}azure-storage-common${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + externalproject_add(azure_storage_common_ep + ${EP_LOG_OPTIONS} + INSTALL_DIR ${AZURESDK_PREFIX} + URL ${AZURE_STORAGE_COMMON_SOURCE_URL} + URL_HASH "SHA256=${ARROW_AZURE_STORAGE_COMMON_BUILD_SHA256_CHECKSUM}" + CMAKE_ARGS ${AZURESDK_COMMON_CMAKE_ARGS} + BUILD_BYPRODUCTS ${AZURE_STORAGE_COMMON_STATIC_LIBRARY}) + add_library(Azure::azure-storage-common STATIC IMPORTED) + set_target_properties(Azure::azure-storage-common + PROPERTIES IMPORTED_LOCATION + "${AZURE_STORAGE_COMMON_STATIC_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES + "${AZURESDK_INCLUDE_DIR}") + target_link_libraries(Azure::azure-storage-common INTERFACE CURL::libcurl + LibXml2::LibXml2) + add_dependencies(Azure::azure-storage-common azure_storage_common_ep) + set_property(TARGET Azure::azure-storage-common PROPERTY INTERFACE_LINK_LIBRARIES + OpenSSL::Crypto) + + set(AZURE_STORAGE_FILES_DATALAKE_STATIC_LIBRARY + "${AZURESDK_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}azure-storage-files-datalake${CMAKE_STATIC_LIBRARY_SUFFIX}" + ) + externalproject_add(azure_storage_files_datalake_ep + ${EP_LOG_OPTIONS} + INSTALL_DIR ${AZURESDK_PREFIX} + URL ${AZURE_STORAGE_FILES_DATALAKE_SOURCE_URL} + URL_HASH "SHA256=${ARROW_AZURE_STORAGE_FILES_DATALAKE_BUILD_SHA256_CHECKSUM}" + CMAKE_ARGS ${AZURESDK_COMMON_CMAKE_ARGS} + BUILD_BYPRODUCTS ${AZURE_STORAGE_FILES_DATALAKE_STATIC_LIBRARY}) + add_library(Azure::azure-storage-files-datalake STATIC IMPORTED) + set_target_properties(Azure::azure-storage-files-datalake + PROPERTIES IMPORTED_LOCATION + "${AZURE_STORAGE_FILES_DATALAKE_STATIC_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES + "${AZURESDK_INCLUDE_DIR}") + target_link_libraries(Azure::azure-storage-files-datalake + INTERFACE Azure::azure-core + Azure::azure-identity + Azure::azure-storage-blobs + 
Azure::azure-storage-common + CURL::libcurl + LibXml2::LibXml2) + add_dependencies(Azure::azure-storage-files-datalake azure_storage_files_datalake_ep) + + set(AZURESDK_LIBRARIES) + list(APPEND + AZURESDK_LIBRARIES + Azure::azure-core + Azure::azure-storage-blobs + Azure::azure-identity + Azure::azure-storage-common + Azure::azure-storage-files-datalake) + list(APPEND + ARROW_BUNDLED_STATIC_LIBS + Azure::azure-core + Azure::azure-storage-blobs + Azure::azure-identity + Azure::azure-storage-common + Azure::azure-storage-files-datalake) + + set(AZURESDK_LINK_LIBRARIES ${AZURESDK_LIBRARIES}) +endmacro() + +if(ARROW_AZURE) + build_azuresdk() + message(STATUS "Found Azure SDK headers: ${AZURESDK_INCLUDE_DIR}") + message(STATUS "Found Azure SDK libraries: ${AZURESDK_LINK_LIBRARIES}") +endif() + # ---------------------------------------------------------------------- # ucx - communication framework for modern, high-bandwidth and low-latency networks diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 23f0a7c9f1a..4be626f296f 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -481,6 +481,12 @@ if(ARROW_FILESYSTEM) filesystem/path_util.cc filesystem/util_internal.cc) + if(ARROW_AZURE) + list(APPEND ARROW_SRCS filesystem/azurefs.cc) + set_source_files_properties(filesystem/azurefs.cc + PROPERTIES SKIP_PRECOMPILE_HEADERS ON + SKIP_UNITY_BUILD_INCLUSION ON) + endif() if(ARROW_GCS) list(APPEND ARROW_SRCS filesystem/gcsfs.cc filesystem/gcsfs_internal.cc) set_source_files_properties(filesystem/gcsfs.cc filesystem/gcsfs_internal.cc diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index 6888231a35a..e5a35bf8fdc 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -47,6 +47,15 @@ if(ARROW_GCS) Boost::system) endif() +if(ARROW_AZURE) + add_arrow_test(azurefs_test + EXTRA_LABELS + filesystem + EXTRA_LINK_LIBS + Boost::filesystem + 
Boost::system) +endif() + if(ARROW_S3) add_arrow_test(s3fs_test SOURCES diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc new file mode 100644 index 00000000000..485f8afcd94 --- /dev/null +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -0,0 +1,2014 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/filesystem/azurefs.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "arrow/util/windows_fixup.h" + +#include "arrow/buffer.h" +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/util_internal.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" +#include "arrow/io/util_internal.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/atomic_shared_ptr.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" +#include "arrow/util/string.h" +#include "arrow/util/task_group.h" +#include "arrow/util/thread_pool.h" + +namespace arrow { + +using internal::Uri; + +namespace fs { + +static const char kSep = '/'; + +// ----------------------------------------------------------------------- +// AzureOptions implementation + +AzureOptions::AzureOptions() {} + +Result AzureOptions::GetAccountNameFromConnectionString( + const std::string& connection_string) { + std::string text = "AccountName="; + auto pos_text = connection_string.find(text); + if (pos_text == std::string::npos) { + return Status::Invalid( + "Cannot find account name in Azure Blob Storage connection string: '", + connection_string, "'"); + } + auto pos_semicolon = connection_string.find(';'); + pos_semicolon = connection_string.find(';', pos_semicolon + 1); + if (pos_semicolon == std::string::npos) { + return Status::Invalid("Invalid Azure Blob Storage connection string: '", + connection_string, "' passed"); + } + std::string account_name = + connection_string.substr(pos_text + text.size(), pos_semicolon); + return account_name; +} + +Status AzureOptions::ConfigureAnonymousCredentials(const 
std::string& account_name) { + account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/"; + account_blob_url = "https://" + account_name + ".blob.core.windows.net/"; + credentials_kind = AzureCredentialsKind::Anonymous; + return Status::OK(); +} + +Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name, + const std::string& account_key) { + if (this->is_azurite) { + account_blob_url = "http://127.0.0.1:10000/" + account_name + "/"; + account_dfs_url = "http://127.0.0.1:10000/" + account_name + "/"; + } else { + account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/"; + account_blob_url = "https://" + account_name + ".blob.core.windows.net/"; + } + storage_credentials_provider = + std::make_shared(account_name, + account_key); + credentials_kind = AzureCredentialsKind::StorageCredentials; + return Status::OK(); +} + +Status AzureOptions::ConfigureConnectionStringCredentials( + const std::string& connection_string_key) { + ARROW_ASSIGN_OR_RAISE(auto account_name, + GetAccountNameFromConnectionString(connection_string_key)); + account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/"; + account_blob_url = "https://" + account_name + ".blob.core.windows.net/"; + connection_string = connection_string_key; + credentials_kind = AzureCredentialsKind::ConnectionString; + return Status::OK(); +} + +Status AzureOptions::ConfigureServicePrincipleCredentials( + const std::string& account_name, const std::string& tenant_id, + const std::string& client_id, const std::string& client_secret) { + account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/"; + account_blob_url = "https://" + account_name + ".blob.core.windows.net/"; + service_principle_credentials_provider = + std::make_shared(tenant_id, client_id, + client_secret); + credentials_kind = AzureCredentialsKind::ServicePrincipleCredentials; + return Status::OK(); +} + +Status AzureOptions::ConfigureSasCredentials(const std::string& uri) { + 
Uri url; + RETURN_NOT_OK(url.Parse(uri)); + sas_token = "?" + url.query_string(); + account_blob_url = url.scheme() + "://" + url.host() + kSep; + account_dfs_url = std::regex_replace(account_blob_url, std::regex("[.]blob"), ".dfs"); + credentials_kind = AzureCredentialsKind::Sas; + return Status::OK(); +} + +bool AzureOptions::Equals(const AzureOptions& other) const { + return (scheme == other.scheme && account_dfs_url == other.account_dfs_url && + account_blob_url == other.account_blob_url && + credentials_kind == other.credentials_kind); +} + +Result AzureOptions::FromAnonymous(const std::string& account_name) { + AzureOptions options; + RETURN_NOT_OK(options.ConfigureAnonymousCredentials(account_name)); + return options; +} + +Result AzureOptions::FromAccountKey(const std::string& account_name, + const std::string& account_key) { + AzureOptions options; + RETURN_NOT_OK(options.ConfigureAccountKeyCredentials(account_name, account_key)); + return options; +} + +Result AzureOptions::FromConnectionString( + const std::string& connection_string) { + AzureOptions options; + RETURN_NOT_OK(options.ConfigureConnectionStringCredentials(connection_string)); + return options; +} + +Result AzureOptions::FromServicePrincipleCredential( + const std::string& account_name, const std::string& tenant_id, + const std::string& client_id, const std::string& client_secret) { + AzureOptions options; + RETURN_NOT_OK(options.ConfigureServicePrincipleCredentials(account_name, tenant_id, + client_id, client_secret)); + return options; +} + +Result AzureOptions::FromSas(const std::string& uri) { + AzureOptions options; + RETURN_NOT_OK(options.ConfigureSasCredentials(uri)); + return options; +} + +Result AzureOptions::FromUri(const std::string& uri_string, + std::string* out_path) { + Uri uri; + RETURN_NOT_OK(uri.Parse(uri_string)); + return FromUri(uri, out_path); +} + +Result AzureOptions::FromUri(const Uri& uri, std::string* out_path) { + // uri = + // 
https://accountName.dfs.core.windows.net/pathToBlob/?sas_token_key=sas_token_value + AzureOptions options; + // host = accountName.dfs.core.windows.net + const auto host = uri.host(); + // path_to_blob = /pathToBlob/ + const auto path_to_blob = uri.path(); + std::string account_name; + if (host.empty()) { + return Status::IOError("Missing container in Azure Blob Storage URI: '", + uri.ToString(), "'"); + } + auto pos = host.find('.'); + if (pos == std::string::npos) { + return Status::IOError("Missing container in Azure Blob Storage URI: '", + uri.ToString(), "'"); + } + std::string full_path = path_to_blob; + // account_name = accountName + account_name = host.substr(0, pos); + if (full_path.empty()) { + full_path = account_name; + } else { + if (full_path[0] != '/') { + return Status::IOError("Azure Blob Storage URI should be absolute, not relative"); + } + // full_path = accountName/pathToBlob/ + full_path = account_name + path_to_blob; + } + if (out_path != nullptr) { + *out_path = std::string(internal::RemoveTrailingSlash(full_path)); + } + // scheme = https + options.scheme = uri.scheme(); + // query_string = sas_token_key=sas_token_value + const auto query_string = uri.query_string(); + if (!query_string.empty()) { + // Accepted Uri = + // https://accountName.dfs.core.windows.net/pathToBlob/?sas_token_key=sas_token_value + RETURN_NOT_OK(options.ConfigureSasCredentials(uri.scheme() + "://" + host + + path_to_blob + "?" 
+ query_string)); + } else { + RETURN_NOT_OK(options.ConfigureAnonymousCredentials(account_name)); + } + return options; +} + +namespace { + +struct AzurePath { + std::string full_path; + std::string container; + std::string path_to_file; + std::vector path_to_file_parts; + + static Result FromString(const std::string& s) { + // https://synapsemladlsgen2.dfs.core.windows.net/synapsemlfs/testdir/testfile.txt + // container = synapsemlfs + // account_name = synapsemladlsgen2 + // path_to_file = testdir/testfile.txt + // path_to_file_parts = [testdir, testfile.txt] + + // Expected input here => s = synapsemlfs/testdir/testfile.txt, + // http://127.0.0.1/accountName/pathToBlob + auto src = internal::RemoveTrailingSlash(s); + if (arrow::internal::StartsWith(src, "https://127.0.0.1") || arrow::internal::StartsWith(src, "http://127.0.0.1")) { + RETURN_NOT_OK(FromLocalHostString(&src)); + } + auto input_path = std::string(src.data()); + if (internal::IsLikelyUri(input_path)) { + RETURN_NOT_OK(ExtractBlobPath(&input_path)); + src = std::string_view(input_path); + } + src = internal::RemoveLeadingSlash(src); + auto first_sep = src.find_first_of(kSep); + if (first_sep == 0) { + return Status::IOError("Path cannot start with a separator ('", input_path, "')"); + } + if (first_sep == std::string::npos) { + return AzurePath{std::string(src), std::string(src), "", {}}; + } + AzurePath path; + path.full_path = std::string(src); + path.container = std::string(src.substr(0, first_sep)); + path.path_to_file = std::string(src.substr(first_sep + 1)); + path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file); + RETURN_NOT_OK(Validate(&path)); + return path; + } + + static Status FromLocalHostString(std::string_view* src) { + // src = http://127.0.0.1:10000/accountName/pathToBlob + Uri uri; + RETURN_NOT_OK(uri.Parse(src->data())); + *src = internal::RemoveLeadingSlash(uri.path()); + if (src->empty()) { + return Status::IOError("Missing account name in Azure Blob Storage 
URI"); + } + auto first_sep = src->find_first_of(kSep); + if (first_sep != std::string::npos) { + *src = src->substr(first_sep + 1); + } else { + *src = ""; + } + return Status::OK(); + } + + // Removes scheme, host and port from the uri + static Status ExtractBlobPath(std::string* src) { + Uri uri; + RETURN_NOT_OK(uri.Parse(*src)); + *src = uri.path(); + return Status::OK(); + } + + static Status Validate(const AzurePath* path) { + auto status = internal::ValidateAbstractPathParts(path->path_to_file_parts); + if (!status.ok()) { + return Status::Invalid(status.message(), " in path ", path->full_path); + } else { + return status; + } + } + + AzurePath parent() const { + DCHECK(has_parent()); + auto parent = AzurePath{"", container, "", path_to_file_parts}; + parent.path_to_file_parts.pop_back(); + parent.path_to_file = internal::JoinAbstractPath(parent.path_to_file_parts); + if (parent.path_to_file.empty()) { + parent.full_path = parent.container; + } else { + parent.full_path = parent.container + kSep + parent.path_to_file; + } + return parent; + } + + bool has_parent() const { return !path_to_file.empty(); } + + bool empty() const { return container.empty() && path_to_file.empty(); } + + bool operator==(const AzurePath& other) const { + return container == other.container && path_to_file == other.path_to_file; + } +}; + +template +std::shared_ptr GetObjectMetadata(const ObjectResult& result) { + auto md = std::make_shared(); + for (auto prop : result) { + md->Append(prop.first, prop.second); + } + return md; +} + +template +Result> InitServiceClient(const AzureOptions& options, + const std::string& url) { + std::shared_ptr client; + if (options.credentials_kind == AzureCredentialsKind::StorageCredentials) { + client = std::make_shared(url, options.storage_credentials_provider); + } else if (options.credentials_kind == + AzureCredentialsKind::ServicePrincipleCredentials) { + client = std::make_shared(url, options.service_principle_credentials_provider); + } else 
if (options.credentials_kind == AzureCredentialsKind::ConnectionString) { + client = + std::make_shared(T::CreateFromConnectionString(options.connection_string)); + } else if (options.credentials_kind == AzureCredentialsKind::Sas) { + client = std::make_shared(url + options.sas_token); + } else { + client = std::make_shared(url); + } + return client; +} + +template +Result> InitPathClient(const AzureOptions& options, + const std::string& path, + const std::string& container, + const std::string& path_to_file) { + std::shared_ptr client; + if (options.credentials_kind == AzureCredentialsKind::StorageCredentials) { + client = std::make_shared(path, options.storage_credentials_provider); + } else if (options.credentials_kind == + AzureCredentialsKind::ServicePrincipleCredentials) { + client = std::make_shared(path, options.service_principle_credentials_provider); + } else if (options.credentials_kind == AzureCredentialsKind::ConnectionString) { + client = std::make_shared(T::CreateFromConnectionString(options.connection_string, + container, path_to_file)); + } else if (options.credentials_kind == AzureCredentialsKind::Sas) { + auto src = internal::RemoveLeadingSlash(path); + auto first_sep = src.find("dfs.core.windows.net"); + std::string p; + if (first_sep != std::string::npos) { + p = std::string(src.substr(0, first_sep)) + "blob" + + std::string(src.substr(first_sep + 3)); + client = std::make_shared(p + options.sas_token); + } else { + client = std::make_shared(path); + } + } else { + client = std::make_shared(path); + } + return client; +} + +class ObjectInputFile final : public io::RandomAccessFile { + public: + ObjectInputFile( + std::shared_ptr& path_client, + std::shared_ptr& file_client, + const io::IOContext& io_context, const AzurePath& path, int64_t size = kNoSize) + : path_client_(std::move(path_client)), + file_client_(std::move(file_client)), + io_context_(io_context), + path_(path), + content_length_(size) {} + + Status Init() { + if (content_length_ 
!= kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + try { + auto properties = path_client_->GetProperties(); + if (properties.Value.IsDirectory) { + return ::arrow::fs::internal::NotAFile(path_.full_path); + } + content_length_ = properties.Value.FileSize; + metadata_ = GetObjectMetadata(properties.Value.Metadata); + return Status::OK(); + } catch (const Azure::Storage::StorageException& exception) { + return ::arrow::fs::internal::PathNotFound(path_.full_path); + } + } + + Status CheckClosed() const { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return Status::OK(); + } + + Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return Status::IOError("Cannot ", action, " past end of file"); + } + return Status::OK(); + } + + // RandomAccessFile APIs + + Result> ReadMetadata() override { + return metadata_; + } + + Future> ReadMetadataAsync( + const io::IOContext& io_context) override { + return metadata_; + } + + Status Close() override { + path_client_ = nullptr; + file_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + Result GetSize() override { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + Status Seek(int64_t position) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return Status::OK(); + } + + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + nbytes = std::min(nbytes, content_length_ - position); + if (nbytes == 0) { + return 0; + } + + // Read the desired range of bytes + 
Azure::Storage::Blobs::DownloadBlobToOptions download_options; + Azure::Core::Http::HttpRange range; + range.Offset = position; + range.Length = nbytes; + download_options.Range = Azure::Nullable(range); + try { + auto result = + file_client_ + ->DownloadTo(reinterpret_cast(out), nbytes, download_options) + .Value; + return result.ContentRange.Length.Value(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + + Result> ReadAt(int64_t position, int64_t nbytes) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, io_context_.pool())); + if (nbytes > 0) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + ReadAt(position, nbytes, buf->mutable_data())); + DCHECK_LE(bytes_read, nbytes); + RETURN_NOT_OK(buf->Resize(bytes_read)); + } + return std::move(buf); + } + + Result Read(int64_t nbytes, void* out) override { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return std::move(buffer); + } + + protected: + std::shared_ptr path_client_; + std::shared_ptr file_client_; + const io::IOContext io_context_; + AzurePath path_; + + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; + std::shared_ptr metadata_; +}; + +class ObjectOutputStream final : public io::OutputStream { + public: + ObjectOutputStream( + std::shared_ptr& file_client, + std::shared_ptr& blob_client, + const bool is_hierarchical_namespace_enabled, const io::IOContext& io_context, + const AzurePath& path, const std::shared_ptr& metadata) + : 
file_client_(std::move(file_client)), + blob_client_(std::move(blob_client)), + is_hierarchical_namespace_enabled_(is_hierarchical_namespace_enabled), + io_context_(io_context), + path_(path) {} + + ~ObjectOutputStream() override { + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. + io::internal::CloseFromDestructor(this); + } + + Status Init() { + closed_ = false; + if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + if (is_hierarchical_namespace_enabled_) { + try { + file_client_->DeleteIfExists(); + file_client_->CreateIfNotExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + try { + std::string s = ""; + file_client_->UploadFrom( + const_cast(reinterpret_cast(s.data())), s.size()); + content_length_ = 0; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + + Status Abort() override { + if (closed_) { + return Status::OK(); + } + file_client_ = nullptr; + blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + // OutputStream interface + + Status Close() override { + if (closed_) { + return Status::OK(); + } + file_client_ = nullptr; + blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return pos_; + } + + Status Write(const std::shared_ptr& buffer) override { + return DoWrite(buffer->data(), buffer->size(), buffer); + } + + Status Write(const void* data, int64_t nbytes) override { + return DoWrite(data, nbytes); + } + + Status DoWrite(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + if (closed_) { + 
return Status::Invalid("Operation on closed stream"); + } + if (is_hierarchical_namespace_enabled_) { + try { + auto buffer_stream = std::make_unique( + Azure::Core::IO::MemoryBodyStream( + const_cast(reinterpret_cast(data)), nbytes)); + if (buffer_stream->Length() == 0) { + return Status::OK(); + } + auto result = file_client_->Append(*buffer_stream, pos_); + pos_ += nbytes; + file_client_->Flush(pos_); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + try { + auto append_data = static_cast((void*)data); + auto res = blob_client_->GetBlockList().Value; + auto size = res.CommittedBlocks.size(); + std::string block_id; + { + block_id = std::to_string(size + 1); + size_t n = 8; + int precision = n - std::min(n, block_id.size()); + block_id.insert(0, precision, '0'); + } + block_id = Azure::Core::Convert::Base64Encode( + std::vector(block_id.begin(), block_id.end())); + auto block_content = Azure::Core::IO::MemoryBodyStream( + append_data, strlen(reinterpret_cast(append_data))); + if (block_content.Length() == 0) { + return Status::OK(); + } + blob_client_->StageBlock(block_id, block_content); + std::vector block_ids; + for (auto block : res.CommittedBlocks) { + block_ids.push_back(block.Name); + } + block_ids.push_back(block_id); + blob_client_->CommitBlockList(block_ids); + pos_ += nbytes; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + content_length_ += nbytes; + return Status::OK(); + } + + Status Flush() override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + if (is_hierarchical_namespace_enabled_) { + try { + file_client_->Flush(content_length_); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + try { + auto res = 
blob_client_->GetBlockList().Value; + std::vector block_ids; + for (auto block : res.UncommittedBlocks) { + block_ids.push_back(block.Name); + } + blob_client_->CommitBlockList(block_ids); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + + protected: + std::shared_ptr file_client_; + std::shared_ptr blob_client_; + const bool is_hierarchical_namespace_enabled_; + const io::IOContext io_context_; + const AzurePath path_; + + bool closed_ = true; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; +}; + +class ObjectAppendStream final : public io::OutputStream { + public: + ObjectAppendStream( + std::shared_ptr& path_client, + std::shared_ptr& file_client, + std::shared_ptr& blob_client, + const bool is_hierarchical_namespace_enabled, const io::IOContext& io_context, + const AzurePath& path, const std::shared_ptr& metadata) + : path_client_(std::move(path_client)), + file_client_(std::move(file_client)), + blob_client_(std::move(blob_client)), + is_hierarchical_namespace_enabled_(is_hierarchical_namespace_enabled), + io_context_(io_context), + path_(path) {} + + ~ObjectAppendStream() override { + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. 
+ io::internal::CloseFromDestructor(this); + } + + Status Init() { + closed_ = false; + if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + try { + auto properties = path_client_->GetProperties(); + if (properties.Value.IsDirectory) { + return ::arrow::fs::internal::NotAFile(path_.full_path); + } + content_length_ = properties.Value.FileSize; + pos_ = content_length_; + } catch (const Azure::Storage::StorageException& exception) { + // new file + if (is_hierarchical_namespace_enabled_) { + try { + file_client_->CreateIfNotExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + std::string s = ""; + try { + file_client_->UploadFrom( + const_cast(reinterpret_cast(s.data())), s.size()); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + content_length_ = 0; + } + return Status::OK(); + } + + Status Abort() override { + if (closed_) { + return Status::OK(); + } + path_client_ = nullptr; + file_client_ = nullptr; + blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + // OutputStream interface + + Status Close() override { + if (closed_) { + return Status::OK(); + } + path_client_ = nullptr; + file_client_ = nullptr; + blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return pos_; + } + + Status Write(const std::shared_ptr& buffer) override { + return DoAppend(buffer->data(), buffer->size(), buffer); + } + + Status Write(const void* data, int64_t nbytes) override { + return DoAppend(data, nbytes); + } + + Status DoAppend(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + if (closed_) { + return 
Status::Invalid("Operation on closed stream"); + } + if (is_hierarchical_namespace_enabled_) { + try { + auto buffer_stream = std::make_unique( + Azure::Core::IO::MemoryBodyStream( + const_cast(reinterpret_cast(data)), nbytes)); + if (buffer_stream->Length() == 0) { + return Status::OK(); + } + auto result = file_client_->Append(*buffer_stream, pos_); + pos_ += nbytes; + file_client_->Flush(pos_); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + try { + auto append_data = static_cast((void*)data); + auto res = blob_client_->GetBlockList().Value; + auto size = res.CommittedBlocks.size(); + std::string block_id; + { + block_id = std::to_string(size + 1); + size_t n = 8; + int precision = n - std::min(n, block_id.size()); + block_id.insert(0, precision, '0'); + } + block_id = Azure::Core::Convert::Base64Encode( + std::vector(block_id.begin(), block_id.end())); + auto block_content = Azure::Core::IO::MemoryBodyStream( + append_data, strlen(reinterpret_cast(append_data))); + if (block_content.Length() == 0) { + return Status::OK(); + } + blob_client_->StageBlock(block_id, block_content); + std::vector block_ids; + for (auto block : res.CommittedBlocks) { + block_ids.push_back(block.Name); + } + block_ids.push_back(block_id); + blob_client_->CommitBlockList(block_ids); + pos_ += nbytes; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + content_length_ += nbytes; + return Status::OK(); + } + + Status Flush() override { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + if (is_hierarchical_namespace_enabled_) { + try { + file_client_->Flush(content_length_); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + try { + auto res = 
blob_client_->GetBlockList().Value; + std::vector block_ids; + for (auto block : res.UncommittedBlocks) { + block_ids.push_back(block.Name); + } + blob_client_->CommitBlockList(block_ids); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + + protected: + std::shared_ptr path_client_; + std::shared_ptr file_client_; + std::shared_ptr blob_client_; + const bool is_hierarchical_namespace_enabled_; + const io::IOContext io_context_; + const AzurePath path_; + + bool closed_ = true; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; +}; + +TimePoint ToTimePoint(const Azure::DateTime& dt) { + return std::chrono::time_point_cast( + dt.operator std::chrono::system_clock::time_point()); +} + +void FileObjectToInfo( + const Azure::Storage::Files::DataLake::Models::PathProperties& properties, + FileInfo* info) { + info->set_type(FileType::File); + info->set_size(static_cast(properties.FileSize)); + info->set_mtime(ToTimePoint(properties.LastModified)); +} + +void PathInfoToFileInfo(const std::string& path, const FileType& type, const int64_t size, + const Azure::DateTime dt, FileInfo* info) { + info->set_type(type); + info->set_size(size); + info->set_path(path); + info->set_mtime(ToTimePoint(dt)); +} + +} // namespace + +// ----------------------------------------------------------------------- +// Azure filesystem implementation + +class AzureBlobFileSystem::Impl + : public std::enable_shared_from_this { + public: + io::IOContext io_context_; + std::shared_ptr gen1_client_; + std::shared_ptr gen2_client_; + std::string dfs_endpoint_url_; + std::string blob_endpoint_url_; + bool is_hierarchical_namespace_enabled_; + + explicit Impl(AzureOptions options, io::IOContext io_context) + : io_context_(io_context), options_(std::move(options)) {} + + Status Init() { + dfs_endpoint_url_ = options_.account_dfs_url; + blob_endpoint_url_ = 
options_.account_blob_url; + ARROW_ASSIGN_OR_RAISE(gen1_client_, + InitServiceClient( + options_, blob_endpoint_url_)); + ARROW_ASSIGN_OR_RAISE( + gen2_client_, + InitServiceClient( + options_, dfs_endpoint_url_)); + if (options_.is_azurite) { + // gen1Client_->GetAccountInfo().Value.IsHierarchicalNamespaceEnabled throws error + // in azurite + is_hierarchical_namespace_enabled_ = false; + } else { + try { + auto response = gen1_client_->GetAccountInfo(); + is_hierarchical_namespace_enabled_ = + response.Value.IsHierarchicalNamespaceEnabled; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + + const AzureOptions& options() const { return options_; } + + // Create a container. Successful if container already exists. + Status CreateContainer(const std::string& container) { + auto file_system_client = gen2_client_->GetFileSystemClient(container); + try { + file_system_client.CreateIfNotExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + // Tests to see if a container exists + Result ContainerExists(const std::string& container) { + auto file_system_client = gen2_client_->GetFileSystemClient(container); + try { + auto properties = file_system_client.GetProperties(); + return true; + } catch (const Azure::Storage::StorageException& exception) { + return false; + } + } + + Result DirExists(const std::string& s) { + std::string uri = s; + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(uri)); + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, InitPathClient( + options_, uri, path.container, path.path_to_file)); + try { + auto properties = path_client->GetProperties(); + return properties.Value.IsDirectory; + } catch (const Azure::Storage::StorageException& exception) { + return false; + } + } + + Result 
FileExists(const std::string& s) { + std::string uri = s; + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(uri)); + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, InitPathClient( + options_, uri, path.container, path.path_to_file)); + try { + auto properties = path_client->GetProperties(); + return !properties.Value.IsDirectory; + } catch (const Azure::Storage::StorageException& exception) { + return false; + } + } + + Status CreateEmptyDir(const std::string& container, + const std::vector& path) { + if (path.empty()) { + return CreateContainer(container); + } + auto directory_client = + gen2_client_->GetFileSystemClient(container).GetDirectoryClient(path.front()); + std::vector::const_iterator it = path.begin(); + std::advance(it, 1); + while (it != path.end()) { + directory_client = directory_client.GetSubdirectoryClient(*it); + ++it; + } + try { + directory_client.CreateIfNotExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + Status DeleteContainer(const std::string& container) { + auto file_system_client = gen2_client_->GetFileSystemClient(container); + try { + file_system_client.DeleteIfExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + Status DeleteDir(const std::string& container, const std::vector& path) { + if (path.empty()) { + return DeleteContainer(container); + } + auto file_system_client = gen2_client_->GetFileSystemClient(container); + auto directory_client = file_system_client.GetDirectoryClient(path.front()); + std::vector::const_iterator it = path.begin(); + std::advance(it, 1); + while (it != path.end()) { + directory_client = directory_client.GetSubdirectoryClient(*it); + ++it; + } + ARROW_ASSIGN_OR_RAISE(auto response, FileExists(directory_client.GetUrl())); + if 
(response) { + return ::arrow::fs::internal::NotADir(directory_client.GetUrl()); + } + ARROW_ASSIGN_OR_RAISE(response, DirExists(directory_client.GetUrl())); + if (!response) { + return ::arrow::fs::internal::NotADir(directory_client.GetUrl()); + } + try { + directory_client.DeleteRecursiveIfExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + Status DeleteFile(const std::string& container, const std::vector& path) { + if (path.empty()) { + return Status::IOError("Cannot delete file, Invalid Azure Blob Storage file path"); + } + if (!is_hierarchical_namespace_enabled_) { + if (path.size() > 1) { + return Status::IOError( + "Cannot delete file, Invalid Azure Blob Storage file path," + " hierarchical namespace not enabled in storage account"); + } + auto blob_client = + gen1_client_->GetBlobContainerClient(container).GetBlobClient(path.front()); + ARROW_ASSIGN_OR_RAISE(auto response, FileExists(blob_client.GetUrl())); + if (!response) { + return ::arrow::fs::internal::NotAFile(blob_client.GetUrl()); + } + try { + blob_client.DeleteIfExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + auto file_system_client = gen2_client_->GetFileSystemClient(container); + if (path.size() == 1) { + auto file_client = file_system_client.GetFileClient(path.front()); + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(file_client.GetUrl())); + if (response) { + return ::arrow::fs::internal::NotAFile(file_client.GetUrl()); + } + ARROW_ASSIGN_OR_RAISE(response, FileExists(file_client.GetUrl())); + if (!response) { + return ::arrow::fs::internal::NotAFile(file_client.GetUrl()); + } + try { + file_client.DeleteIfExists(); + } catch (const Azure::Storage::StorageException& exception) { + // Azurite throws an exception + if (!options_.is_azurite) { + 
return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + std::string file_name = path.back(); + auto directory_client = file_system_client.GetDirectoryClient(path.front()); + std::vector::const_iterator it = path.begin(); + std::advance(it, 1); + while (it != (path.end() - 1)) { + directory_client = directory_client.GetSubdirectoryClient(*it); + ++it; + } + auto file_client = directory_client.GetFileClient(file_name); + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(file_client.GetUrl())); + if (response) { + return ::arrow::fs::internal::NotAFile(file_client.GetUrl()); + } + ARROW_ASSIGN_OR_RAISE(response, FileExists(file_client.GetUrl())); + if (!response) { + return ::arrow::fs::internal::NotAFile(file_client.GetUrl()); + } + try { + file_client.DeleteIfExists(); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + Status Move(const std::string& src, const std::string& dest) { + ARROW_ASSIGN_OR_RAISE(auto src_path, AzurePath::FromString(src)); + ARROW_ASSIGN_OR_RAISE(auto dest_path, AzurePath::FromString(dest)); + + if (!is_hierarchical_namespace_enabled_) { + return Status::IOError( + "Cannot perform move operation, Hierarchical namespace not enabled in storage " + "account"); + } + if (src_path.empty() || src_path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(src_path.full_path); + } + if (dest_path.empty() || dest_path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(dest_path.full_path); + } + if (src_path == dest_path) { + return Status::OK(); + } + ARROW_ASSIGN_OR_RAISE(auto file_exists, + FileExists(dfs_endpoint_url_ + src_path.full_path)); + ARROW_ASSIGN_OR_RAISE(auto dir_exists, + DirExists(dfs_endpoint_url_ + src_path.full_path)); + if (file_exists) { + auto file_system_client = gen2_client_->GetFileSystemClient(src_path.container); + auto path = 
src_path.path_to_file_parts; + if (path.size() == 1) { + try { + file_system_client.RenameFile(path.front(), dest_path.path_to_file); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + auto directory_client = file_system_client.GetDirectoryClient(path.front()); + std::vector::const_iterator it = path.begin(); + std::advance(it, 1); + while (it != path.end()) { + if ((it + 1) == path.end()) { + break; + } + directory_client = directory_client.GetSubdirectoryClient(*it); + ++it; + } + try { + directory_client.RenameFile(it->data(), dest_path.path_to_file); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else if (dir_exists) { + auto file_system_client = gen2_client_->GetFileSystemClient(src_path.container); + auto path = src_path.path_to_file_parts; + if (path.size() == 1) { + try { + file_system_client.RenameDirectory(path.front(), dest_path.path_to_file); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + auto directory_client = file_system_client.GetDirectoryClient(path.front()); + std::vector::const_iterator it = path.begin(); + std::advance(it, 1); + while (it != path.end()) { + if ((it + 1) == path.end()) { + break; + } + directory_client = directory_client.GetSubdirectoryClient(*it); + ++it; + } + try { + directory_client.RenameSubdirectory(it->data(), dest_path.path_to_file); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } else { + return ::arrow::fs::internal::PathNotFound(src_path.full_path); + } + return Status::OK(); + } + + Status CopyFile(const std::string& src, const std::string& dest) { + ARROW_ASSIGN_OR_RAISE(auto src_path, 
AzurePath::FromString(src)); + ARROW_ASSIGN_OR_RAISE(auto dest_path, AzurePath::FromString(dest)); + + if (src_path.empty() || src_path.path_to_file.empty()) { + return ::arrow::fs::internal::NotAFile(src_path.full_path); + } + if (dest_path.empty() || dest_path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(dest_path.full_path); + } + + ARROW_ASSIGN_OR_RAISE(auto response, + FileExists(dfs_endpoint_url_ + src_path.full_path)); + if (!response) { + return ::arrow::fs::internal::NotAFile(src_path.full_path); + } + + ARROW_ASSIGN_OR_RAISE(response, DirExists(dfs_endpoint_url_ + dest_path.full_path)); + if (response) { + return Status::IOError( + "Cannot copy file, Invalid Azure Blob Storage destination path"); + } + + if (!is_hierarchical_namespace_enabled_) { + if (src_path.path_to_file_parts.size() > 1 || + dest_path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid Azure Blob Storage path provided, " + "hierarchical namespace not enabled in storage account"); + } + if (dest_path.empty() || dest_path.path_to_file_parts.empty()) { + return ::arrow::fs::internal::PathNotFound(dest_path.full_path); + } + if (src_path == dest_path) { + return Status::OK(); + } + auto container_client = gen1_client_->GetBlobContainerClient(dest_path.container); + auto file_client = container_client.GetBlobClient(dest_path.path_to_file); + try { + file_client.StartCopyFromUri(blob_endpoint_url_ + src); + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + if (dest_path.has_parent()) { + AzurePath parent_path = dest_path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + if (!response) { + return Status::IOError("Cannot copy file '", src_path.full_path, + "': parent directory of destination does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(response, + 
DirExists(dfs_endpoint_url_ + parent_path.full_path)); + if (!response) { + return Status::IOError("Cannot copy file '", src_path.full_path, + "': parent directory of destination does not exist"); + } + } + } + if (src_path == dest_path) { + return Status::OK(); + } + auto container_client = gen1_client_->GetBlobContainerClient(dest_path.container); + auto file_client = container_client.GetBlobClient(dest_path.path_to_file); + try { + if (options_.credentials_kind == AzureCredentialsKind::Sas) { + file_client.StartCopyFromUri(blob_endpoint_url_ + src + options_.sas_token); + } else { + file_client.StartCopyFromUri(blob_endpoint_url_ + src); + } + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + Status ListPaths(const std::string& container, const std::string& path, + std::vector* children_dirs, + std::vector* children_files, + const bool allow_not_found = false) { + if (!is_hierarchical_namespace_enabled_) { + try { + auto paths = gen1_client_->GetBlobContainerClient(container).ListBlobs(); + for (auto p : paths.Blobs) { + std::shared_ptr + path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, + InitPathClient( + options_, dfs_endpoint_url_ + container + "/" + p.Name, container, + p.Name)); + children_files->push_back(container + "/" + p.Name); + } + } catch (const Azure::Storage::StorageException& exception) { + if (!allow_not_found) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + if (path.empty()) { + try { + auto paths = gen2_client_->GetFileSystemClient(container).ListPaths(false); + for (auto p : paths.Paths) { + std::shared_ptr + path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, + InitPathClient( + options_, dfs_endpoint_url_ + container + "/" + p.Name, container, + p.Name)); + if (path_client->GetProperties().Value.IsDirectory) { + children_dirs->push_back(container + "/" + p.Name); + 
} else { + children_files->push_back(container + "/" + p.Name); + } + } + } catch (const Azure::Storage::StorageException& exception) { + if (!allow_not_found) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + std::vector dirs = internal::SplitAbstractPath(path); + try { + Azure::Storage::Files::DataLake::DataLakeDirectoryClient dir_client = + gen2_client_->GetFileSystemClient(container).GetDirectoryClient(dirs.front()); + for (auto dir = dirs.begin() + 1; dir < dirs.end(); ++dir) { + dir_client = dir_client.GetSubdirectoryClient(*dir); + } + auto paths = dir_client.ListPaths(false); + for (auto p : paths.Paths) { + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, + InitPathClient( + options_, dfs_endpoint_url_ + container + "/" + p.Name, container, + p.Name)); + if (path_client->GetProperties().Value.IsDirectory) { + children_dirs->push_back(container + "/" + p.Name); + } else { + children_files->push_back(container + "/" + p.Name); + } + } + } catch (const Azure::Storage::StorageException& exception) { + if (!allow_not_found) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + return Status::OK(); + } + + Status Walk(const FileSelector& select, const std::string& container, + const std::string& path, int nesting_depth, std::vector* out) { + std::vector children_dirs; + std::vector children_files; + + RETURN_NOT_OK(ListPaths(container, path, &children_dirs, &children_files, + select.allow_not_found)); + + for (const auto& child_file : children_files) { + FileInfo info; + Azure::Storage::Files::DataLake::Models::PathProperties properties; + RETURN_NOT_OK(GetProperties(dfs_endpoint_url_ + child_file, &properties)); + PathInfoToFileInfo(child_file, FileType::File, properties.FileSize, + properties.LastModified, &info); + out->push_back(std::move(info)); + } + for (const auto& child_dir : children_dirs) { + FileInfo info; + 
Azure::Storage::Files::DataLake::Models::PathProperties properties; + RETURN_NOT_OK(GetProperties(dfs_endpoint_url_ + child_dir, &properties)); + PathInfoToFileInfo(child_dir, FileType::Directory, -1, properties.LastModified, + &info); + out->push_back(std::move(info)); + if (select.recursive && nesting_depth < select.max_recursion) { + const auto src = internal::RemoveTrailingSlash(child_dir); + auto first_sep = src.find_first_of("/"); + std::string s = std::string(src.substr(first_sep + 1)); + RETURN_NOT_OK(Walk(select, container, s, nesting_depth + 1, out)); + } + } + return Status::OK(); + } + + Status GetProperties( + const std::string& s, + Azure::Storage::Files::DataLake::Models::PathProperties* properties) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, InitPathClient( + options_, s, path.container, path.path_to_file)); + if (path.path_to_file.empty()) { + auto file_system_client = gen2_client_->GetFileSystemClient(path.container); + try { + auto props = file_system_client.GetProperties().Value; + properties->LastModified = props.LastModified; + properties->Metadata = props.Metadata; + properties->ETag = props.ETag; + properties->FileSize = -1; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + try { + auto props = path_client->GetProperties().Value; + properties->FileSize = props.FileSize; + properties->LastModified = props.LastModified; + properties->Metadata = props.Metadata; + properties->ETag = props.ETag; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + return Status::OK(); + } + + Status DeleteDirContents(const std::string& container, const std::string& path, + const std::vector& path_to_file_parts) { + std::vector children_dirs; + std::vector children_files; + + 
RETURN_NOT_OK(ListPaths(container, path, &children_dirs, &children_files)); + + for (const auto& child_file : children_files) { + ARROW_ASSIGN_OR_RAISE(auto file_path, AzurePath::FromString(child_file)); + RETURN_NOT_OK(DeleteFile(file_path.container, file_path.path_to_file_parts)); + } + for (const auto& child_dir : children_dirs) { + ARROW_ASSIGN_OR_RAISE(auto dir_path, AzurePath::FromString(child_dir)); + RETURN_NOT_OK(DeleteDir(dir_path.container, dir_path.path_to_file_parts)); + } + return Status::OK(); + } + + Result> ListContainers() { + try { + auto outcome = gen2_client_->ListFileSystems(); + std::vector containers; + for (auto container : outcome.FileSystems) { + containers.push_back(container.Name); + } + return containers; + } catch (const Azure::Storage::StorageException& exception) { + return Status::IOError(exception.RawResponse->GetReasonPhrase()); + } + } + + Result> OpenInputFile(const std::string& s, + AzureBlobFileSystem* fs) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty()) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid Azure Blob Storage path provided," + " hierarchical namespace not enabled in storage account"); + } + } + ARROW_ASSIGN_OR_RAISE(auto response, FileExists(dfs_endpoint_url_ + path.full_path)); + if (!response) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, InitPathClient( + options_, dfs_endpoint_url_ + path.full_path, path.container, + path.path_to_file)); + + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, InitPathClient( + options_, dfs_endpoint_url_ + path.full_path, path.container, + path.path_to_file)); + + auto ptr = std::make_shared(path_client, file_client, + fs->io_context(), path); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } 
+ + Result> OpenOutputStream( + const std::string& s, const std::shared_ptr& metadata, + AzureBlobFileSystem* fs) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty() || path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::string endpoint_url = dfs_endpoint_url_; + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid path provided," + " hierarchical namespace not enabled"); + } + endpoint_url = blob_endpoint_url_; + } + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path)); + if (response) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr blob_client; + ARROW_ASSIGN_OR_RAISE( + blob_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + if (path.has_parent()) { + AzurePath parent_path = path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(response, + DirExists(dfs_endpoint_url_ + parent_path.full_path)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } + } + auto ptr = std::make_shared(file_client, blob_client, + is_hierarchical_namespace_enabled_, + fs->io_context(), path, metadata); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> OpenAppendStream( + const std::string& s, const std::shared_ptr& metadata, + AzureBlobFileSystem* fs) { + ARROW_ASSIGN_OR_RAISE(auto path, 
AzurePath::FromString(s)); + + if (path.empty() || path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::string endpoint_url = dfs_endpoint_url_; + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid Azure Blob Storage path provided," + " hierarchical namespace not enabled in storage account"); + } + endpoint_url = blob_endpoint_url_; + } + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path)); + if (response) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr blob_client; + ARROW_ASSIGN_OR_RAISE( + blob_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + if (path.has_parent()) { + AzurePath parent_path = path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(response, + DirExists(dfs_endpoint_url_ + parent_path.full_path)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } + } + auto ptr = std::make_shared(path_client, file_client, blob_client, + is_hierarchical_namespace_enabled_, + fs->io_context(), path, metadata); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> OpenInputFile(const FileInfo& info, + AzureBlobFileSystem* fs) { + if (info.type() == 
FileType::NotFound) { + return ::arrow::fs::internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return ::arrow::fs::internal::NotAFile(info.path()); + } + + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path())); + + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid Azure Blob Storage path provided, hierarchical namespace" + " not enabled in storage account"); + } + } + ARROW_ASSIGN_OR_RAISE(auto response, FileExists(dfs_endpoint_url_ + info.path())); + if (!response) { + return ::arrow::fs::internal::PathNotFound(info.path()); + } + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, InitPathClient( + options_, dfs_endpoint_url_ + info.path(), path.container, + path.path_to_file)); + + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, InitPathClient( + options_, dfs_endpoint_url_ + info.path(), path.container, + path.path_to_file)); + + auto ptr = std::make_shared(path_client, file_client, + fs->io_context(), path, info.size()); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + protected: + AzureOptions options_; +}; + +AzureBlobFileSystem::AzureBlobFileSystem(const AzureOptions& options, + const io::IOContext& io_context) + : FileSystem(io_context), impl_(std::make_shared(options, io_context)) { + default_async_is_sync_ = false; +} + +AzureBlobFileSystem::~AzureBlobFileSystem() {} + +Result> AzureBlobFileSystem::Make( + const AzureOptions& options, const io::IOContext& io_context) { + std::shared_ptr ptr(new AzureBlobFileSystem(options, io_context)); + RETURN_NOT_OK(ptr->impl_->Init()); + return ptr; +} + +bool AzureBlobFileSystem::Equals(const FileSystem& other) const { + if (this == &other) { + return true; + } + if (other.type_name() != type_name()) { + return false; + } + const auto& azure_fs = + ::arrow::internal::checked_cast(other); + return 
options().Equals(azure_fs.options()); +} + +AzureOptions AzureBlobFileSystem::options() const { return impl_->options(); } + +Result AzureBlobFileSystem::GetFileInfo(const std::string& s) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + FileInfo info; + info.set_path(s); + + if (!impl_->is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + info.set_type(FileType::NotFound); + return info; + } + } + + if (path.empty()) { + // It's the root path "" + info.set_type(FileType::Directory); + return info; + } else if (path.path_to_file.empty()) { + // It's a container + ARROW_ASSIGN_OR_RAISE(bool container_exists, impl_->ContainerExists(path.container)); + if (!container_exists) { + info.set_type(FileType::NotFound); + return info; + } + info.set_type(FileType::Directory); + return info; + } else { + // It's an object + ARROW_ASSIGN_OR_RAISE(bool file_exists, + impl_->FileExists(impl_->dfs_endpoint_url_ + path.full_path)); + if (file_exists) { + // "File" object found + Azure::Storage::Files::DataLake::Models::PathProperties properties; + RETURN_NOT_OK( + impl_->GetProperties(impl_->dfs_endpoint_url_ + path.full_path, &properties)); + FileObjectToInfo(properties, &info); + return info; + } + // Not found => perhaps it's a "directory" + ARROW_ASSIGN_OR_RAISE(auto is_dir, + impl_->DirExists(impl_->dfs_endpoint_url_ + path.full_path)); + if (is_dir) { + info.set_type(FileType::Directory); + } else { + info.set_type(FileType::NotFound); + } + return info; + } +} + +Result AzureBlobFileSystem::GetFileInfo(const FileSelector& select) { + ARROW_ASSIGN_OR_RAISE(auto base_path, AzurePath::FromString(select.base_dir)); + + FileInfoVector results; + + if (base_path.empty()) { + // List all containers + ARROW_ASSIGN_OR_RAISE(auto containers, impl_->ListContainers()); + for (const auto& container : containers) { + FileInfo info; + Azure::Storage::Files::DataLake::Models::PathProperties properties; + RETURN_NOT_OK( + 
impl_->GetProperties(impl_->dfs_endpoint_url_ + container, &properties)); + PathInfoToFileInfo(container, FileType::Directory, -1, properties.LastModified, + &info); + results.push_back(std::move(info)); + if (select.recursive) { + RETURN_NOT_OK(impl_->Walk(select, container, "", 0, &results)); + } + } + return results; + } + + if (!impl_->is_hierarchical_namespace_enabled_) { + if (base_path.path_to_file_parts.size() > 1) { + if (!select.allow_not_found) { + return Status::IOError( + "Invalid Azure Blob Storage path provided, hierarchical namespace not" + " enabled in storage account"); + } + return results; + } + } + + ARROW_ASSIGN_OR_RAISE(auto response, impl_->ContainerExists(base_path.container)); + if (base_path.path_to_file.empty() && !response) { + if (!select.allow_not_found) { + return ::arrow::fs::internal::PathNotFound(base_path.container); + } + return results; + } + + ARROW_ASSIGN_OR_RAISE( + response, impl_->FileExists(impl_->dfs_endpoint_url_ + base_path.full_path)); + if (response) { + return ::arrow::fs::internal::PathNotFound(base_path.full_path); + } + + ARROW_ASSIGN_OR_RAISE(response, + impl_->DirExists(impl_->dfs_endpoint_url_ + base_path.full_path)); + if (!(base_path.path_to_file.empty()) && !response) { + if (!select.allow_not_found) { + return ::arrow::fs::internal::PathNotFound(base_path.full_path); + } + return results; + } + + // Nominal case -> walk a single container + RETURN_NOT_OK( + impl_->Walk(select, base_path.container, base_path.path_to_file, 0, &results)); + return results; +} + +Status AzureBlobFileSystem::CreateDir(const std::string& s, bool recursive) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty()) { + return Status::IOError("Cannot create directory, root path given"); + } + ARROW_ASSIGN_OR_RAISE(auto response, + impl_->FileExists(impl_->dfs_endpoint_url_ + path.full_path)); + if (response) { + return Status::IOError("Cannot create directory, file exists at the specified path"); + } + 
if (path.path_to_file.empty()) { + // Create container + return impl_->CreateContainer(path.container); + } + // Hierarchical namespace not enabled type storage accounts + if (!impl_->is_hierarchical_namespace_enabled_) { + if (!path.path_to_file.empty()) { + return Status::IOError( + "Cannot create directory, " + "storage account doesn't have hierarchical namespace enabled"); + } + } + if (recursive) { + // Ensure container exists + ARROW_ASSIGN_OR_RAISE(bool container_exists, impl_->ContainerExists(path.container)); + if (!container_exists) { + RETURN_NOT_OK(impl_->CreateContainer(path.container)); + } + std::vector parent_path_to_file; + + for (const auto& part : path.path_to_file_parts) { + parent_path_to_file.push_back(part); + RETURN_NOT_OK(impl_->CreateEmptyDir(path.container, parent_path_to_file)); + } + return Status::OK(); + } else { + // Check parent dir exists + if (path.has_parent()) { + auto parent_path = path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(auto exists, impl_->ContainerExists(parent_path.container)); + if (!exists) { + return Status::IOError("Cannot create directory '", path.full_path, + "': parent directory does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(auto exists, impl_->DirExists(impl_->dfs_endpoint_url_ + + parent_path.full_path)); + if (!exists) { + return Status::IOError("Cannot create directory '", path.full_path, + "': parent directory does not exist"); + } + } + } + return impl_->CreateEmptyDir(path.container, path.path_to_file_parts); + } +} + +Status AzureBlobFileSystem::DeleteDir(const std::string& s) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + if (path.empty()) { + return Status::NotImplemented("Cannot delete all Azure Containers"); + } + if (path.path_to_file.empty()) { + return impl_->DeleteContainer(path.container); + } + ARROW_ASSIGN_OR_RAISE(auto response, + impl_->FileExists(impl_->dfs_endpoint_url_ + path.full_path)); + if (response) { + return 
Status::IOError("Cannot delete directory, file exists at the specified path"); + } + + // Hierarchical namespace not enabled type storage accounts + if (!impl_->is_hierarchical_namespace_enabled_) { + if (!path.path_to_file.empty()) { + return Status::IOError( + "Cannot delete directory, storage " + "account doesn't have hierarchical namespace enabled"); + } + } + return impl_->DeleteDir(path.container, path.path_to_file_parts); +} + +Status AzureBlobFileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty()) { + if (missing_dir_ok) { + return Status::OK(); + } + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + + ARROW_ASSIGN_OR_RAISE(auto response, impl_->ContainerExists(path.container)); + if (path.path_to_file.empty() && !response) { + if (missing_dir_ok) { + return Status::OK(); + } + return ::arrow::fs::internal::PathNotFound(path.container); + } + + ARROW_ASSIGN_OR_RAISE(response, + impl_->FileExists(impl_->dfs_endpoint_url_ + path.full_path)); + if (response) { + if (missing_dir_ok) { + return Status::OK(); + } + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + + ARROW_ASSIGN_OR_RAISE(response, + impl_->DirExists(impl_->dfs_endpoint_url_ + path.full_path)); + if (!(path.path_to_file.empty()) && !response) { + if (missing_dir_ok) { + return Status::OK(); + } + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + + return impl_->DeleteDirContents(path.container, path.path_to_file, + path.path_to_file_parts); +} + +Status AzureBlobFileSystem::DeleteRootDirContents() { + return Status::NotImplemented("Cannot delete all Azure Containers"); +} + +Status AzureBlobFileSystem::DeleteFile(const std::string& s) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + return impl_->DeleteFile(path.container, path.path_to_file_parts); +} + +Status AzureBlobFileSystem::Move(const std::string& src, const std::string& 
dest) { + return impl_->Move(src, dest); +} + +Status AzureBlobFileSystem::CopyFile(const std::string& src, const std::string& dest) { + return impl_->CopyFile(src, dest); +} + +Result> AzureBlobFileSystem::OpenInputStream( + const std::string& s) { + return impl_->OpenInputFile(s, this); +} + +Result> AzureBlobFileSystem::OpenInputStream( + const FileInfo& info) { + return impl_->OpenInputFile(info, this); +} + +Result> AzureBlobFileSystem::OpenInputFile( + const std::string& s) { + return impl_->OpenInputFile(s, this); +} + +Result> AzureBlobFileSystem::OpenInputFile( + const FileInfo& info) { + return impl_->OpenInputFile(info, this); +} + +Result> AzureBlobFileSystem::OpenOutputStream( + const std::string& path, const std::shared_ptr& metadata) { + return impl_->OpenOutputStream(path, metadata, this); +} + +Result> AzureBlobFileSystem::OpenAppendStream( + const std::string& path, const std::shared_ptr& metadata) { + return impl_->OpenAppendStream(path, metadata, this); +} +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h new file mode 100644 index 00000000000..d58b76ab374 --- /dev/null +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/util/macros.h" +#include "arrow/util/uri.h" + +namespace Azure { +namespace Core { +namespace Credentials { + +class TokenCredential; + +} // namespace Credentials +} // namespace Core +namespace Storage { + +class StorageSharedKeyCredential; + +} // namespace Storage +} // namespace Azure + +namespace arrow { +namespace fs { + +enum class AzureCredentialsKind : int8_t { + /// Anonymous access (no credentials used), public + Anonymous, + /// Use explicitly-provided access key pair + StorageCredentials, + /// Use ServicePrincipleCredentials + ServicePrincipleCredentials, + /// Use Sas Token to authenticate + Sas, + /// Use Connection String + ConnectionString +}; + +/// Options for the AzureFileSystem implementation. +struct ARROW_EXPORT AzureOptions { + std::string scheme; + std::string account_dfs_url; + std::string account_blob_url; + bool is_azurite = false; + AzureCredentialsKind credentials_kind = AzureCredentialsKind::Anonymous; + + std::string sas_token; + std::string connection_string; + std::shared_ptr + storage_credentials_provider; + std::shared_ptr + service_principle_credentials_provider; + + AzureOptions(); + + Result GetAccountNameFromConnectionString( + const std::string& connection_string); + + Status ConfigureAnonymousCredentials(const std::string& account_name); + + Status ConfigureAccountKeyCredentials(const std::string& account_name, + const std::string& account_key); + + Status ConfigureConnectionStringCredentials(const std::string& connection_string); + + Status ConfigureServicePrincipleCredentials(const std::string& account_name, + const std::string& tenant_id, + const std::string& client_id, + const std::string& client_secret); + + Status ConfigureSasCredentials(const std::string& sas_token); + + bool Equals(const AzureOptions& 
other) const; + + static Result FromAnonymous(const std::string& account_name); + + static Result FromAccountKey(const std::string& account_name, + const std::string& account_key); + + // https://docs.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string + static Result FromConnectionString(const std::string& connection_string); + + static Result FromServicePrincipleCredential( + const std::string& account_name, const std::string& tenant_id, + const std::string& client_id, const std::string& client_secret); + + static Result FromSas(const std::string& uri); + + static Result FromUri(const ::arrow::internal::Uri& uri, + std::string* out_path = NULLPTR); + static Result FromUri(const std::string& uri, + std::string* out_path = NULLPTR); +}; + +class ARROW_EXPORT AzureBlobFileSystem : public FileSystem { + public: + ~AzureBlobFileSystem() override; + + std::string type_name() const override { return "abfs"; } + + /// Return the original Azure options when constructing the filesystem + AzureOptions options() const; + + bool Equals(const FileSystem& other) const override; + + /// \cond FALSE + using FileSystem::GetFileInfo; + /// \endcond + Result GetFileInfo(const std::string& path) override; + Result> GetFileInfo(const FileSelector& select) override; + + /// FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteRootDirContents() override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + Result> OpenInputStream( + const std::string& path) override; + + Result> OpenInputStream(const FileInfo& info) override; + + Result> 
OpenInputFile( + const std::string& path) override; + + Result> OpenInputFile( + const FileInfo& info) override; + + Result> OpenOutputStream( + const std::string& path, + const std::shared_ptr& metadata = {}) override; + + Result> OpenAppendStream( + const std::string& path, + const std::shared_ptr& metadata = {}) override; + + static Result> Make( + const AzureOptions& options, const io::IOContext& = io::default_io_context()); + + protected: + explicit AzureBlobFileSystem(const AzureOptions& options, const io::IOContext&); + + class Impl; + std::shared_ptr impl_; +}; + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc new file mode 100644 index 00000000000..6a9429ca058 --- /dev/null +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/filesystem/azurefs.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/filesystem/test_util.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/util.h" +#include "arrow/util/io_util.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" +#include "arrow/util/uri.h" + +namespace arrow { + +using internal::Uri; + +namespace fs { +namespace internal { + +namespace bp = boost::process; + +using ::arrow::internal::TemporaryDir; +using ::testing::IsEmpty; +using ::testing::NotNull; + +class AzuriteEnv : public ::testing::Environment { + public: + AzuriteEnv() { + account_name_ = "devstoreaccount1"; + account_key_ = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/" + "KBHBeksoGMGw=="; + auto exe_path = bp::search_path("azurite"); + if (exe_path.empty()) { + auto error = std::string("Could not find Azurite emulator."); + status_ = Status::Invalid(error); + return; + } + temp_dir_ = TemporaryDir::Make("azurefs-test-").ValueOrDie(); + server_process_ = bp::child(boost::this_process::environment(), exe_path, "--silent", + "--location", temp_dir_->path().ToString(), "--debug", + temp_dir_->path().ToString() + "/debug.log"); + if (!(server_process_.valid() && server_process_.running())) { + auto error = "Could not start Azurite emulator."; + server_process_.terminate(); + server_process_.wait(); + status_ = Status::Invalid(error); + return; + } + status_ = Status::OK(); + } + + ~AzuriteEnv() override { + server_process_.terminate(); + server_process_.wait(); + } + + const std::string& account_name() const { return account_name_; } + const std::string& account_key() const { return account_key_; } + const Status status() const { return status_; } + + private: + std::string account_name_; + std::string account_key_; + bp::child server_process_; + Status status_; + std::unique_ptr temp_dir_; +}; + +auto* 
azurite_env = ::testing::AddGlobalTestEnvironment(new AzuriteEnv); + +AzuriteEnv* GetAzuriteEnv() { + return ::arrow::internal::checked_cast(azurite_env); +} + +class TestAzureFileSystem : public ::testing::Test { + public: + std::shared_ptr fs_; + std::shared_ptr gen2_client_; + AzureOptions options_; + + void MakeFileSystem() { + const std::string& account_name = GetAzuriteEnv()->account_name(); + const std::string& account_key = GetAzuriteEnv()->account_key(); + options_.is_azurite = true; + options_.ConfigureAccountKeyCredentials(account_name, account_key); + gen2_client_ = + std::make_shared( + options_.account_dfs_url, options_.storage_credentials_provider); + ASSERT_OK_AND_ASSIGN(fs_, AzureBlobFileSystem::Make(options_)); + } + + void SetUp() override { + ASSERT_THAT(GetAzuriteEnv(), NotNull()); + ASSERT_OK(GetAzuriteEnv()->status()); + + MakeFileSystem(); + auto file_system_client = gen2_client_->GetFileSystemClient("container"); + file_system_client.CreateIfNotExists(); + file_system_client = gen2_client_->GetFileSystemClient("empty-container"); + file_system_client.CreateIfNotExists(); + auto file_client = + std::make_shared( + options_.account_blob_url + "container/somefile", + options_.storage_credentials_provider); + std::string s = "some data"; + file_client->UploadFrom( + const_cast(reinterpret_cast(s.data())), s.size()); + } + + void TearDown() override { + auto containers = gen2_client_->ListFileSystems(); + for (auto container : containers.FileSystems) { + auto file_system_client = gen2_client_->GetFileSystemClient(container.Name); + file_system_client.DeleteIfExists(); + } + } + + void AssertObjectContents( + Azure::Storage::Files::DataLake::DataLakeServiceClient* client, + const std::string& container, const std::string& path_to_file, + const std::string& expected) { + auto path_client = + std::make_shared( + client->GetUrl() + container + "/" + path_to_file, + options_.storage_credentials_provider); + auto size = 
path_client->GetProperties().Value.FileSize; + if (size == 0) { + ASSERT_EQ(expected, ""); + return; + } + auto buf = AllocateBuffer(size, fs_->io_context().pool()); + Azure::Storage::Blobs::DownloadBlobToOptions download_options; + Azure::Core::Http::HttpRange range; + range.Offset = 0; + range.Length = size; + download_options.Range = Azure::Nullable(range); + auto file_client = + std::make_shared( + client->GetUrl() + container + "/" + path_to_file, + options_.storage_credentials_provider); + auto result = file_client + ->DownloadTo(reinterpret_cast(buf->get()->mutable_data()), + size, download_options) + .Value; + auto buf_data = std::move(buf->get()); + auto expected_data = std::make_shared( + reinterpret_cast(expected.data()), expected.size()); + AssertBufferEqual(*buf_data, *expected_data); + } +}; + +TEST_F(TestAzureFileSystem, FromUri) { + Uri uri; + + // Public container + ASSERT_OK(uri.Parse("https://testcontainer.dfs.core.windows.net/")); + ASSERT_OK_AND_ASSIGN(auto options, AzureOptions::FromUri(uri)); + ASSERT_EQ(options.credentials_kind, arrow::fs::AzureCredentialsKind::Anonymous); + ASSERT_EQ(options.account_dfs_url, "https://testcontainer.dfs.core.windows.net/"); + + // Sas Token + ASSERT_OK(uri.Parse( + "https://testblobcontainer.blob.core.windows.net/?dummy_sas_key=dummy_value")); + ASSERT_OK_AND_ASSIGN(options, AzureOptions::FromUri(uri)); + ASSERT_EQ(options.credentials_kind, arrow::fs::AzureCredentialsKind::Sas); + ASSERT_EQ(options.account_dfs_url, "https://testblobcontainer.dfs.core.windows.net/"); + ASSERT_EQ(options.account_blob_url, "https://testblobcontainer.blob.core.windows.net/"); + ASSERT_EQ(options.sas_token, "?dummy_sas_key=dummy_value"); +} + +TEST_F(TestAzureFileSystem, FromAccountKey) { + auto options = AzureOptions::FromAccountKey(GetAzuriteEnv()->account_name(), + GetAzuriteEnv()->account_key()) + .ValueOrDie(); + ASSERT_EQ(options.credentials_kind, + arrow::fs::AzureCredentialsKind::StorageCredentials); + 
ASSERT_NE(options.storage_credentials_provider, nullptr); +} + +TEST_F(TestAzureFileSystem, FromConnectionString) { + auto options = + AzureOptions::FromConnectionString( + "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=" + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/" + "KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;") + .ValueOrDie(); + ASSERT_EQ(options.credentials_kind, arrow::fs::AzureCredentialsKind::ConnectionString); + ASSERT_NE(options.connection_string, ""); +} + +TEST_F(TestAzureFileSystem, FromServicePrincipleCredential) { + auto options = AzureOptions::FromServicePrincipleCredential( + "dummy_account_name", "dummy_tenant_id", "dummy_client_id", + "dummy_client_secret") + .ValueOrDie(); + ASSERT_EQ(options.credentials_kind, + arrow::fs::AzureCredentialsKind::ServicePrincipleCredentials); + ASSERT_NE(options.service_principle_credentials_provider, nullptr); +} + +TEST_F(TestAzureFileSystem, FromSas) { + auto options = + AzureOptions::FromSas( + "https://testcontainer.blob.core.windows.net/?dummy_sas_key=dummy_value") + .ValueOrDie(); + ASSERT_EQ(options.credentials_kind, arrow::fs::AzureCredentialsKind::Sas); + ASSERT_EQ(options.sas_token, "?dummy_sas_key=dummy_value"); +} + +TEST_F(TestAzureFileSystem, CreateDir) { + // New container + AssertFileInfo(fs_.get(), "new-container", FileType::NotFound); + ASSERT_OK(fs_->CreateDir("new-container")); + AssertFileInfo(fs_.get(), "new-container", FileType::Directory); + + // Existing container + ASSERT_OK(fs_->CreateDir("container")); + AssertFileInfo(fs_.get(), "container", FileType::Directory); + + ASSERT_RAISES(IOError, fs_->CreateDir("")); + + // Existing "file", should fail + ASSERT_RAISES(IOError, fs_->CreateDir("container/somefile")); + + // recursive, false + ASSERT_RAISES(IOError, fs_->CreateDir("container/newdir/newsub/newsubsub", false)); + + // recursive, true + ASSERT_RAISES(IOError, 
fs_->CreateDir("container/newdir/newsub/newsubsub", true)); +} + +TEST_F(TestAzureFileSystem, DeleteDir) { + // Container + ASSERT_OK(fs_->DeleteDir("container")); + AssertFileInfo(fs_.get(), "container", FileType::NotFound); + + // Nonexistent-Container + ASSERT_OK(fs_->DeleteDir("nonexistent-container")); + AssertFileInfo(fs_.get(), "nonexistent-container", FileType::NotFound); + + // root + ASSERT_RAISES(NotImplemented, fs_->DeleteDir("")); + + // Container/File + ASSERT_RAISES(IOError, fs_->DeleteDir("container/somefile")); + + // Container/Nonexistent-File + ASSERT_RAISES(IOError, fs_->DeleteDir("container/nonexistent-file")); +} + +TEST_F(TestAzureFileSystem, DeleteFile) { + // Container + ASSERT_RAISES(IOError, fs_->DeleteFile("container")); + + // Nonexistent-Container + ASSERT_RAISES(IOError, fs_->DeleteFile("nonexistent-container")); + + // root + ASSERT_RAISES(IOError, fs_->DeleteFile("")); + + // Container/File + ASSERT_OK(fs_->DeleteFile("container/somefile")); + + // Container/Nonexistent-File + ASSERT_RAISES(IOError, fs_->DeleteFile("container/somefile")); + + // Container/Directory/Directory + ASSERT_RAISES(IOError, fs_->DeleteFile("container/somedir/subdir")); +} + +TEST_F(TestAzureFileSystem, GetFileInfo) { + // Containers + AssertFileInfo(fs_.get(), "container", FileType::Directory); + AssertFileInfo(fs_.get(), "nonexistent-container", FileType::NotFound); + + AssertFileInfo(fs_.get(), "", FileType::Directory); + + // "Files" + AssertFileInfo(fs_.get(), "container/somefile", FileType::File); + AssertFileInfo(fs_.get(), "container/nonexistent-file.txt", FileType::NotFound); +} + +TEST_F(TestAzureFileSystem, GetFileInfoSelector) { + FileSelector select; + + // Non-empty container + select.base_dir = "container"; + ASSERT_OK_AND_ASSIGN(auto infos, fs_->GetFileInfo(select)); + ASSERT_EQ(infos.size(), 1); + + // Nonexistent-Container + select.base_dir = "nonexistent-container"; + ASSERT_RAISES(IOError, fs_->GetFileInfo(select)); + 
select.allow_not_found = true; + ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); + ASSERT_EQ(infos.size(), 0); + select.allow_not_found = false; + + // Root dir + select.base_dir = ""; + ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); + ASSERT_EQ(infos.size(), 2); + + // Container/File + select.base_dir = "container/somefile"; + ASSERT_RAISES(IOError, fs_->GetFileInfo(select)); +} + +TEST_F(TestAzureFileSystem, Move) { + ASSERT_RAISES(IOError, fs_->Move("container", "container/non-existentdir")); + ASSERT_RAISES(IOError, + fs_->Move("container/somedir/subdir", "container/newdir/newsub")); + ASSERT_RAISES(IOError, fs_->Move("container/emptydir", "container/base.txt")); + ASSERT_RAISES(IOError, + fs_->Move("container/nonexistent-directory", "container/base.txt")); + ASSERT_OK_AND_ASSIGN(auto res, fs_->OpenOutputStream("container/somefile")); + ASSERT_OK(res->Write("Changed the data")); + ASSERT_RAISES(IOError, fs_->Move("container/base.txt", "container/somefile")); + ASSERT_RAISES(IOError, fs_->Move("container/somefile", "container/base.txt")); + ASSERT_RAISES(IOError, + fs_->Move("container/nonexistent-file.txt", "container/non-existentdir")); +} + +TEST_F(TestAzureFileSystem, CopyFile) { + ASSERT_RAISES(IOError, fs_->CopyFile("container", "container/newfile")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/somedir", "container/newfile")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/somedir/subdir", "container/newfile")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/base.txt", "container")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/base.txt", "nonexistent-container")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/base.txt", "")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/base.txt", "container/somedir/subdir")); + + ASSERT_OK(fs_->CopyFile("container/somefile", "container/base.txt")); + ASSERT_OK(fs_->CopyFile("container/base.txt", "container/nonexistent-file")); + ASSERT_RAISES(IOError, 
fs_->CopyFile("container/somedir/subdir/subfile", "container")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/somedir/subdir/subfile", + "nonexistent-container")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/somedir/subdir/subfile", "")); + ASSERT_RAISES(IOError, + fs_->CopyFile("container/somedir/subdir/subfile", "container/somedir")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/somedir/subdir/subfile", + "container/non-existentdir")); + ASSERT_RAISES(IOError, fs_->CopyFile("container/somedir/subdir/subfile", + "container/somedir/subdir")); +} + +TEST_F(TestAzureFileSystem, OpenInputStream) { + ASSERT_RAISES(IOError, fs_->OpenInputStream("container")); + ASSERT_RAISES(IOError, fs_->OpenInputStream("nonexistent-container")); + ASSERT_RAISES(IOError, fs_->OpenInputStream("")); + ASSERT_RAISES(IOError, fs_->OpenInputStream("container/somedir")); + ASSERT_RAISES(IOError, fs_->OpenInputStream("container/non-existentdir")); + ASSERT_RAISES(IOError, fs_->OpenInputStream("container/somedir/subdir")); + + // "Files" + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream("container/somefile")); + ASSERT_OK_AND_ASSIGN(auto buf, stream->Read(2)); + AssertBufferEqual(*buf, "so"); + ASSERT_OK_AND_ASSIGN(buf, stream->Read(5)); + AssertBufferEqual(*buf, "me da"); + ASSERT_OK_AND_ASSIGN(buf, stream->Read(5)); + AssertBufferEqual(*buf, "ta"); + ASSERT_OK_AND_ASSIGN(buf, stream->Read(5)); + AssertBufferEqual(*buf, ""); +} + +TEST_F(TestAzureFileSystem, OpenInputFile) { + ASSERT_RAISES(IOError, fs_->OpenInputFile("container")); + ASSERT_RAISES(IOError, fs_->OpenInputFile("nonexistent-container")); + ASSERT_RAISES(IOError, fs_->OpenInputFile("")); + ASSERT_RAISES(IOError, fs_->OpenInputFile("container/somedir")); + ASSERT_RAISES(IOError, fs_->OpenInputFile("container/non-existentdir")); + ASSERT_RAISES(IOError, fs_->OpenInputFile("container/somedir/subdir")); + + // "Files" + ASSERT_OK_AND_ASSIGN(auto file, fs_->OpenInputFile("container/somefile")); + 
ASSERT_OK_AND_EQ(9, file->GetSize()); + ASSERT_OK_AND_ASSIGN(auto buf, file->Read(4)); + AssertBufferEqual(*buf, "some"); + ASSERT_OK_AND_EQ(9, file->GetSize()); + ASSERT_OK_AND_EQ(4, file->Tell()); + + ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(2, 5)); + AssertBufferEqual(*buf, "me da"); + ASSERT_OK_AND_EQ(4, file->Tell()); + ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(5, 20)); + AssertBufferEqual(*buf, "data"); + ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(9, 20)); + AssertBufferEqual(*buf, ""); + + char result[20]; + ASSERT_OK_AND_EQ(5, file->ReadAt(2, 5, &result)); + ASSERT_OK_AND_EQ(4, file->ReadAt(5, 20, &result)); + ASSERT_OK_AND_EQ(0, file->ReadAt(9, 0, &result)); + + // Reading past end of file + ASSERT_RAISES(IOError, file->ReadAt(10, 20)); + + ASSERT_OK(file->Seek(5)); + ASSERT_OK_AND_ASSIGN(buf, file->Read(2)); + AssertBufferEqual(*buf, "da"); + ASSERT_OK(file->Seek(9)); + ASSERT_OK_AND_ASSIGN(buf, file->Read(2)); + AssertBufferEqual(*buf, ""); + // Seeking past end of file + ASSERT_RAISES(IOError, file->Seek(10)); +} + +TEST_F(TestAzureFileSystem, OpenOutputStream) { + ASSERT_RAISES(IOError, fs_->OpenOutputStream("container")); + ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-container")); + ASSERT_RAISES(IOError, fs_->OpenOutputStream("")); + ASSERT_RAISES(IOError, fs_->OpenOutputStream("container/somedir/subdir")); + + // Create new empty file + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenOutputStream("container/newfile1")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile1", ""); + + // Create new file with 1 small write + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("container/newfile2")); + ASSERT_OK(stream->Write("some data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile2", "some data"); + + // Create new file with 3 small writes + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("container/newfile3")); + ASSERT_OK(stream->Write("some ")); + 
ASSERT_OK(stream->Write("")); + ASSERT_OK(stream->Write("new data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile3", "some new data"); + + // Overwrite + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("container/newfile1")); + ASSERT_OK(stream->Write("overwritten data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile1", "overwritten data"); + + // Overwrite and make empty + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("container/newfile1")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile1", ""); +} + +TEST_F(TestAzureFileSystem, OpenAppendStream) { + ASSERT_RAISES(IOError, fs_->OpenAppendStream("container")); + ASSERT_RAISES(IOError, fs_->OpenAppendStream("nonexistent-container")); + ASSERT_RAISES(IOError, fs_->OpenAppendStream("")); + ASSERT_RAISES(IOError, fs_->OpenAppendStream("container/somedir/subdir")); + + // Create new empty file + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenAppendStream("container/newfile1")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile1", ""); + + // Create new file with 1 small write + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenAppendStream("container/newfile2")); + ASSERT_OK(stream->Write("some data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile2", "some data"); + + // Create new file with 3 small writes + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenAppendStream("container/newfile3")); + ASSERT_OK(stream->Write("some ")); + ASSERT_OK(stream->Write("")); + ASSERT_OK(stream->Write("new data")); + ASSERT_OK(stream->Close()); + AssertObjectContents(gen2_client_.get(), "container", "newfile3", "some new data"); + + // Append to empty file + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenAppendStream("container/newfile1")); + ASSERT_OK(stream->Write("append data")); + ASSERT_OK(stream->Close()); + 
AssertObjectContents(gen2_client_.get(), "container", "newfile1", "append data");
+
+  // Append to non-empty file
+  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenAppendStream("container/newfile1"));
+  ASSERT_OK(stream->Write(", more data"));
+  ASSERT_OK(stream->Close());
+  AssertObjectContents(gen2_client_.get(), "container", "newfile1",
+                       "append data, more data");
+}
+
+TEST_F(TestAzureFileSystem, DeleteDirContents) {
+  // Container
+  ASSERT_OK(fs_->DeleteDirContents("container"));
+  AssertFileInfo(fs_.get(), "container", FileType::Directory);
+
+  // Nonexistent-Container
+  ASSERT_RAISES(IOError, fs_->DeleteDirContents("nonexistent-container"));
+  AssertFileInfo(fs_.get(), "nonexistent-container", FileType::NotFound);
+
+  // root
+  ASSERT_RAISES(IOError, fs_->DeleteDirContents(""));
+
+  // Container/File
+  ASSERT_OK_AND_ASSIGN(auto res, fs_->OpenOutputStream("container/somefile"));
+  ASSERT_OK(res->Write("some data"));
+  ASSERT_RAISES(IOError, fs_->DeleteDirContents("container/somefile"));
+  AssertFileInfo(fs_.get(), "container/somefile", FileType::File);
+}
+
+}  // namespace internal
+}  // namespace fs
+}  // namespace arrow
diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt
index 5c04756d46c..2e87bc6d828 100644
--- a/cpp/thirdparty/versions.txt
+++ b/cpp/thirdparty/versions.txt
@@ -33,6 +33,16 @@ ARROW_AWS_C_COMMON_BUILD_VERSION=v0.6.9
 ARROW_AWS_C_COMMON_BUILD_SHA256_CHECKSUM=928a3e36f24d1ee46f9eec360ec5cebfe8b9b8994fe39d4fa74ff51aebb12717
 ARROW_AWS_C_EVENT_STREAM_BUILD_VERSION=v0.1.5
 ARROW_AWS_C_EVENT_STREAM_BUILD_SHA256_CHECKSUM=f1b423a487b5d6dca118bfc0d0c6cc596dc476b282258a3228e73a8f730422d4
+ARROW_AZURE_CORE_BUILD_VERSION=1.7.1
+ARROW_AZURE_CORE_BUILD_SHA256_CHECKSUM=ae6f03e65d9773d11cf3b9619d0bc7f567272974cf31b9e1c8ca2fa0ea4fb4c6
+ARROW_AZURE_IDENTITY_BUILD_VERSION=1.3.0
+ARROW_AZURE_IDENTITY_BUILD_SHA256_CHECKSUM=46701acd8000f317d1c4b33263d5d3203924fadcfa5af4860ae9187046a72c45
+ARROW_AZURE_STORAGE_BLOBS_BUILD_VERSION=12.5.0
+ARROW_AZURE_STORAGE_BLOBS_BUILD_SHA256_CHECKSUM=12394d864144ced9fc3562ad48cfe3426604e871b5aa72853ca398e086f0c594 +ARROW_AZURE_STORAGE_COMMON_BUILD_VERSION=12.2.4 +ARROW_AZURE_STORAGE_COMMON_BUILD_SHA256_CHECKSUM=7644b4355b492ba2039236b9fd56c3e7bb80aad983d8bac6a731d74aaf64e03f +ARROW_AZURE_STORAGE_FILES_DATALAKE_BUILD_VERSION=12.3.1 +ARROW_AZURE_STORAGE_FILES_DATALAKE_BUILD_SHA256_CHECKSUM=a5b74076a751d7cfaf7c56674a40ce2792c4fab9add18758fab1fe091d00baff ARROW_BOOST_BUILD_VERSION=1.75.0 ARROW_BOOST_BUILD_SHA256_CHECKSUM=267e04a7c0bfe85daf796dedc789c3a27a76707e1c968f0a2a87bb96331e2b61 ARROW_BROTLI_BUILD_VERSION=v1.0.9 diff --git a/cpp/vcpkg.json b/cpp/vcpkg.json index 915777f0899..88ad19acf5d 100644 --- a/cpp/vcpkg.json +++ b/cpp/vcpkg.json @@ -14,6 +14,11 @@ "transfer" ] }, + "azure-core-cpp", + "azure-identity-cpp", + "azure-storage-blobs-cpp", + "azure-storage-common-cpp", + "azure-storage-files-datalake-cpp", "benchmark", "boost-filesystem", "boost-multiprecision",