Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cpp/examples/arrow/dataset-parquet-scan-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/filter.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/localfs.h>
#include <arrow/filesystem/filesystem.h>

using arrow::field;
using arrow::int16;
Expand Down Expand Up @@ -65,6 +65,10 @@ struct Configuration {
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
} conf;

std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri, std::string* path) {
return fs::FileSystemFromUri(uri, path).ValueOrDie();
}

std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<ds::FileFormat> format,
std::string path) {
Expand Down Expand Up @@ -113,15 +117,17 @@ std::shared_ptr<Table> GetTableFromScanner(std::shared_ptr<ds::Scanner> scanner)
}

int main(int argc, char** argv) {
auto fs = std::make_shared<fs::LocalFileSystem>();
auto format = std::make_shared<ds::ParquetFileFormat>();

if (argc != 2) {
// Fake success for CI purposes.
return EXIT_SUCCESS;
}

auto dataset = GetDatasetFromPath(fs, format, argv[1]);
std::string path;
auto fs = GetFileSystemFromUri(argv[1], &path);

auto dataset = GetDatasetFromPath(fs, format, path);

auto scanner = GetScannerFromDataset(dataset, conf.projected_columns, conf.filter,
conf.use_threads);
Expand Down
15 changes: 14 additions & 1 deletion cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
#ifdef ARROW_HDFS
#include "arrow/filesystem/hdfs.h"
#endif
#ifdef ARROW_S3
#include "arrow/filesystem/s3fs.h"
#endif
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
Expand Down Expand Up @@ -386,7 +389,17 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const FileSystemUri& f
ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options));
return hdfs;
#else
return Status::NotImplemented("Arrow compiled without HDFS support");
return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support");
#endif
}
if (fsuri.scheme == "s3") {
#ifdef ARROW_S3
RETURN_NOT_OK(EnsureS3Initialized());
ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(fsuri.uri, out_path));
ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options));
return s3fs;
#else
return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support");
#endif
}

Expand Down
90 changes: 85 additions & 5 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <condition_variable>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>

#ifdef _WIN32
Expand Down Expand Up @@ -71,6 +72,9 @@
#include "arrow/util/logging.h"

namespace arrow {

using internal::Uri;

namespace fs {

using ::Aws::Client::AWSError;
Expand All @@ -91,12 +95,13 @@ const char* kS3DefaultRegion = "us-east-1";

static const char kSep = '/';

static std::mutex aws_init_lock;
static Aws::SDKOptions aws_options;
static std::atomic<bool> aws_initialized(false);
namespace {

std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);

Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
Status DoInitializeS3(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;

#define LOG_LEVEL_CASE(level_name) \
Expand Down Expand Up @@ -128,13 +133,29 @@ Status InitializeS3(const S3GlobalOptions& options) {
return Status::OK();
}

} // namespace

Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoInitializeS3(options);
}

Status FinalizeS3() {
std::lock_guard<std::mutex> lock(aws_init_lock);
Aws::ShutdownAPI(aws_options);
aws_initialized.store(false);
return Status::OK();
}

Status EnsureS3Initialized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (!aws_initialized.load()) {
S3GlobalOptions options{S3LogLevel::Fatal};
return DoInitializeS3(options);
}
return Status::OK();
}

void S3Options::ConfigureDefaultCredentials() {
credentials_provider =
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
Expand All @@ -159,6 +180,65 @@ S3Options S3Options::FromAccessKey(const std::string& access_key,
return options;
}

Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
S3Options options;

const auto bucket = uri.host();
auto path = uri.path();
if (bucket.empty()) {
if (!path.empty()) {
return Status::Invalid("Missing bucket name in S3 URI");
}
} else {
if (path.empty()) {
path = bucket;
} else {
if (path[0] != '/') {
return Status::Invalid("S3 URI should absolute, not relative");
}
path = bucket + path;
}
}
if (out_path != nullptr) {
*out_path = std::string(internal::RemoveTrailingSlash(path));
}

std::unordered_map<std::string, std::string> options_map;
ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
for (const auto& kv : options_items) {
options_map.emplace(kv.first, kv.second);
}

const auto username = uri.username();
if (!username.empty()) {
options.ConfigureAccessKey(username, uri.password());
} else {
options.ConfigureDefaultCredentials();
}

auto it = options_map.find("region");
if (it != options_map.end()) {
options.region = it->second;
}
it = options_map.find("scheme");
if (it != options_map.end()) {
options.scheme = it->second;
}
it = options_map.find("endpoint_override");
if (it != options_map.end()) {
options.endpoint_override = it->second;
}

return options;
}

Result<S3Options> S3Options::FromUri(const std::string& uri_string,
std::string* out_path) {
Uri uri;
RETURN_NOT_OK(uri.Parse(uri_string));
return FromUri(uri, out_path);
}

namespace {

Status CheckS3Initialized() {
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"

namespace Aws {
namespace Auth {
Expand Down Expand Up @@ -68,6 +69,11 @@ struct ARROW_EXPORT S3Options {
/// \brief Initialize with explicit access and secret key
static S3Options FromAccessKey(const std::string& access_key,
const std::string& secret_key);

static Result<S3Options> FromUri(const ::arrow::internal::Uri& uri,
std::string* out_path = NULLPTR);
static Result<S3Options> FromUri(const std::string& uri,
std::string* out_path = NULLPTR);
};

/// S3-backed FileSystem implementation.
Expand Down Expand Up @@ -145,6 +151,11 @@ struct ARROW_EXPORT S3GlobalOptions {
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);

/// Ensure the S3 APIs are initialized, but only if not already done.
/// If necessary, this will call InitializeS3() with some default options.
ARROW_EXPORT
Status EnsureS3Initialized();

/// Shutdown the S3 APIs.
ARROW_EXPORT
Status FinalizeS3();
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/s3/S3Client.h>
Expand All @@ -68,6 +69,7 @@ namespace fs {

using ::arrow::internal::PlatformFilename;
using ::arrow::internal::TemporaryDir;
using ::arrow::internal::UriEscape;

using ::arrow::fs::internal::ConnectRetryStrategy;
using ::arrow::fs::internal::ErrorToStatus;
Expand Down Expand Up @@ -237,6 +239,54 @@ void AssertObjectContents(Aws::S3::S3Client* client, const std::string& bucket,
AssertGetObject(result, expected);
}

////////////////////////////////////////////////////////////////////////////
// S3Options tests

TEST(S3Options, FromUri) {
std::string path;
S3Options options;

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://", &path));
ASSERT_EQ(options.region, kS3DefaultRegion);
ASSERT_EQ(options.scheme, "https");
ASSERT_EQ(options.endpoint_override, "");
ASSERT_EQ(path, "");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3:", &path));
ASSERT_EQ(path, "");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://access:secret@mybucket", &path));
ASSERT_EQ(path, "mybucket");
const auto creds = options.credentials_provider->GetAWSCredentials();
ASSERT_EQ(creds.GetAWSAccessKeyId(), "access");
ASSERT_EQ(creds.GetAWSSecretKey(), "secret");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/", &path));
ASSERT_EQ(options.region, kS3DefaultRegion);
ASSERT_EQ(options.scheme, "https");
ASSERT_EQ(options.endpoint_override, "");
ASSERT_EQ(path, "mybucket");

ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/foo/bar/", &path));
ASSERT_EQ(options.region, kS3DefaultRegion);
ASSERT_EQ(options.scheme, "https");
ASSERT_EQ(options.endpoint_override, "");
ASSERT_EQ(path, "mybucket/foo/bar");

ASSERT_OK_AND_ASSIGN(
options,
S3Options::FromUri(
"s3://mybucket/foo/bar/?region=utopia&endpoint_override=localhost&scheme=http",
&path));
ASSERT_EQ(options.region, "utopia");
ASSERT_EQ(options.scheme, "http");
ASSERT_EQ(options.endpoint_override, "localhost");
ASSERT_EQ(path, "mybucket/foo/bar");

// Missing bucket name
ASSERT_RAISES(Invalid, S3Options::FromUri("s3:///foo/bar/", &path));
}

////////////////////////////////////////////////////////////////////////////
// Basic test for the Minio test server.

Expand Down Expand Up @@ -758,6 +808,20 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
TestOpenOutputStreamDestructor();
}

TEST_F(TestS3FS, FileSystemFromUri) {
std::stringstream ss;
ss << "s3://" << minio_.access_key() << ":" << minio_.secret_key()
<< "@bucket/somedir/subdir/subfile"
<< "?scheme=http&endpoint_override=" << UriEscape(minio_.connect_string());

std::string path;
ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri(ss.str(), &path));
ASSERT_EQ(path, "bucket/somedir/subdir/subfile");

// Check the filesystem has the right connection parameters
AssertFileStats(fs.get(), path, FileType::File, 8);
}

////////////////////////////////////////////////////////////////////////////
// Generic S3 tests

Expand Down
30 changes: 30 additions & 0 deletions cpp/src/arrow/util/uri.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ bool IsTextRangeSet(const UriTextRangeStructA& range) { return range.first != nu

} // namespace

std::string UriEscape(const std::string& s) {
std::string escaped;
escaped.resize(3 * s.length());

auto end = uriEscapeExA(s.data(), s.data() + s.length(), &escaped[0],
/*spaceToPlus=*/URI_FALSE, /*normalizeBreaks=*/URI_FALSE);
escaped.resize(end - &escaped[0]);
return escaped;
}

struct Uri::Impl {
Impl() : string_rep_(""), port_(-1) { memset(&uri_, 0, sizeof(uri_)); }

Expand Down Expand Up @@ -95,6 +105,26 @@ std::string Uri::port_text() const { return TextRangeToString(impl_->uri_.portTe

int32_t Uri::port() const { return impl_->port_; }

std::string Uri::username() const {
auto userpass = TextRangeToView(impl_->uri_.userInfo);
auto sep_pos = userpass.find_first_of(':');
if (sep_pos == util::string_view::npos) {
return std::string(userpass);
} else {
return std::string(userpass.substr(0, sep_pos));
}
}

std::string Uri::password() const {
auto userpass = TextRangeToView(impl_->uri_.userInfo);
auto sep_pos = userpass.find_first_of(':');
if (sep_pos == util::string_view::npos) {
return std::string();
} else {
return std::string(userpass.substr(sep_pos + 1));
}
}

std::string Uri::path() const {
// Gather path segments
std::vector<util::string_view> segments;
Expand Down
Loading