Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cpp/cmake_modules/BuildUtils.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ function(ADD_TEST_CASE REL_TEST_NAME)
EXTRA_INCLUDES
EXTRA_DEPENDENCIES
LABELS
EXTRA_LABELS
PREFIX)
cmake_parse_arguments(ARG
"${options}"
Expand Down Expand Up @@ -652,10 +653,15 @@ function(ADD_TEST_CASE REL_TEST_NAME)
add_dependencies(${TARGET} ${TEST_NAME})
endforeach()

set(LABELS)
list(APPEND LABELS "unittest")
if(ARG_LABELS)
set(ARG_LABELS "unittest;${ARG_LABELS}")
else()
set(ARG_LABELS unittest)
list(APPEND LABELS ${ARG_LABELS})
endif()
# EXTRA_LABELS don't create their own dependencies; they are only used
# to make it easier to run certain test categories.
if(ARG_EXTRA_LABELS)
list(APPEND LABELS ${ARG_EXTRA_LABELS})
endif()

foreach(LABEL ${ARG_LABELS})
Expand All @@ -673,7 +679,7 @@ function(ADD_TEST_CASE REL_TEST_NAME)
add_dependencies(${LABEL_TEST_NAME} ${TEST_NAME})
endforeach()

set_property(TEST ${TEST_NAME} APPEND PROPERTY LABELS ${ARG_LABELS})
set_property(TEST ${TEST_NAME} APPEND PROPERTY LABELS ${LABELS})
endfunction()

#
Expand Down
73 changes: 44 additions & 29 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,17 @@ Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish(FinishOptions optio
}

FileSystemDatasetFactory::FileSystemDatasetFactory(
std::vector<std::string> paths, std::shared_ptr<fs::FileSystem> filesystem,
std::vector<fs::FileInfo> files, std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options)
: paths_(std::move(paths)),
: files_(std::move(files)),
fs_(std::move(filesystem)),
format_(std::move(format)),
options_(std::move(options)) {}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
std::vector<std::string> filtered_paths;
std::vector<fs::FileInfo> filtered_files;
for (const auto& path : paths) {
if (options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
Expand All @@ -123,11 +123,32 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
}
}

filtered_paths.push_back(path);
filtered_files.emplace_back(path);
}

return std::shared_ptr<DatasetFactory>(
new FileSystemDatasetFactory(std::move(filtered_paths), std::move(filesystem),
new FileSystemDatasetFactory(std::move(filtered_files), std::move(filesystem),
std::move(format), std::move(options)));
}

// Overload taking FileInfo entries instead of plain paths.
//
// When options.exclude_invalid_files is set, every entry is probed with
// format->IsSupported() and entries reported as unsupported are dropped.
// Entries that are kept are copied in their input order into the new factory.
Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
    std::shared_ptr<fs::FileSystem> filesystem, const std::vector<fs::FileInfo>& files,
    std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
  std::vector<fs::FileInfo> accepted;
  for (const fs::FileInfo& candidate : files) {
    // No validation requested: keep the entry as-is.
    if (!options.exclude_invalid_files) {
      accepted.push_back(candidate);
      continue;
    }

    ARROW_ASSIGN_OR_RAISE(auto supported,
                          format->IsSupported(FileSource(candidate, filesystem)));
    if (supported) {
      accepted.push_back(candidate);
    }
  }

  return std::shared_ptr<DatasetFactory>(
      new FileSystemDatasetFactory(std::move(accepted), std::move(filesystem),
                                   std::move(format), std::move(options)));
}

Expand Down Expand Up @@ -156,27 +177,21 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(

ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));

std::vector<std::string> paths;
for (const auto& info : files) {
const auto& path = info.path();

if (!info.IsFile()) {
// TODO(fsaintjacques): push this filtering into Selector logic so we
// don't copy big vector around.
continue;
}

if (StartsWithAnyOf(path, options.selector_ignore_prefixes)) {
continue;
}

paths.push_back(path);
}
// Filter out anything that's not a file or that's explicitly ignored
auto files_end =
std::remove_if(files.begin(), files.end(), [&](const fs::FileInfo& info) {
if (!info.IsFile() ||
StartsWithAnyOf(info.path(), options.selector_ignore_prefixes)) {
return true;
}
return false;
});
files.erase(files_end, files.end());

// Sorting by path guarantees a stable order, which some unit tests rely on.
std::sort(paths.begin(), paths.end());
std::sort(files.begin(), files.end(), fs::FileInfo::ByPath());

return Make(std::move(filesystem), std::move(paths), std::move(format),
return Make(std::move(filesystem), std::move(files), std::move(format),
std::move(options));
}

Expand All @@ -186,15 +201,15 @@ Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSc

const bool has_fragments_limit = options.fragments >= 0;
int fragments = options.fragments;
for (const auto& path : paths_) {
for (const auto& info : files_) {
if (has_fragments_limit && fragments-- == 0) break;
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({path, fs_}));
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({info, fs_}));
schemas.push_back(schema);
}

ARROW_ASSIGN_OR_RAISE(auto partition_schema,
options_.partitioning.GetOrInferSchema(
StripPrefixAndFilename(paths_, options_.partition_base_dir)));
StripPrefixAndFilename(files_, options_.partition_base_dir)));
schemas.push_back(partition_schema);

return schemas;
Expand Down Expand Up @@ -223,10 +238,10 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
}

std::vector<std::shared_ptr<FileFragment>> fragments;
for (const auto& path : paths_) {
auto fixed_path = StripPrefixAndFilename(path, options_.partition_base_dir);
for (const auto& info : files_) {
auto fixed_path = StripPrefixAndFilename(info.path(), options_.partition_base_dir);
ARROW_ASSIGN_OR_RAISE(auto partition, partitioning->Parse(fixed_path));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({info, fs_}, partition));
fragments.push_back(fragment);
}

Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,18 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

protected:
FileSystemDatasetFactory(std::vector<std::string> paths,
static Result<std::shared_ptr<DatasetFactory>> Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<fs::FileInfo>& files,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);

FileSystemDatasetFactory(std::vector<fs::FileInfo> files,
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format,
FileSystemFactoryOptions options);

Result<std::shared_ptr<Schema>> PartitionSchema();

std::vector<std::string> paths_;
std::vector<fs::FileInfo> files_;
std::shared_ptr<fs::FileSystem> fs_;
std::shared_ptr<FileFormat> format_;
FileSystemFactoryOptions options_;
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) {
factory_options_.partitioning = std::make_shared<HivePartitioning>(
schema({field("a", int32()), field("b", int32())}));

auto paths = std::vector<std::string>{partition_path, unpartition_path};

ASSERT_OK_AND_ASSIGN(
factory_, FileSystemDatasetFactory::Make(fs_, {partition_path, unpartition_path},
format_, factory_options_));
factory_, FileSystemDatasetFactory::Make(fs_, paths, format_, factory_options_));

InspectOptions options;
AssertInspect(schema({field("a", int32()), field("b", int32())}), options);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace dataset {

Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
if (filesystem_) {
return filesystem_->OpenInputFile(path_);
return filesystem_->OpenInputFile(file_info_);
}

if (buffer_) {
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ class ARROW_DS_EXPORT FileSource {
public:
FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED)
: path_(std::move(path)),
: file_info_(std::move(path)),
filesystem_(std::move(filesystem)),
compression_(compression) {}

/// \brief Construct a FileSource from a FileInfo and the filesystem it lives on.
///
/// \param info file entry to wrap; stored as given (moved into the source)
/// \param filesystem filesystem used to open the file
/// \param compression compression of the file contents, UNCOMPRESSED by default
///
/// NOTE(review): presumably lets callers pass an already-populated FileInfo
/// (e.g. one returned by a directory listing) -- confirm against callers.
FileSource(fs::FileInfo info, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED)
: file_info_(std::move(info)),
filesystem_(std::move(filesystem)),
compression_(compression) {}

Expand Down Expand Up @@ -87,7 +93,7 @@ class ARROW_DS_EXPORT FileSource {
const std::string& path() const {
static std::string buffer_path = "<Buffer>";
static std::string custom_open_path = "<Buffer>";
return filesystem_ ? path_ : buffer_ ? buffer_path : custom_open_path;
return filesystem_ ? file_info_.path() : buffer_ ? buffer_path : custom_open_path;
}

/// \brief Return the filesystem, if any. Otherwise returns nullptr
Expand All @@ -104,7 +110,7 @@ class ARROW_DS_EXPORT FileSource {
return Status::Invalid("Called Open() on an uninitialized FileSource");
}

std::string path_;
fs::FileInfo file_info_;
std::shared_ptr<fs::FileSystem> filesystem_;
std::shared_ptr<Buffer> buffer_;
CustomOpen custom_open_;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ static inline Result<std::string> FileFromRowGroup(
}
}

// TODO Is it possible to infer the file size and return a populated FileInfo?
// This could avoid some spurious HEAD requests on S3 (ARROW-8950)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not easily accessible from FileMetaData or a similar structure, then this probably warrants a custom metadata field when writing _metadata.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

path = fs::internal::JoinAbstractPath(std::vector<std::string>{base_path, path});
// Normalizing path is required for Windows.
return filesystem->NormalizePath(std::move(path));
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,23 @@ std::string StripPrefixAndFilename(const std::string& path, const std::string& p
std::vector<std::string> StripPrefixAndFilename(const std::vector<std::string>& paths,
const std::string& prefix) {
std::vector<std::string> result;
result.reserve(paths.size());
for (const auto& path : paths) {
result.emplace_back(StripPrefixAndFilename(path, prefix));
}
return result;
}

// Applies the single-path StripPrefixAndFilename to the path of every entry
// in `files` and returns the results in the same order.
std::vector<std::string> StripPrefixAndFilename(const std::vector<fs::FileInfo>& files,
                                                const std::string& prefix) {
  std::vector<std::string> stripped;
  // One output per input, so size the buffer once up front.
  stripped.reserve(files.size());
  for (const fs::FileInfo& entry : files) {
    stripped.push_back(StripPrefixAndFilename(entry.path(), prefix));
  }
  return stripped;
}

Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema(
const std::vector<std::string>& paths) {
if (auto part = partitioning()) {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
const std::vector<std::string>& paths, const std::string& prefix);

/// \brief Vector version of StripPrefixAndFilename, taking FileInfo entries instead of paths.
ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
const std::vector<fs::FileInfo>& files, const std::string& prefix);

/// \brief Either a Partitioning or a PartitioningFactory
class ARROW_DS_EXPORT PartitioningOrFactory {
public:
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ struct MakeFileSystemDatasetMixin {
}

ASSERT_OK_AND_ASSIGN(auto fragment,
format->MakeFragment({info.path(), fs_}, partitions[i]));
format->MakeFragment({info, fs_}, partitions[i]));
fragments.push_back(std::move(fragment));
}

Expand Down
8 changes: 5 additions & 3 deletions cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ add_arrow_test(filesystem-test
SOURCES
filesystem_test.cc
localfs_test.cc
path_forest_test.cc)
path_forest_test.cc
EXTRA_LABELS
filesystem)

if(ARROW_S3)
add_arrow_test(s3fs_test)
add_arrow_test(s3fs_test EXTRA_LABELS filesystem)

if(ARROW_BUILD_TESTS)
add_executable(arrow-s3fs-narrative-test s3fs_narrative_test.cc)
Expand All @@ -48,5 +50,5 @@ if(ARROW_S3)
endif()

if(ARROW_HDFS)
add_arrow_test(hdfs_test)
add_arrow_test(hdfs_test EXTRA_LABELS filesystem)
endif()
Loading