Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/parallel.h"
#include "arrow/util/uri.h"
#include "arrow/util/windows_fixup.h"

Expand Down Expand Up @@ -439,6 +440,35 @@ Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenAppendStream(
return base_fs_->OpenAppendStream(path);
}

// Copy sources[i] to destinations[i] for every index i.
//
// Both vectors must have the same length, otherwise Status::Invalid is
// returned before anything is copied.
//
// For each pair:
//  - if the source and destination filesystems compare Equal, the copy is
//    delegated to that filesystem's own CopyFile();
//  - otherwise the source is opened as an input stream, the destination's
//    parent directory is created (when the destination path has one), and the
//    bytes are copied through an output stream using `chunk_size`.
//
// `use_threads` is forwarded to OptionalParallelFor to decide how the per-file
// tasks are run.
Status CopyFiles(const std::vector<FileLocator>& sources,
                 const std::vector<FileLocator>& destinations, int64_t chunk_size,
                 bool use_threads) {
  if (sources.size() != destinations.size()) {
    return Status::Invalid("Trying to copy ", sources.size(), " files into ",
                           destinations.size(), " paths.");
  }

  return ::arrow::internal::OptionalParallelFor(
      use_threads, static_cast<int>(sources.size()), [&](int i) {
        if (sources[i].filesystem->Equals(destinations[i].filesystem)) {
          return sources[i].filesystem->CopyFile(sources[i].path, destinations[i].path);
        }

        ARROW_ASSIGN_OR_RAISE(auto source,
                              sources[i].filesystem->OpenInputStream(sources[i].path));

        auto dest_dir = internal::GetAbstractPathParent(destinations[i].path).first;
        if (!dest_dir.empty()) {
          RETURN_NOT_OK(destinations[i].filesystem->CreateDir(dest_dir));
        }

        ARROW_ASSIGN_OR_RAISE(
            auto destination,
            destinations[i].filesystem->OpenOutputStream(destinations[i].path));
        RETURN_NOT_OK(internal::CopyStream(source, destination, chunk_size));
        // Close explicitly so that an error while flushing or finalising the
        // destination is reported to the caller rather than being dropped
        // when the stream object is destroyed.
        return destination->Close();
      });
}

namespace {

Result<Uri> ParseFileSystemUri(const std::string& uri_string) {
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ struct ARROW_EXPORT FileSelector {
FileSelector() {}
};

/// \brief FileSystem, path pair
///
/// Names one file as a path inside a particular FileSystem.
///
/// This is a plain aggregate and callers brace-initialize it positionally as
/// `{filesystem, path}`, so the member order below must be kept.
struct ARROW_EXPORT FileLocator {
/// The filesystem that `path` is resolved against.
std::shared_ptr<FileSystem> filesystem;
/// The path of the file within `filesystem`.
std::string path;
};

/// \brief Abstract file system API
class ARROW_EXPORT FileSystem : public std::enable_shared_from_this<FileSystem> {
public:
Expand Down Expand Up @@ -387,6 +393,12 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(

/// @}

/// \brief Copy files from one FileSystem to another
///
/// The file at sources[i] is copied to destinations[i].
///
/// When a source and its destination refer to equal filesystems the copy is
/// delegated to FileSystem::CopyFile. Otherwise the source is read through an
/// input stream and written through an output stream on the destination
/// filesystem, after creating the destination's parent directory if the
/// destination path has one.
///
/// \param[in] sources the files to copy
/// \param[in] destinations where each file is copied to; must have the same
///   length as `sources`
/// \param[in] chunk_size passed to the stream copy used between different
///   filesystems (presumably the size in bytes of each block that is read and
///   then written)
/// \param[in] use_threads forwarded to OptionalParallelFor, which decides
///   how the per-file copies are run
/// \return Status::Invalid if `sources` and `destinations` differ in length,
///   otherwise the status of the copy operations
ARROW_EXPORT
Status CopyFiles(const std::vector<FileLocator>& sources,
const std::vector<FileLocator>& destinations,
int64_t chunk_size = 1024 * 1024, bool use_threads = true);

struct FileSystemGlobalOptions {
/// Path to a single PEM file holding all TLS CA certificates
///
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/filesystem/filesystem_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,28 @@ TEST_F(TestSubTreeFileSystem, CopyFile) {
{"sub/tree/cd", time_, "data"}});
}

// CopyFiles between two different SubTreeFileSystem instances: every file must
// land at its destination path (whose intermediate directories do not exist
// beforehand) while the originals stay in place.
TEST_F(TestSubTreeFileSystem, CopyFiles) {
  // Seed the source subtree; each file's content is its own name.
  CreateFile("ab", "ab");
  CreateFile("cd", "cd");
  CreateFile("ef", "ef");

  // The destination is a second subtree, rooted at "sub/copy" in the base fs.
  ASSERT_OK(fs_->CreateDir("sub/copy"));
  const auto copy_fs = std::make_shared<SubTreeFileSystem>("sub/copy", fs_);

  const std::vector<FileLocator> from = {
      {subfs_, "ab"},
      {subfs_, "cd"},
      {subfs_, "ef"},
  };
  // Progressively deeper destinations, none of whose parents exist yet.
  const std::vector<FileLocator> to = {
      {copy_fs, "AB/ab"},
      {copy_fs, "CD/CD/cd"},
      {copy_fs, "EF/EF/EF/ef"},
  };
  ASSERT_OK(CopyFiles(from, to));

  // Copies first, then the untouched originals, as seen from the base fs.
  CheckFiles({
      {"sub/copy/AB/ab", time_, "ab"},
      {"sub/copy/CD/CD/cd", time_, "cd"},
      {"sub/copy/EF/EF/EF/ef", time_, "ef"},
      {"sub/tree/ab", time_, "ab"},
      {"sub/tree/cd", time_, "cd"},
      {"sub/tree/ef", time_, "ef"},
  });
}

TEST_F(TestSubTreeFileSystem, OpenInputStream) {
std::shared_ptr<io::InputStream> stream;
CreateFile("ab", "data");
Expand Down
5 changes: 1 addition & 4 deletions cpp/src/arrow/filesystem/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,7 @@ void CreateFile(FileSystem* fs, const std::string& path, const std::string& data
}

// Sort `infos` in place using the FileInfo::ByPath comparator.
//
// The vector was previously sorted twice in a row (first with an ad-hoc
// path-comparing lambda, then with FileInfo::ByPath). Only the last sort
// determines the final order, so a single pass is kept.
void SortInfos(std::vector<FileInfo>* infos) {
  std::sort(infos->begin(), infos->end(), FileInfo::ByPath{});
}

void AssertFileInfo(const FileInfo& info, const std::string& path, FileType type) {
Expand Down
4 changes: 4 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions r/R/filesystem.R
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,21 @@ SubTreeFileSystem$create <- function(base_path, base_fs) {
shared_ptr(SubTreeFileSystem, xp)
}

#' Copy files between FileSystems
#'
#' Each entry of `src_paths` is copied from `src_fs` to the entry of
#' `dest_paths` at the same position in `dest_fs`.
#'
#' @param src_fs The FileSystem to copy files from.
#' @param src_paths Paths, within `src_fs`, of the files to copy.
#' @param dest_fs The FileSystem to copy files into.
#' @param dest_paths Paths, within `dest_fs`, where the copies should be
#' written.
#' @param chunk_size The maximum size of block to read before flushing
#' to the destination file. Larger values use more memory during the copy
#' but may help with high latency FileSystems.
copy_files <- function(src_fs, src_paths, dest_fs, dest_paths,
                       chunk_size = 1024L * 1024L) {
  # Threading follows the package-wide option rather than a per-call argument.
  use_threads <- option_use_threads()
  fs___CopyFiles(src_fs, src_paths, dest_fs, dest_paths, chunk_size, use_threads)
}

clean_path_abs <- function(path) {
# Make sure we have a valid, absolute, forward-slashed path for passing to Arrow
normalizePath(path, winslash = "/", mustWork = FALSE)
Expand Down
22 changes: 22 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions r/src/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,22 @@ cpp11::writable::list fs___FileSystemFromUri(const std::string& path) {
return cpp11::writable::list({"fs"_nm = file_system, "path"_nm = out_path});
}

// Glue for the R-level copy_files(): pairs every source path with `src_fs`
// and every destination path with `dest_fs`, then hands both lists to
// fs::CopyFiles. A failing Status is passed to StopIfNotOk.
//
// Each locator list is built from its own path vector. Previously a single
// loop bounded by src_paths.size() indexed both vectors, which read and wrote
// out of bounds when `dest_paths` was shorter than `src_paths`. Now a length
// mismatch simply reaches fs::CopyFiles, which rejects it.
// [[arrow::export]]
void fs___CopyFiles(const std::shared_ptr<fs::FileSystem>& src_fs,
                    const std::vector<std::string>& src_paths,
                    const std::shared_ptr<fs::FileSystem>& dest_fs,
                    const std::vector<std::string>& dest_paths, int64_t chunk_size,
                    bool use_threads) {
  // Attach one filesystem to every path in `paths`.
  auto make_locators = [](const std::shared_ptr<fs::FileSystem>& filesystem,
                          const std::vector<std::string>& paths) {
    std::vector<fs::FileLocator> locators;
    locators.reserve(paths.size());
    for (const auto& path : paths) {
      locators.push_back({filesystem, path});
    }
    return locators;
  };

  const auto sources = make_locators(src_fs, src_paths);
  const auto destinations = make_locators(dest_fs, dest_paths);

  StopIfNotOk(fs::CopyFiles(sources, destinations, chunk_size, use_threads));
}

#endif

#if defined(ARROW_R_WITH_S3)
Expand Down