From 0fd17cee7c7e75e2fbf510861bbff932143b24e7 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 10 Feb 2020 17:30:06 +0100 Subject: [PATCH 1/3] ARROW-7761: [C++][Python] Support S3 URIs Allow instantiating an S3 filesystem instance from a `s3://user:pass@bucket/path?params...` URI. --- cpp/src/arrow/filesystem/filesystem.cc | 14 +++++- cpp/src/arrow/filesystem/s3fs.cc | 63 +++++++++++++++++++++++++ cpp/src/arrow/filesystem/s3fs.h | 6 +++ cpp/src/arrow/filesystem/s3fs_test.cc | 64 ++++++++++++++++++++++++++ cpp/src/arrow/util/uri.cc | 30 ++++++++++++ cpp/src/arrow/util/uri.h | 14 ++++++ cpp/src/arrow/util/uri_test.cc | 48 +++++++++++++++++++ python/pyarrow/tests/test_fs.py | 20 ++++++++ 8 files changed, 258 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 737cd9360b8..a4345557ea0 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -22,6 +22,9 @@ #ifdef ARROW_HDFS #include "arrow/filesystem/hdfs.h" #endif +#ifdef ARROW_S3 +#include "arrow/filesystem/s3fs.h" +#endif #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" @@ -386,7 +389,16 @@ Result> FileSystemFromUriReal(const FileSystemUri& f ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options)); return hdfs; #else - return Status::NotImplemented("Arrow compiled without HDFS support"); + return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support"); +#endif + } + if (fsuri.scheme == "s3") { +#ifdef ARROW_S3 + ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(fsuri.uri, out_path)); + ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options)); + return s3fs; +#else + return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support"); #endif } diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index b3d2939aa11..b7d02ddb08c 100644 --- 
a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #ifdef _WIN32 @@ -71,6 +72,9 @@ #include "arrow/util/logging.h" namespace arrow { + +using internal::Uri; + namespace fs { using ::Aws::Client::AWSError; @@ -159,6 +163,65 @@ S3Options S3Options::FromAccessKey(const std::string& access_key, return options; } +Result S3Options::FromUri(const Uri& uri, std::string* out_path) { + S3Options options; + + const auto bucket = uri.host(); + auto path = uri.path(); + if (bucket.empty()) { + if (!path.empty()) { + return Status::Invalid("Missing bucket name in S3 URI"); + } + } else { + if (path.empty()) { + path = bucket; + } else { + if (path[0] != '/') { + return Status::Invalid("S3 URI should absolute, not relative"); + } + path = bucket + path; + } + } + if (out_path != nullptr) { + *out_path = std::string(internal::RemoveTrailingSlash(path)); + } + + std::unordered_map options_map; + ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items()); + for (const auto& kv : options_items) { + options_map.emplace(kv.first, kv.second); + } + + const auto username = uri.username(); + if (!username.empty()) { + options.ConfigureAccessKey(username, uri.password()); + } else { + options.ConfigureDefaultCredentials(); + } + + auto it = options_map.find("region"); + if (it != options_map.end()) { + options.region = it->second; + } + it = options_map.find("scheme"); + if (it != options_map.end()) { + options.scheme = it->second; + } + it = options_map.find("endpoint_override"); + if (it != options_map.end()) { + options.endpoint_override = it->second; + } + + return options; +} + +Result S3Options::FromUri(const std::string& uri_string, + std::string* out_path) { + Uri uri; + RETURN_NOT_OK(uri.Parse(uri_string)); + return FromUri(uri, out_path); +} + namespace { Status CheckS3Initialized() { diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 
a4676520e52..c74f3261789 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -23,6 +23,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/util/macros.h" +#include "arrow/util/uri.h" namespace Aws { namespace Auth { @@ -68,6 +69,11 @@ struct ARROW_EXPORT S3Options { /// \brief Initialize with explicit access and secret key static S3Options FromAccessKey(const std::string& access_key, const std::string& secret_key); + + static Result FromUri(const ::arrow::internal::Uri& uri, + std::string* out_path = NULLPTR); + static Result FromUri(const std::string& uri, + std::string* out_path = NULLPTR); }; /// S3-backed FileSystem implementation. diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 4183858a5e2..218e6419740 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -44,6 +44,7 @@ #include #include +#include #include #include #include @@ -68,6 +69,7 @@ namespace fs { using ::arrow::internal::PlatformFilename; using ::arrow::internal::TemporaryDir; +using ::arrow::internal::UriEscape; using ::arrow::fs::internal::ConnectRetryStrategy; using ::arrow::fs::internal::ErrorToStatus; @@ -237,6 +239,54 @@ void AssertObjectContents(Aws::S3::S3Client* client, const std::string& bucket, AssertGetObject(result, expected); } +//////////////////////////////////////////////////////////////////////////// +// S3Options tests + +TEST(S3Options, FromUri) { + std::string path; + S3Options options; + + ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://", &path)); + ASSERT_EQ(options.region, kS3DefaultRegion); + ASSERT_EQ(options.scheme, "https"); + ASSERT_EQ(options.endpoint_override, ""); + ASSERT_EQ(path, ""); + + ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3:", &path)); + ASSERT_EQ(path, ""); + + ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://access:secret@mybucket", &path)); + ASSERT_EQ(path, "mybucket"); + const auto creds = 
options.credentials_provider->GetAWSCredentials(); + ASSERT_EQ(creds.GetAWSAccessKeyId(), "access"); + ASSERT_EQ(creds.GetAWSSecretKey(), "secret"); + + ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/", &path)); + ASSERT_EQ(options.region, kS3DefaultRegion); + ASSERT_EQ(options.scheme, "https"); + ASSERT_EQ(options.endpoint_override, ""); + ASSERT_EQ(path, "mybucket"); + + ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/foo/bar/", &path)); + ASSERT_EQ(options.region, kS3DefaultRegion); + ASSERT_EQ(options.scheme, "https"); + ASSERT_EQ(options.endpoint_override, ""); + ASSERT_EQ(path, "mybucket/foo/bar"); + + ASSERT_OK_AND_ASSIGN( + options, + S3Options::FromUri( + "s3://mybucket/foo/bar/?region=utopia&endpoint_override=localhost&scheme=http", + &path)); + ASSERT_EQ(options.region, "utopia"); + ASSERT_EQ(options.scheme, "http"); + ASSERT_EQ(options.endpoint_override, "localhost"); + ASSERT_EQ(path, "mybucket/foo/bar"); + + // Missing bucket name + ASSERT_RAISES(Invalid, S3Options::FromUri("s3:///foo/bar/", &path)); +} + //////////////////////////////////////////////////////////////////////////// // Basic test for the Minio test server. 
@@ -758,6 +808,20 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) { TestOpenOutputStreamDestructor(); } +TEST_F(TestS3FS, FileSystemFromUri) { + std::stringstream ss; + ss << "s3://" << minio_.access_key() << ":" << minio_.secret_key() + << "@bucket/somedir/subdir/subfile" + << "?scheme=http&endpoint_override=" << UriEscape(minio_.connect_string()); + + std::string path; + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri(ss.str(), &path)); + ASSERT_EQ(path, "bucket/somedir/subdir/subfile"); + + // Check the filesystem has the right connection parameters + AssertFileStats(fs.get(), path, FileType::File, 8); +} + //////////////////////////////////////////////////////////////////////////// // Generic S3 tests diff --git a/cpp/src/arrow/util/uri.cc b/cpp/src/arrow/util/uri.cc index 8fbe96babad..56c6d85191d 100644 --- a/cpp/src/arrow/util/uri.cc +++ b/cpp/src/arrow/util/uri.cc @@ -50,6 +50,16 @@ bool IsTextRangeSet(const UriTextRangeStructA& range) { return range.first != nu } // namespace +std::string UriEscape(const std::string& s) { + std::string escaped; + escaped.resize(3 * s.length()); + + auto end = uriEscapeExA(s.data(), s.data() + s.length(), &escaped[0], + /*spaceToPlus=*/URI_FALSE, /*normalizeBreaks=*/URI_FALSE); + escaped.resize(end - &escaped[0]); + return escaped; +} + struct Uri::Impl { Impl() : string_rep_(""), port_(-1) { memset(&uri_, 0, sizeof(uri_)); } @@ -95,6 +105,26 @@ std::string Uri::port_text() const { return TextRangeToString(impl_->uri_.portTe int32_t Uri::port() const { return impl_->port_; } +std::string Uri::username() const { + auto userpass = TextRangeToView(impl_->uri_.userInfo); + auto sep_pos = userpass.find_first_of(':'); + if (sep_pos == util::string_view::npos) { + return std::string(userpass); + } else { + return std::string(userpass.substr(0, sep_pos)); + } +} + +std::string Uri::password() const { + auto userpass = TextRangeToView(impl_->uri_.userInfo); + auto sep_pos = userpass.find_first_of(':'); + if (sep_pos == 
util::string_view::npos) { + return std::string(); + } else { + return std::string(userpass.substr(sep_pos + 1)); + } +} + std::string Uri::path() const { // Gather path segments std::vector segments; diff --git a/cpp/src/arrow/util/uri.h b/cpp/src/arrow/util/uri.h index 3ee71fab6f7..6ef54900020 100644 --- a/cpp/src/arrow/util/uri.h +++ b/cpp/src/arrow/util/uri.h @@ -43,6 +43,7 @@ class ARROW_EXPORT Uri { /// The URI scheme, such as "http", or the empty string if the URI has no /// explicit scheme. std::string scheme() const; + /// Whether the URI has an explicit host name. This may return true if /// the URI has an empty host (e.g. "file:///tmp/foo"), while it returns /// false is the URI has not host component at all (e.g. "file:/tmp/foo"). @@ -50,16 +51,25 @@ class ARROW_EXPORT Uri { /// The URI host name, such as "localhost", "127.0.0.1" or "::1", or the empty /// string is the URI does not have a host component. std::string host() const; + /// The URI port number, as a string such as "80", or the empty string is the URI /// does not have a port number component. std::string port_text() const; /// The URI port parsed as an integer, or -1 if the URI does not have a port /// number component. int32_t port() const; + + /// The username specified in the URI. + std::string username() const; + /// The password specified in the URI. + std::string password() const; + /// The URI path component. std::string path() const; + /// The URI query string std::string query_string() const; + /// The URI query items /// /// Note this API doesn't allow differentiating between an empty value @@ -77,5 +87,9 @@ class ARROW_EXPORT Uri { std::unique_ptr impl_; }; +/// Percent-encode the input string, for use e.g. as a URI query parameter. 
+ARROW_EXPORT +std::string UriEscape(const std::string& s); + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/uri_test.cc b/cpp/src/arrow/util/uri_test.cc index d06e3265867..b94b46868b8 100644 --- a/cpp/src/arrow/util/uri_test.cc +++ b/cpp/src/arrow/util/uri_test.cc @@ -29,6 +29,12 @@ namespace arrow { namespace internal { +TEST(UriEscape, Basics) { + ASSERT_EQ(UriEscape(""), ""); + ASSERT_EQ(UriEscape("foo123"), "foo123"); + ASSERT_EQ(UriEscape("/El Niño/"), "%2FEl%20Ni%C3%B1o%2F"); +} + TEST(Uri, Empty) { Uri uri; ASSERT_EQ(uri.scheme(), ""); @@ -128,42 +134,84 @@ TEST(Uri, ParseHostPort) { ASSERT_EQ(uri.host(), "localhost"); ASSERT_EQ(uri.port_text(), "80"); ASSERT_EQ(uri.port(), 80); + ASSERT_EQ(uri.username(), ""); + ASSERT_EQ(uri.password(), ""); ASSERT_OK(uri.Parse("http://1.2.3.4")); ASSERT_EQ(uri.scheme(), "http"); ASSERT_EQ(uri.host(), "1.2.3.4"); ASSERT_EQ(uri.port_text(), ""); ASSERT_EQ(uri.port(), -1); + ASSERT_EQ(uri.username(), ""); + ASSERT_EQ(uri.password(), ""); ASSERT_OK(uri.Parse("http://1.2.3.4:")); ASSERT_EQ(uri.scheme(), "http"); ASSERT_EQ(uri.host(), "1.2.3.4"); ASSERT_EQ(uri.port_text(), ""); ASSERT_EQ(uri.port(), -1); + ASSERT_EQ(uri.username(), ""); + ASSERT_EQ(uri.password(), ""); ASSERT_OK(uri.Parse("http://1.2.3.4:80")); ASSERT_EQ(uri.scheme(), "http"); ASSERT_EQ(uri.host(), "1.2.3.4"); ASSERT_EQ(uri.port_text(), "80"); ASSERT_EQ(uri.port(), 80); + ASSERT_EQ(uri.username(), ""); + ASSERT_EQ(uri.password(), ""); ASSERT_OK(uri.Parse("http://[::1]")); ASSERT_EQ(uri.scheme(), "http"); ASSERT_EQ(uri.host(), "::1"); ASSERT_EQ(uri.port_text(), ""); ASSERT_EQ(uri.port(), -1); + ASSERT_EQ(uri.username(), ""); + ASSERT_EQ(uri.password(), ""); ASSERT_OK(uri.Parse("http://[::1]:")); ASSERT_EQ(uri.scheme(), "http"); ASSERT_EQ(uri.host(), "::1"); ASSERT_EQ(uri.port_text(), ""); ASSERT_EQ(uri.port(), -1); + ASSERT_EQ(uri.username(), ""); + ASSERT_EQ(uri.password(), ""); ASSERT_OK(uri.Parse("http://[::1]:80")); 
ASSERT_EQ(uri.scheme(), "http"); ASSERT_EQ(uri.host(), "::1"); ASSERT_EQ(uri.port_text(), "80"); ASSERT_EQ(uri.port(), 80); + ASSERT_EQ(uri.username(), ""); + ASSERT_EQ(uri.password(), ""); +} + +TEST(Uri, ParseUserPass) { + Uri uri; + + ASSERT_OK(uri.Parse("http://someuser@localhost:80")); + ASSERT_EQ(uri.scheme(), "http"); + ASSERT_EQ(uri.host(), "localhost"); + ASSERT_EQ(uri.username(), "someuser"); + ASSERT_EQ(uri.password(), ""); + + ASSERT_OK(uri.Parse("http://someuser:@localhost:80")); + ASSERT_EQ(uri.scheme(), "http"); + ASSERT_EQ(uri.host(), "localhost"); + ASSERT_EQ(uri.username(), "someuser"); + ASSERT_EQ(uri.password(), ""); + + ASSERT_OK(uri.Parse("http://someuser:somepass@localhost:80")); + ASSERT_EQ(uri.scheme(), "http"); + ASSERT_EQ(uri.host(), "localhost"); + ASSERT_EQ(uri.username(), "someuser"); + ASSERT_EQ(uri.password(), "somepass"); + + ASSERT_OK(uri.Parse("http://someuser:somepass@localhost")); + ASSERT_EQ(uri.scheme(), "http"); + ASSERT_EQ(uri.host(), "localhost"); + ASSERT_EQ(uri.username(), "someuser"); + ASSERT_EQ(uri.password(), "somepass"); } TEST(Uri, ParseError) { diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 3a3b6c91f26..95fb22618f4 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -19,6 +19,8 @@ import gzip import pathlib +import urllib.parse + import pytest import pyarrow as pa @@ -608,3 +610,21 @@ def test_filesystem_from_uri(uri, expected_klass, expected_path): fs, path = FileSystem.from_uri(uri) assert isinstance(fs, expected_klass) assert path == expected_path + + +@pytest.mark.s3 +def test_filesystem_from_uri_s3(minio_server): + from pyarrow.fs import S3FileSystem + + address, access_key, secret_key = minio_server + uri = "s3://{}:{}@mybucket/foo/bar?scheme=http&endpoint_override={}" \ + .format(access_key, secret_key, urllib.parse.quote(address)) + + fs, path = FileSystem.from_uri(uri) + assert isinstance(fs, S3FileSystem) + assert path == 
"mybucket/foo/bar" + + fs.create_dir(path) + [st] = fs.get_target_stats([path]) + assert st.path == path + assert st.type == FileType.Directory From 3c73b760bec25806f38cbf0865870b285370d356 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 17 Feb 2020 14:54:59 +0100 Subject: [PATCH 2/3] Add implicit S3 initialization --- cpp/src/arrow/filesystem/filesystem.cc | 1 + cpp/src/arrow/filesystem/s3fs.cc | 27 +++++++++++++++++++++----- cpp/src/arrow/filesystem/s3fs.h | 5 +++++ 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index a4345557ea0..4e68eb96427 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -394,6 +394,7 @@ Result> FileSystemFromUriReal(const FileSystemUri& f } if (fsuri.scheme == "s3") { #ifdef ARROW_S3 + RETURN_NOT_OK(EnsureS3Initialized()); ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(fsuri.uri, out_path)); ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options)); return s3fs; diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index b7d02ddb08c..1e0b9170492 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -95,12 +95,13 @@ const char* kS3DefaultRegion = "us-east-1"; static const char kSep = '/'; -static std::mutex aws_init_lock; -static Aws::SDKOptions aws_options; -static std::atomic aws_initialized(false); +namespace { -Status InitializeS3(const S3GlobalOptions& options) { - std::lock_guard lock(aws_init_lock); +std::mutex aws_init_lock; +Aws::SDKOptions aws_options; +std::atomic aws_initialized(false); + +Status DoInitializeS3(const S3GlobalOptions& options) { Aws::Utils::Logging::LogLevel aws_log_level; #define LOG_LEVEL_CASE(level_name) \ @@ -132,6 +133,13 @@ Status InitializeS3(const S3GlobalOptions& options) { return Status::OK(); } +} // namespace + +Status InitializeS3(const S3GlobalOptions& options) { + 
std::lock_guard lock(aws_init_lock); + return DoInitializeS3(options); +} + Status FinalizeS3() { std::lock_guard lock(aws_init_lock); Aws::ShutdownAPI(aws_options); @@ -139,6 +147,15 @@ Status FinalizeS3() { return Status::OK(); } +Status EnsureS3Initialized() { + std::lock_guard lock(aws_init_lock); + if (!aws_initialized.load()) { + S3GlobalOptions options{S3LogLevel::Fatal}; + return DoInitializeS3(options); + } + return Status::OK(); +} + void S3Options::ConfigureDefaultCredentials() { credentials_provider = std::make_shared(); diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index c74f3261789..756f4ea2637 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -151,6 +151,11 @@ struct ARROW_EXPORT S3GlobalOptions { ARROW_EXPORT Status InitializeS3(const S3GlobalOptions& options); +/// Ensure the S3 APIs are initialized, but only if not already done. +/// If necessary, this will call InitializeS3() with some default options. +ARROW_EXPORT +Status EnsureS3Initialized(); + /// Shutdown the S3 APIs. 
ARROW_EXPORT Status FinalizeS3(); From 1c27624b4e4389f800551652519e5887016a6b08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Mon, 17 Feb 2020 09:34:37 -0500 Subject: [PATCH 3/3] Add multiple fs support to dataset example --- cpp/examples/arrow/dataset-parquet-scan-example.cc | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc b/cpp/examples/arrow/dataset-parquet-scan-example.cc index 3c5decb7bd3..dc454b255b1 100644 --- a/cpp/examples/arrow/dataset-parquet-scan-example.cc +++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc @@ -26,7 +26,7 @@ #include #include #include -#include +#include using arrow::field; using arrow::int16; @@ -65,6 +65,10 @@ struct Configuration { std::shared_ptr filter = ("total_amount"_ > 1000.0f).Copy(); } conf; +std::shared_ptr GetFileSystemFromUri(const std::string& uri, std::string* path) { + return fs::FileSystemFromUri(uri, path).ValueOrDie(); +} + std::shared_ptr GetDatasetFromPath(std::shared_ptr fs, std::shared_ptr format, std::string path) { @@ -113,7 +117,6 @@ std::shared_ptr GetTableFromScanner(std::shared_ptr scanner) } int main(int argc, char** argv) { - auto fs = std::make_shared(); auto format = std::make_shared(); if (argc != 2) { @@ -121,7 +124,10 @@ int main(int argc, char** argv) { return EXIT_SUCCESS; } - auto dataset = GetDatasetFromPath(fs, format, argv[1]); + std::string path; + auto fs = GetFileSystemFromUri(argv[1], &path); + + auto dataset = GetDatasetFromPath(fs, format, path); auto scanner = GetScannerFromDataset(dataset, conf.projected_columns, conf.filter, conf.use_threads);