Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ if(ARROW_DATASET)
set(ARROW_FILESYSTEM ON)
endif()

# Enable uriparser whenever the filesystem layer is built.
# NOTE(review): presumably needed because the filesystem sources include
# arrow/util/uri.h to parse URI-based paths -- confirm.
if(ARROW_FILESYSTEM)
set(ARROW_WITH_URIPARSER ON)
endif()

if(MSVC)
# ORC doesn't build on windows
set(ARROW_ORC OFF)
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,17 @@ endif()
if(ARROW_FILESYSTEM)
add_subdirectory(filesystem)

# Expose the ARROW_HDFS build option as a preprocessor definition so that
# C++ sources can guard HDFS-specific code with `#ifdef ARROW_HDFS`.
if(ARROW_HDFS)
add_definitions(-DARROW_HDFS)
endif()

# Likewise expose ARROW_S3 for `#ifdef ARROW_S3` guards in C++ sources.
if(ARROW_S3)
add_definitions(-DARROW_S3)
endif()

list(APPEND ARROW_SRCS
filesystem/filesystem.cc
filesystem/filesystem_utils.cc
filesystem/localfs.cc
filesystem/mockfs.cc
filesystem/path_tree.cc
Expand All @@ -277,6 +286,10 @@ if(ARROW_FILESYSTEM)
list(APPEND ARROW_SRCS filesystem/s3fs.cc)
endif()

# Only compile the HDFS filesystem implementation when HDFS support is enabled.
if(ARROW_HDFS)
list(APPEND ARROW_SRCS filesystem/hdfs.cc)
endif()

list(APPEND ARROW_TESTING_SRCS filesystem/test_util.cc)
endif()

Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
# Headers: top level
arrow_install_all_headers("arrow/filesystem")

# Define ARROW_S3 for the targets declared in this directory.
# NOTE(review): the parent arrow/CMakeLists.txt hunk adds the same definition;
# confirm whether both are needed.
if(ARROW_S3)
add_definitions(-DARROW_S3)
endif()

add_arrow_test(filesystem_test)
add_arrow_test(filesystem_utils_test)
add_arrow_test(localfs_test)
add_arrow_test(path_tree_test)

Expand All @@ -32,3 +37,7 @@ if(ARROW_S3)
add_dependencies(arrow-tests arrow-s3fs-narrative-test)
endif()
endif()

# The HDFS filesystem test is only registered when HDFS support is enabled.
if(ARROW_HDFS)
add_arrow_test(hdfs_test)
endif()
10 changes: 6 additions & 4 deletions cpp/src/arrow/filesystem/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
#ifndef ARROW_FILESYSTEM_API_H
#define ARROW_FILESYSTEM_API_H

#include "arrow/filesystem/filesystem.h" // IWYU pragma: export
#include "arrow/filesystem/localfs.h" // IWYU pragma: export
#include "arrow/filesystem/mockfs.h" // IWYU pragma: export
#include "arrow/filesystem/s3fs.h" // IWYU pragma: export
#include "arrow/filesystem/filesystem.h" // IWYU pragma: export
#include "arrow/filesystem/filesystem_utils.h" // IWYU pragma: export
#include "arrow/filesystem/hdfs.h" // IWYU pragma: export
#include "arrow/filesystem/localfs.h" // IWYU pragma: export
#include "arrow/filesystem/mockfs.h" // IWYU pragma: export
#include "arrow/filesystem/s3fs.h" // IWYU pragma: export

#endif // ARROW_FILESYSTEM_API_H
200 changes: 200 additions & 0 deletions cpp/src/arrow/filesystem/filesystem_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/filesystem/filesystem_utils.h"
#ifdef ARROW_HDFS
#include "arrow/filesystem/hdfs.h"
#endif
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/result.h"
#include "arrow/util/uri.h"

namespace arrow {

namespace fs {

/// Kinds of filesystem that a URI scheme can be mapped to.
enum class FileSystemType {
  HDFS,    // "hdfs" scheme
  LOCAL,   // "file" scheme, or no scheme at all
  S3,      // "http" / "https" schemes
  UNKNOWN  // any other scheme
};

namespace {

class PathInfo {
public:
PathInfo() {}
~PathInfo() {}

// Allocates a PathInfo into `*path_info` and initializes it from `full_path`.
// The out-parameter is assigned before parsing, so it is set even when the
// returned Status reports a parse failure.
static Status Make(const std::string& full_path, std::shared_ptr<PathInfo>* path_info) {
  std::shared_ptr<PathInfo>& created = *path_info;
  created = std::make_shared<PathInfo>();
  RETURN_NOT_OK(created->Init(full_path));
  return Status::OK();
}

// Parses `full_path` as a URI and records the filesystem type and options.
// Returns the parse error, if any.
Status Init(const std::string& full_path) {
  const Status parse_status = ParseURI(full_path);
  RETURN_NOT_OK(parse_status);
  return Status::OK();
}

// Filesystem kind that ParseURI() derived from the URI scheme.
FileSystemType GetFileSystemType() {
  return fs_type_;
}

// Returns the "host_name" option, or an empty string when it is not set.
std::string GetHostName() {
  const auto entry = options_.find("host_name");
  return entry != options_.end() ? entry->second : std::string();
}

// Returns the "host_port" option as an integer, or -1 when no usable port is
// available.
//
// ParseURI() stores the URI's port text unconditionally for HDFS URIs, so the
// stored value may be an empty string (e.g. for a URI written without an
// explicit port). The previous implementation passed that text straight to
// std::stoi, which throws on empty or non-numeric input; this version parses
// the digits by hand and never throws.
//
// -1 is returned when the option is absent, empty, contains a non-digit
// character, or exceeds the largest valid port number (65535).
int GetHostPort() {
  const auto search = options_.find("host_port");
  if (search == options_.end() || search->second.empty()) {
    return -1;
  }
  const std::string& port_text = search->second;
  const int kMaxPort = 65535;
  int port = 0;
  for (const char c : port_text) {
    if (c < '0' || c > '9') {
      return -1;
    }
    port = port * 10 + (c - '0');
    // Checked on every digit so `port` can never overflow an int.
    if (port > kMaxPort) {
      return -1;
    }
  }
  return port;
}

// Returns the "user" option, or an empty string when it is not set.
std::string GetUser() {
  const auto entry = options_.find("user");
  if (entry == options_.end()) {
    return "";
  }
  return entry->second;
}

// True only when the "use_hdfs3" option is present and is exactly "1";
// any other value, or a missing option, yields false.
bool GetIfUseHdfs3() {
  const auto entry = options_.find("use_hdfs3");
  return entry != options_.end() && entry->second == "1";
}

// Returns the "replication" option as an integer. Falls back to 3 when the
// option is missing or has an empty value.
// NOTE(review): std::stoi throws on non-numeric text, so a malformed
// `replication=` query value is not reported through Status -- confirm
// whether that is acceptable.
int GetRepsNum() {
  const int default_replication = 3;
  const auto entry = options_.find("replication");
  if (entry == options_.end() || entry->second.empty()) {
    return default_replication;
  }
  return std::stoi(entry->second);
}

private:
// Maps a URI scheme to a FileSystemType:
//   ""  or "file"     -> LOCAL
//   "hdfs"            -> HDFS
//   "http" or "https" -> S3
//   anything else     -> UNKNOWN
FileSystemType GetFileSystemTypeFromString(const std::string& s) {
  if (s.empty() || s == "file") {
    return FileSystemType::LOCAL;
  }
  if (s == "hdfs") {
    return FileSystemType::HDFS;
  }
  if (s == "http" || s == "https") {
    return FileSystemType::S3;
  }
  return FileSystemType::UNKNOWN;
}

Status ParseURI(const std::string& s) {
arrow::internal::Uri uri;
RETURN_NOT_OK(uri.Parse(s));
fs_type_ = GetFileSystemTypeFromString(uri.scheme());
switch (fs_type_) {
case FileSystemType::HDFS:
options_.emplace("host_name", uri.host());
Copy link
Member

@pitrou pitrou Nov 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all the HDFS specifics (options, etc.) should be moved to hdfs.h / hdfs.cc.
For example you could have:

class ARROW_EXPORT HadoopFileSystem : public FileSystem {
 public:
  // ...

  /// Create a HdfsFileSystem instance from the given options.
  static Status Make(const HdfsOptions& options, std::shared_ptr<HadoopFileSystem>* out);
  /// Create a HdfsFileSystem instance from the given URI.
  static Status Make(const Uri& uri, std::shared_ptr<HadoopFileSystem>* out);
};

(and similarly LocalFileSystem::Make)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it that way because S3Options is configured outside S3FileSystem, so I was planning to do the same thing here.

Copy link
Member

@pitrou pitrou Nov 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3Options is configured in s3fs.h / s3fs.cc. Similarly, HDFS options should be configured in hdfs.h / hdfs.cc.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused: if S3Options is configured internally in s3fs.cc, why does s3fs.h mark both S3Options and the Make function that takes S3Options as input as ARROW_EXPORT? Doesn't that mean users should first create and configure an S3Options instance and then pass it to the Make function as a parameter? Can you give me a more specific example of what you would like to see?

options_.emplace("host_port", uri.port_text());
RETURN_NOT_OK(ParseOptions(&uri));
break;
case FileSystemType::LOCAL:
RETURN_NOT_OK(ParseOptions(&uri));
break;
case FileSystemType::S3:
return Status::NotImplemented("S3 is not supported yet.");
default:
break;
}
return Status::OK();
}

// Copies the query items of `uri` into options_.
//
// Entries are added with emplace(), which does not overwrite an existing key,
// so any option already stored in options_ before this call is kept as is.
// Returns the error from query_items() if the query string cannot be parsed.
Status ParseOptions(arrow::internal::Uri* uri) {
  ARROW_ASSIGN_OR_RAISE(auto options, uri->query_items());
  // Iterate by const reference: the previous by-value loop variable copied
  // every key/value pair once per iteration for no benefit.
  for (const auto& option : options) {
    options_.emplace(option.first, option.second);
  }
  return Status::OK();
}

// Filesystem kind derived from the URI scheme by ParseURI().
FileSystemType fs_type_;
// String key/value options collected while parsing the URI: the host name and
// port for HDFS URIs, plus every item of the URI's query string.
std::unordered_map<std::string, std::string> options_;
};
} // namespace

// Builds the FileSystem implementation selected by the scheme of `full_path`
// and stores it in `*fs`.
//
// - HDFS (only when compiled with ARROW_HDFS): a HadoopFileSystem configured
//   from the URI's host, port and query options.
// - LOCAL: a default-constructed LocalFileSystem.
// - Anything else: Status::NotImplemented.
Status MakeFileSystem(const std::string& full_path, std::shared_ptr<FileSystem>* fs) {
  std::shared_ptr<PathInfo> parsed;
  RETURN_NOT_OK(PathInfo::Make(full_path, &parsed));

  const FileSystemType kind = parsed->GetFileSystemType();

#ifdef ARROW_HDFS
  if (kind == FileSystemType::HDFS) {
    // Translate the parsed URI pieces into HDFS connection options.
    HdfsOptions hdfs_options;
    hdfs_options.ConfigureEndPoint(parsed->GetHostName(), parsed->GetHostPort());
    hdfs_options.ConfigureHdfsDriver(parsed->GetIfUseHdfs3());
    hdfs_options.ConfigureHdfsReplication(parsed->GetRepsNum());
    hdfs_options.ConfigureHdfsUser(parsed->GetUser());

    std::shared_ptr<HadoopFileSystem> hadoop_fs;
    RETURN_NOT_OK(HadoopFileSystem::Make(hdfs_options, &hadoop_fs));
    *fs = hadoop_fs;
    return Status::OK();
  }
#endif

  if (kind == FileSystemType::LOCAL) {
    *fs = std::make_shared<LocalFileSystem>();
    return Status::OK();
  }

#ifdef ARROW_S3
  if (kind == FileSystemType::S3) {
    return Status::NotImplemented("S3 is not supported yet.");
  }
#endif

  return Status::NotImplemented("This type of filesystem is not supported yet.");
}
} // namespace fs
} // namespace arrow
36 changes: 36 additions & 0 deletions cpp/src/arrow/filesystem/filesystem_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include "arrow/filesystem/filesystem.h"

namespace arrow {

namespace fs {

/// \brief Create a FileSystem instance from a URI-based path
///
/// The filesystem implementation is selected from the URI scheme: "hdfs"
/// yields a HadoopFileSystem (only when built with ARROW_HDFS), while "file"
/// or no scheme yields a LocalFileSystem. Other schemes, including "http" and
/// "https", currently return Status::NotImplemented.
///
/// \param[in] full_path a URI-based path, ex: hdfs:///some/path?replication=3
/// \param[out] fs the created FileSystem instance
/// \return Status
ARROW_EXPORT Status MakeFileSystem(const std::string& full_path,
std::shared_ptr<FileSystem>* fs);
} // namespace fs
} // namespace arrow
Loading