From 42bc1c319de2355900f200c6969439bd5c2b73d0 Mon Sep 17 00:00:00 2001 From: Mauricio Vargas Date: Fri, 2 Apr 2021 17:20:32 -0300 Subject: [PATCH 01/10] new PR --- r/R/feather.R | 7 +++++-- r/R/parquet.R | 10 ++++++++-- r/R/util.R | 13 +++++++++++++ r/tests/testthat/test-feather.R | 14 ++++++++++++++ 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/r/R/feather.R b/r/R/feather.R index 637ce23234a..e72f2eab504 100644 --- a/r/R/feather.R +++ b/r/R/feather.R @@ -147,14 +147,17 @@ read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, ...) { file <- make_readable_file(file) on.exit(file$close()) } - reader <- FeatherReader$create(file, ...) + reader <- FeatherReader$create(file) col_select <- enquo(col_select) columns <- if (!quo_is_null(col_select)) { vars_select(names(reader), !!col_select) } - out <- reader$Read(columns) + out <- tryCatch( + reader$Read(columns), + error = function(e) { read_compressed_error(e) } + ) if (isTRUE(as_data_frame)) { out <- as.data.frame(out) diff --git a/r/R/parquet.R b/r/R/parquet.R index 45751b16170..689324113c2 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -52,10 +52,16 @@ read_parquet <- function(file, schema <- reader$GetSchema() names <- names(schema) indices <- match(vars_select(names, !!col_select), names) - 1L - tab <- reader$ReadTable(indices) + tab <- tryCatch( + reader$ReadTable(indices), + error = function(e) { read_compressed_error(e) } + ) } else { # read all columns - tab <- reader$ReadTable() + tab <- tryCatch( + reader$ReadTable(), + error = function(e) { read_compressed_error(e) } + ) } if (as_data_frame) { diff --git a/r/R/util.R b/r/R/util.R index 4680381e909..d5a3bc8ab80 100644 --- a/r/R/util.R +++ b/r/R/util.R @@ -86,3 +86,16 @@ all_names <- function(expr) { is_constant <- function(expr) { length(all_vars(expr)) == 0 } + +read_compressed_error <- function(e) { + e <- as.character(e) + compression <- sub(".*Support for codec '(.*)'.*", "\\1", e) + msg <- c( + sprintf("Unsupported compressed format %s", compression), + "\nTry setting the environment variable LIBARROW_MINIMAL=false and reinstalling", + "\nfor a more complete installation ", + sprintf("(including %s) or setting", compression), + sprintf("\nARROW_WITH_%s=ON and reinstalling to enable support for this codec.", toupper(compression)) + ) + stop(msg, call. = FALSE) +} diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R index 52325c7f410..30a92f5e302 100644 --- a/r/tests/testthat/test-feather.R +++ b/r/tests/testthat/test-feather.R @@ -196,3 +196,17 @@ test_that("Character vectors > 2GB can write to feather", { }) unlink(feather_file) + +ft_file <- test_path("golden-files/data-arrow_2.0.0_lz4.feather") + +test_that("Error messages are shown when the compression algorithm lz4 is not found", { + if (codec_is_available("lz4")) { + d <- read_feather(ft_file) + expect_is(d, "data.frame") + } else { + expect_error( + read_feather(ft_file), + "Unsupported compressed format" + ) + } +}) From 9c0210018d13e7a855fef500776caeead30645ce Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 2 Apr 2021 17:28:54 -0400 Subject: [PATCH 02/10] ARROW-12161: [C++] Async streaming CSV reader deadlocking when being run synchronously from datasets Calling the async streaming CSV reader from the synchronous Scanner::Scan was causing a form of nested parallelism and causing nested deadlocks. This commit brings over some of the work in ARROW-7001 and allows the CSV scan task to be called in an async fashion. 
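The failure mode is worth spelling out: with a CPU thread pool of capacity one, a task that blocks while waiting on a second task submitted to the same pool can never finish, because the only worker is the one doing the waiting. A minimal sketch of that shape (illustrative only, not part of this patch; it uses the public thread-pool and future APIs rather than the scanner code itself):

```cpp
#include <memory>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/thread_pool.h"

// Sketch of the nested-parallelism deadlock this change avoids.
arrow::Status NestedBlockingCall() {
  // A pool with a single worker, like the constrained pools used in the tests below.
  ARROW_ASSIGN_OR_RAISE(auto pool, arrow::internal::ThreadPool::Make(1));

  // The outer task occupies the only worker thread...
  ARROW_ASSIGN_OR_RAISE(auto outer, pool->Submit([&]() -> arrow::Result<int> {
    // ...and then blocks on an inner task submitted to the same pool.
    // The inner task can never be scheduled, so result() never returns.
    ARROW_ASSIGN_OR_RAISE(auto inner, pool->Submit([] { return 42; }));
    return inner.result();
  }));
  return outer.status();
}
```

Having the CSV scan task return an async generator instead of a blocking iterator removes that inner wait entirely, which is why the single-threaded test setup added below can pass.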
In addition, an async path is put in the scanner and dataset write so that all internal uses of ScanTask()->Execute happen in an async-friendly way. External uses of ScanTask()->Execute should already be outside the CPU thread pool and should not cause deadlock. Some of this PR will be obsoleted by ARROW-7001 but the work in file_csv and the test cases should remain fairly intact. Closes #9868 from westonpace/bugfix/arrow-12161 Lead-authored-by: Weston Pace Co-authored-by: Antoine Pitrou Signed-off-by: David Li --- cpp/src/arrow/dataset/file_base.cc | 132 ++++++++++++-------- cpp/src/arrow/dataset/file_base.h | 2 +- cpp/src/arrow/dataset/file_csv.cc | 66 ++++++---- cpp/src/arrow/dataset/file_test.cc | 29 +++++ cpp/src/arrow/dataset/scanner.cc | 32 +++-- cpp/src/arrow/dataset/scanner.h | 4 + cpp/src/arrow/dataset/scanner_internal.h | 97 ++++++++++++++- cpp/src/arrow/dataset/scanner_test.cc | 15 +++ cpp/src/arrow/dataset/test_util.h | 151 +++++++++++++++++++++++ cpp/src/arrow/util/async_generator.h | 34 +++++ r/tests/testthat/test-dataset.R | 1 + 11 files changed, 480 insertions(+), 83 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 86f14de46fd..8437c75ae1c 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -360,6 +360,64 @@ class WriteQueue { std::shared_ptr schema_; }; +struct WriteState { + explicit WriteState(FileSystemDatasetWriteOptions write_options) + : write_options(std::move(write_options)) {} + + FileSystemDatasetWriteOptions write_options; + util::Mutex mutex; + std::unordered_map> queues; +}; + +Status WriteNextBatch(WriteState& state, const std::shared_ptr& scan_task, + std::shared_ptr batch) { + ARROW_ASSIGN_OR_RAISE(auto groups, state.write_options.partitioning->Partition(batch)); + batch.reset(); // drop to hopefully conserve memory + + if (groups.batches.size() > static_cast(state.write_options.max_partitions)) { + return Status::Invalid("Fragment would be written into ", groups.batches.size(), + " partitions. 
This exceeds the maximum of ", + state.write_options.max_partitions); + } + + std::unordered_set need_flushed; + for (size_t i = 0; i < groups.batches.size(); ++i) { + auto partition_expression = and_(std::move(groups.expressions[i]), + scan_task->fragment()->partition_expression()); + auto batch = std::move(groups.batches[i]); + + ARROW_ASSIGN_OR_RAISE(auto part, + state.write_options.partitioning->Format(partition_expression)); + + WriteQueue* queue; + { + // lookup the queue to which batch should be appended + auto queues_lock = state.mutex.Lock(); + + queue = internal::GetOrInsertGenerated( + &state.queues, std::move(part), + [&](const std::string& emplaced_part) { + // lookup in `queues` also failed, + // generate a new WriteQueue + size_t queue_index = state.queues.size() - 1; + + return internal::make_unique(emplaced_part, queue_index, + batch->schema()); + }) + ->second.get(); + } + + queue->Push(std::move(batch)); + need_flushed.insert(queue); + } + + // flush all touched WriteQueues + for (auto queue : need_flushed) { + RETURN_NOT_OK(queue->Flush(state.write_options)); + } + return Status::OK(); +} + } // namespace Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options, @@ -382,6 +440,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio ARROW_ASSIGN_OR_RAISE(auto fragment_it, scanner->GetFragments()); ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector()); ScanTaskVector scan_tasks; + std::vector> scan_futs; for (const auto& fragment : fragments) { auto options = std::make_shared(*scanner->options()); @@ -399,68 +458,35 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio // to a WriteQueue which flushes batches into that partition's output file. In principle // any thread could produce a batch for any partition, so each task alternates between // pushing batches and flushing them to disk. - util::Mutex queues_mutex; - std::unordered_map> queues; + WriteState state(write_options); for (const auto& scan_task : scan_tasks) { - task_group->Append([&, scan_task] { - ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute()); - - for (auto maybe_batch : batches) { - ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); - ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch)); - batch.reset(); // drop to hopefully conserve memory - - if (groups.batches.size() > static_cast(write_options.max_partitions)) { - return Status::Invalid("Fragment would be written into ", groups.batches.size(), - " partitions. 
This exceeds the maximum of ", - write_options.max_partitions); - } - - std::unordered_set need_flushed; - for (size_t i = 0; i < groups.batches.size(); ++i) { - auto partition_expression = and_(std::move(groups.expressions[i]), - scan_task->fragment()->partition_expression()); - auto batch = std::move(groups.batches[i]); - - ARROW_ASSIGN_OR_RAISE(auto part, - write_options.partitioning->Format(partition_expression)); - - WriteQueue* queue; - { - // lookup the queue to which batch should be appended - auto queues_lock = queues_mutex.Lock(); - - queue = internal::GetOrInsertGenerated( - &queues, std::move(part), - [&](const std::string& emplaced_part) { - // lookup in `queues` also failed, - // generate a new WriteQueue - size_t queue_index = queues.size() - 1; - - return internal::make_unique( - emplaced_part, queue_index, batch->schema()); - }) - ->second.get(); - } - - queue->Push(std::move(batch)); - need_flushed.insert(queue); - } + if (scan_task->supports_async()) { + ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync()); + std::function batch)> batch_visitor = + [&, scan_task](std::shared_ptr batch) { + return WriteNextBatch(state, scan_task, std::move(batch)); + }; + scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor)); + } else { + task_group->Append([&, scan_task] { + ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute()); - // flush all touched WriteQueues - for (auto queue : need_flushed) { - RETURN_NOT_OK(queue->Flush(write_options)); + for (auto maybe_batch : batches) { + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch))); } - } - return Status::OK(); - }); + return Status::OK(); + }); + } } RETURN_NOT_OK(task_group->Finish()); + auto scan_futs_all_done = AllComplete(scan_futs); + RETURN_NOT_OK(scan_futs_all_done.status()); task_group = scanner->options()->TaskGroup(); - for (const auto& part_queue : queues) { + for (const auto& part_queue : state.queues) { task_group->Append([&] { return part_queue.second->writer()->Finish(); }); } return task_group->Finish(); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 9c613c00aff..e4e7167aa75 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -285,7 +285,7 @@ class ARROW_DS_EXPORT FileWriter { Status Write(RecordBatchReader* batches); - Status Finish(); + virtual Status Finish(); const std::shared_ptr& format() const { return options_->format(); } const std::shared_ptr& schema() const { return schema_; } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index e736d06753b..b55c23dfdef 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -34,6 +34,7 @@ #include "arrow/io/compressed.h" #include "arrow/result.h" #include "arrow/type.h" +#include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" @@ -42,6 +43,7 @@ namespace dataset { using internal::checked_cast; using internal::checked_pointer_cast; +using RecordBatchGenerator = AsyncGenerator>; Result> GetColumnNames( const csv::ParseOptions& parse_options, util::string_view first_block, @@ -110,35 +112,47 @@ static inline Result GetReadOptions( return read_options; } -static inline Result> OpenReader( +static inline Future> OpenReaderAsync( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options = nullptr, MemoryPool* pool = default_memory_pool()) { 
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); - util::string_view first_block; ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); ARROW_ASSIGN_OR_RAISE( input, io::BufferedInputStream::Create(reader_options.block_size, default_memory_pool(), std::move(input))); - ARROW_ASSIGN_OR_RAISE(first_block, input->Peek(reader_options.block_size)); - - const auto& parse_options = format.parse_options; - auto convert_options = csv::ConvertOptions::Defaults(); - if (scan_options != nullptr) { - ARROW_ASSIGN_OR_RAISE(convert_options, - GetConvertOptions(format, scan_options, first_block, pool)); - } - auto maybe_reader = - csv::StreamingReader::Make(io::IOContext(pool), std::move(input), reader_options, - parse_options, convert_options); - if (!maybe_reader.ok()) { - return maybe_reader.status().WithMessage("Could not open CSV input source '", - source.path(), "': ", maybe_reader.status()); - } + auto peek_fut = DeferNotOk(input->io_context().executor()->Submit( + [input, reader_options] { return input->Peek(reader_options.block_size); })); + + return peek_fut.Then([=](const util::string_view& first_block) + -> Future> { + const auto& parse_options = format.parse_options; + auto convert_options = csv::ConvertOptions::Defaults(); + if (scan_options != nullptr) { + ARROW_ASSIGN_OR_RAISE(convert_options, + GetConvertOptions(format, scan_options, first_block, pool)); + } + + return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input), + reader_options, parse_options, convert_options) + .Then( + [](const std::shared_ptr& maybe_reader) + -> Result> { return maybe_reader; }, + [source](const Status& err) -> Result> { + return err.WithMessage("Could not open CSV input source '", source.path(), + "': ", err); + }); + }); +} - return std::move(maybe_reader).ValueOrDie(); +static inline Result> OpenReader( + const FileSource& source, const CsvFileFormat& format, + const std::shared_ptr& scan_options = nullptr, + MemoryPool* pool = default_memory_pool()) { + auto open_reader_fut = OpenReaderAsync(source, format, scan_options, pool); + return open_reader_fut.result(); } /// \brief A ScanTask backed by an Csv file. 
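Returning a future from the open step keeps the rest of the pipeline non-blocking: once the reader exists, it is exposed as an `AsyncGenerator` and drained with the generator utilities rather than iterated synchronously. A small sketch of that consumption pattern (illustrative only; it relies only on APIs already visible in this change):

```cpp
#include <memory>

#include "arrow/csv/reader.h"
#include "arrow/record_batch.h"
#include "arrow/util/async_generator.h"

using RecordBatchGenerator =
    arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>;

// Adapt an already-opened streaming reader into an async generator.
RecordBatchGenerator BatchGeneratorOf(
    std::shared_ptr<arrow::csv::StreamingReader> reader) {
  // Each call yields a future for the next batch; a null batch marks the end.
  return [reader] { return reader->ReadNextAsync(); };
}

// Collect every batch without blocking a CPU thread while waiting on I/O.
arrow::Future<arrow::RecordBatchVector> CollectAllBatches(RecordBatchGenerator gen) {
  return arrow::CollectAsyncGenerator(std::move(gen));
}
```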
@@ -152,9 +166,19 @@ class CsvScanTask : public ScanTask { source_(fragment->source()) {} Result Execute() override { - ARROW_ASSIGN_OR_RAISE(auto reader, - OpenReader(source_, *format_, options(), options()->pool)); - return IteratorFromReader(std::move(reader)); + ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync()); + return MakeGeneratorIterator(std::move(gen)); + } + + bool supports_async() const override { return true; } + + Result ExecuteAsync() override { + auto reader_fut = OpenReaderAsync(source_, *format_, options(), options()->pool); + auto generator_fut = reader_fut.Then( + [](const std::shared_ptr& reader) -> RecordBatchGenerator { + return [reader]() { return reader->ReadNextAsync(); }; + }); + return MakeFromFuture(generator_fut); } private: diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index c7ce5154d0a..fdbb4512758 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -249,6 +249,35 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) { }); } +class TestFilesystemDatasetNestedParallelism : public NestedParallelismMixin {}; + +TEST_F(TestFilesystemDatasetNestedParallelism, Write) { + constexpr int NUM_BATCHES = 32; + RecordBatchVector batches; + for (int i = 0; i < NUM_BATCHES; i++) { + batches.push_back(ConstantArrayGenerator::Zeroes(/*size=*/1, schema_)); + } + auto dataset = std::make_shared(schema_, std::move(batches)); + ScannerBuilder builder{dataset, options_}; + ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); + + ASSERT_OK_AND_ASSIGN(auto output_dir, TemporaryDir::Make("nested-parallel-dataset")); + + auto format = std::make_shared(); + auto rows_written = std::make_shared>(0); + std::shared_ptr file_write_options = + std::make_shared(rows_written); + FileSystemDatasetWriteOptions dataset_write_options; + dataset_write_options.file_write_options = file_write_options; + dataset_write_options.basename_template = "{i}"; + dataset_write_options.partitioning = std::make_shared(schema({})); + dataset_write_options.base_dir = output_dir->path().ToString(); + dataset_write_options.filesystem = std::make_shared(); + + ASSERT_OK(FileSystemDataset::Write(dataset_write_options, scanner)); + ASSERT_EQ(NUM_BATCHES, rows_written->load()); +} + // Tests of subtree pruning struct TestPathTree { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index dee96ceb836..2258a10d141 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -61,6 +61,12 @@ Result InMemoryScanTask::Execute() { return MakeVectorIterator(record_batches_); } +Result ScanTask::ExecuteAsync() { + return Status::NotImplemented("Async is not implemented for this scan task yet"); +} + +bool ScanTask::supports_async() const { return false; } + Result Scanner::GetFragments() { if (fragment_ != nullptr) { return MakeVectorIterator(FragmentVector{fragment_}); @@ -203,19 +209,31 @@ Result> Scanner::ToTable() { auto state = std::make_shared(); size_t scan_task_id = 0; + std::vector> scan_futures; for (auto maybe_scan_task : scan_task_it) { ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); auto id = scan_task_id++; - task_group->Append([state, id, scan_task] { - ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); - ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector()); - state->Emplace(std::move(local), id); - return Status::OK(); - }); + if (scan_task->supports_async()) { + ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync()); + auto scan_fut = 
CollectAsyncGenerator(std::move(scan_gen)) + .Then([state, id](const RecordBatchVector& rbs) { + state->Emplace(rbs, id); + }); + scan_futures.push_back(std::move(scan_fut)); + } else { + task_group->Append([state, id, scan_task] { + ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); + ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector()); + state->Emplace(std::move(local), id); + return Status::OK(); + }); + } } + // Wait for all async tasks to complete, or the first error + RETURN_NOT_OK(AllComplete(scan_futures).status()); - // Wait for all tasks to complete, or the first error. + // Wait for all sync tasks to complete, or the first error. RETURN_NOT_OK(task_group->Finish()); return Table::FromRecordBatches(scan_options_->projected_schema, diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index df5f7954afe..c3cce00d8c5 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -31,9 +31,11 @@ #include "arrow/dataset/visibility.h" #include "arrow/memory_pool.h" #include "arrow/type_fwd.h" +#include "arrow/util/async_generator.h" #include "arrow/util/type_fwd.h" namespace arrow { +using RecordBatchGenerator = AsyncGenerator>; namespace dataset { constexpr int64_t kDefaultBatchSize = 1 << 20; @@ -101,6 +103,8 @@ class ARROW_DS_EXPORT ScanTask { /// resulting from the Scan. Execution semantics are encapsulated in the /// particular ScanTask implementation virtual Result Execute() = 0; + virtual Result ExecuteAsync(); + virtual bool supports_async() const; virtual ~ScanTask() = default; diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index e666d251cd1..3101be477fd 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -36,6 +36,8 @@ using internal::checked_cast; namespace dataset { +// TODO(ARROW-7001) This synchronous version is no longer needed, can use async version +// regardless of sync/async of source inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression filter, MemoryPool* pool) { return MakeMaybeMapIterator( @@ -60,6 +62,38 @@ inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression std::move(it)); } +inline Result> DoFilterRecordBatch( + const Expression& filter, MemoryPool* pool, const std::shared_ptr& in) { + compute::ExecContext exec_context{pool}; + ARROW_ASSIGN_OR_RAISE(Datum mask, + ExecuteScalarExpression(filter, Datum(in), &exec_context)); + + if (mask.is_scalar()) { + const auto& mask_scalar = mask.scalar_as(); + if (mask_scalar.is_valid && mask_scalar.value) { + return std::move(in); + } + return in->Slice(0, 0); + } + + ARROW_ASSIGN_OR_RAISE( + Datum filtered, + compute::Filter(in, mask, compute::FilterOptions::Defaults(), &exec_context)); + return filtered.record_batch(); +} + +inline RecordBatchGenerator FilterRecordBatch(RecordBatchGenerator rbs, Expression filter, + MemoryPool* pool) { + // TODO(ARROW-7001) This changes to auto + std::function>(const std::shared_ptr&)> + mapper = [=](const std::shared_ptr& in) { + return DoFilterRecordBatch(filter, pool, in); + }; + return MakeMappedGenerator(std::move(rbs), mapper); +} + +// TODO(ARROW-7001) This synchronous version is no longer needed, all branches use async +// version inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, Expression projection, MemoryPool* pool) { return MakeMaybeMapIterator( @@ -83,6 +117,35 @@ inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, 
std::move(it)); } +inline Result> DoProjectRecordBatch( + const Expression& projection, MemoryPool* pool, + const std::shared_ptr& in) { + compute::ExecContext exec_context{pool}; + ARROW_ASSIGN_OR_RAISE(Datum projected, + ExecuteScalarExpression(projection, Datum(in), &exec_context)); + DCHECK_EQ(projected.type()->id(), Type::STRUCT); + if (projected.shape() == ValueDescr::SCALAR) { + // Only virtual columns are projected. Broadcast to an array + ARROW_ASSIGN_OR_RAISE(projected, + MakeArrayFromScalar(*projected.scalar(), in->num_rows(), pool)); + } + + ARROW_ASSIGN_OR_RAISE(auto out, + RecordBatch::FromStructArray(projected.array_as())); + + return out->ReplaceSchemaMetadata(in->schema()->metadata()); +} + +inline RecordBatchGenerator ProjectRecordBatch(RecordBatchGenerator rbs, + Expression projection, MemoryPool* pool) { + // TODO(ARROW-7001) This changes to auto + std::function>(const std::shared_ptr&)> + mapper = [=](const std::shared_ptr& in) { + return DoProjectRecordBatch(projection, pool, in); + }; + return MakeMappedGenerator(std::move(rbs), mapper); +} + class FilterAndProjectScanTask : public ScanTask { public: explicit FilterAndProjectScanTask(std::shared_ptr task, Expression partition) @@ -90,7 +153,9 @@ class FilterAndProjectScanTask : public ScanTask { task_(std::move(task)), partition_(std::move(partition)) {} - Result Execute() override { + bool supports_async() const override { return task_->supports_async(); } + + Result ExecuteSync() { ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, @@ -106,6 +171,36 @@ class FilterAndProjectScanTask : public ScanTask { options_->pool); } + Result Execute() override { + if (task_->supports_async()) { + ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync()); + return MakeGeneratorIterator(std::move(gen)); + } else { + return ExecuteSync(); + } + } + + Result ExecuteAsync() override { + if (!task_->supports_async()) { + return Status::Invalid( + "ExecuteAsync should not have been called on FilterAndProjectScanTask if the " + "source task did not support async"); + } + ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync()); + + ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, + SimplifyWithGuarantee(options()->filter, partition_)); + + ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, + SimplifyWithGuarantee(options()->projection, partition_)); + + RecordBatchGenerator filter_gen = + FilterRecordBatch(std::move(gen), simplified_filter, options_->pool); + + return ProjectRecordBatch(std::move(filter_gen), simplified_projection, + options_->pool); + } + private: std::shared_ptr task_; Expression partition_; diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 66b1edff568..eec8ed21668 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -151,6 +151,21 @@ TEST_F(TestScanner, ToTable) { AssertTablesEqual(*expected, *actual); } +class TestScannerNestedParallelism : public NestedParallelismMixin {}; + +TEST_F(TestScannerNestedParallelism, Scan) { + constexpr int NUM_BATCHES = 32; + RecordBatchVector batches; + for (int i = 0; i < NUM_BATCHES; i++) { + batches.push_back(ConstantArrayGenerator::Zeroes(/*size=*/1, schema_)); + } + auto dataset = std::make_shared(schema_, std::move(batches)); + ScannerBuilder builder{dataset, options_}; + ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable()); + ASSERT_EQ(table->num_rows(), NUM_BATCHES); +} + class 
TestScannerBuilder : public ::testing::Test { void SetUp() override { DatasetVector sources; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 6a4c1eb8d13..86bb14b038d 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -780,5 +780,156 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { std::shared_ptr scan_options_; }; +// These test cases will run on a thread pool with 1 thread. Any illegal (non-async) +// nested parallelism should deadlock the test +class NestedParallelismMixin : public ::testing::Test { + protected: + static void SetUpTestSuite() {} + + void TearDown() override { + if (old_capacity_ > 0) { + ASSERT_OK(internal::GetCpuThreadPool()->SetCapacity(old_capacity_)); + } + } + + void SetUp() override { + old_capacity_ = internal::GetCpuThreadPool()->GetCapacity(); + ASSERT_OK(internal::GetCpuThreadPool()->SetCapacity(1)); + schema_ = schema({field("i32", int32())}); + options_ = std::make_shared(); + options_->dataset_schema = schema_; + options_->use_threads = true; + } + + class NestedParallelismScanTask : public ScanTask { + public: + explicit NestedParallelismScanTask(std::shared_ptr target) + : ScanTask(target->options(), target->fragment()), target_(std::move(target)) {} + virtual ~NestedParallelismScanTask() = default; + + Result Execute() override { + // We could just return an invalid status here but this way it is easy to verify the + // test is checking what it is supposed to be checking by just changing + // supports_async() to false (will deadlock) + ADD_FAILURE() << "NestedParallelismScanTask::Execute should never be called. You " + "should be deadlocked right now"; + ARROW_ASSIGN_OR_RAISE(auto batch_gen, ExecuteAsync()); + return MakeGeneratorIterator(std::move(batch_gen)); + } + + Result ExecuteAsync() override { + ARROW_ASSIGN_OR_RAISE(auto batches_it, target_->Execute()); + ARROW_ASSIGN_OR_RAISE(auto batches, batches_it.ToVector()); + auto generator_fut = DeferNotOk(internal::GetCpuThreadPool()->Submit( + [batches] { return MakeVectorGenerator(batches); })); + return MakeFromFuture(generator_fut); + } + + bool supports_async() const override { return true; } + + private: + std::shared_ptr target_; + }; + + class NestedParallelismFragment : public InMemoryFragment { + public: + explicit NestedParallelismFragment(RecordBatchVector record_batches, + Expression expr = literal(true)) + : InMemoryFragment(std::move(record_batches), std::move(expr)) {} + + Result Scan(std::shared_ptr options) override { + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, InMemoryFragment::Scan(options)); + return MakeMaybeMapIterator( + [](std::shared_ptr task) -> Result> { + return std::make_shared(std::move(task)); + }, + std::move(scan_task_it)); + } + }; + + class NestedParallelismDataset : public InMemoryDataset { + public: + NestedParallelismDataset(std::shared_ptr sch, RecordBatchVector batches) + : InMemoryDataset(std::move(sch), std::move(batches)) {} + + protected: + Result GetFragmentsImpl(Expression) override { + auto schema = this->schema(); + + auto create_fragment = + [schema]( + std::shared_ptr batch) -> Result> { + RecordBatchVector batches{batch}; + return std::make_shared(std::move(batches)); + }; + + return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get()); + } + }; + + class DiscardingRowCountingFileWriteOptions : public FileWriteOptions { + public: + explicit DiscardingRowCountingFileWriteOptions( + std::shared_ptr> row_counter) + : FileWriteOptions( + 
std::make_shared(std::move(row_counter))) {} + }; + + class DiscardingRowCountingFileWriter : public FileWriter { + public: + explicit DiscardingRowCountingFileWriter(std::shared_ptr> row_count) + : FileWriter(NULL, NULL, NULL), row_count_(std::move(row_count)) {} + virtual ~DiscardingRowCountingFileWriter() = default; + + Status Write(const std::shared_ptr& batch) override { + row_count_->fetch_add(static_cast(batch->num_rows())); + return Status::OK(); + } + Status Finish() override { return Status::OK(); }; + + protected: + Status FinishInternal() override { return Status::OK(); }; + + private: + std::shared_ptr> row_count_; + }; + + class DiscardingRowCountingFormat : public FileFormat { + public: + DiscardingRowCountingFormat() : row_count_(std::make_shared>(0)) {} + explicit DiscardingRowCountingFormat(std::shared_ptr> row_count) + : row_count_(std::move(row_count)) {} + virtual ~DiscardingRowCountingFormat() = default; + + std::string type_name() const override { return "discarding-row-counting"; } + bool Equals(const FileFormat& other) const override { return true; } + Result IsSupported(const FileSource& source) const override { + return Status::NotImplemented("Should not be called"); + } + Result> Inspect(const FileSource& source) const override { + return Status::NotImplemented("Should not be called"); + } + Result ScanFile( + std::shared_ptr options, + const std::shared_ptr& file) const override { + return Status::NotImplemented("Should not be called"); + } + Result> MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const override { + return std::make_shared(row_count_); + } + std::shared_ptr DefaultWriteOptions() override { return NULLPTR; } + + private: + std::shared_ptr> row_count_; + }; + + protected: + int old_capacity_ = 0; + std::shared_ptr schema_; + std::shared_ptr options_; +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 2a7ff46c4d2..db98243267b 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -640,6 +640,40 @@ class SerialReadaheadGenerator { std::shared_ptr state_; }; +template +class FutureFirstGenerator { + public: + explicit FutureFirstGenerator(Future> future) + : state_(std::make_shared(std::move(future))) {} + + Future operator()() { + if (state_->source_) { + return state_->source_(); + } else { + auto state = state_; + return state_->future_.Then([state](const AsyncGenerator& source) { + state->source_ = source; + return state->source_(); + }); + } + } + + private: + struct State { + explicit State(Future> future) : future_(future), source_() {} + + Future> future_; + AsyncGenerator source_; + }; + + std::shared_ptr state_; +}; + +template +AsyncGenerator MakeFromFuture(Future> future) { + return FutureFirstGenerator(std::move(future)); +} + /// \brief Creates a generator that will pull from the source into a queue. Unlike /// MakeReadaheadGenerator this will not pull reentrantly from the source. 
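`MakeFromFuture` is the piece that lets `ExecuteAsync` hand back a generator before the underlying source has finished opening: the first pull waits (asynchronously) on the future, and every later pull goes straight to the realized generator. A usage sketch mirroring how the scan tasks above use it (illustrative only, not part of this header):

```cpp
#include <vector>

#include "arrow/util/async_generator.h"
#include "arrow/util/thread_pool.h"

// Wrap a generator that is still being produced, so consumers can start
// pulling from it immediately.
arrow::AsyncGenerator<int> DeferredInts(arrow::internal::Executor* executor) {
  // Pretend the generator needs expensive setup (e.g. opening a reader).
  auto gen_fut = arrow::DeferNotOk(executor->Submit([] {
    return arrow::MakeVectorGenerator(std::vector<int>{1, 2, 3});
  }));
  // The first call waits on gen_fut; subsequent calls hit the vector generator.
  return arrow::MakeFromFuture(std::move(gen_fut));
}
```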
/// diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 2cc14c5f16d..3f2d63f89d6 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -270,6 +270,7 @@ test_that("IPC/Feather format data", { }) test_that("CSV dataset", { + skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-12181 ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") expect_is(ds$format, "CsvFileFormat") expect_is(ds$filesystem, "LocalFileSystem") From 87844beb9e800b84d06c9d3aa19ef777a9a31331 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 3 Apr 2021 13:03:59 -0400 Subject: [PATCH 03/10] ARROW-12112: [Rust] Create and store less debug information in CI and integration tests # Rationale Rust debug symbols are quite verbose, taking up memory during the final link time as well as significant disk space. Turning off the creation of symbols should save us compile / test time for CI as well as space on the integration test # Change Do not produce debug symbols on Rust CI (keep enough to have line numbers in `panic!` traceback, but not enough to interpret a core file, which no one does to my knowledge anyways) Note that the integration test passed: https://github.com/apache/arrow/pull/9879/checks?check_run_id=2256148363 Closes #9879 from alamb/less_symbols_in_integration Authored-by: Andrew Lamb Signed-off-by: Andrew Lamb --- .github/workflows/rust.yml | 17 +++++++++++++++++ ci/scripts/rust_build.sh | 3 +++ 2 files changed, 20 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 1dd220ade94..af478de995f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -41,6 +41,10 @@ jobs: rust: [stable] container: image: ${{ matrix.arch }}/rust + env: + # Disable full debug symbol generation to speed up CI build + # "1" means line tables only, which is useful for panic tracebacks. + RUSTFLAGS: "-C debuginfo=1" steps: - uses: actions/checkout@v2 - name: Cache Cargo @@ -83,6 +87,9 @@ jobs: container: image: ${{ matrix.arch }}/rust env: + # Disable full debug symbol generation to speed up CI build + # "1" means line tables only, which is useful for panic tracebacks. + RUSTFLAGS: "-C debuginfo=1" ARROW_TEST_DATA: /__w/arrow/arrow/testing/data PARQUET_TEST_DATA: /__w/arrow/arrow/cpp/submodules/parquet-testing/data steps: @@ -140,6 +147,9 @@ jobs: container: image: ${{ matrix.arch }}/rust env: + # Disable full debug symbol generation to speed up CI build + # "1" means line tables only, which is useful for panic tracebacks. + RUSTFLAGS: "-C debuginfo=1" ARROW_TEST_DATA: /__w/arrow/arrow/testing/data steps: - uses: actions/checkout@v2 @@ -207,6 +217,10 @@ jobs: rust: [stable] container: image: ${{ matrix.arch }}/rust + env: + # Disable full debug symbol generation to speed up CI build + # "1" means line tables only, which is useful for panic tracebacks. + RUSTFLAGS: "-C debuginfo=1" steps: - uses: actions/checkout@v2 with: @@ -367,6 +381,9 @@ jobs: container: image: ${{ matrix.arch }}/rust env: + # Disable full debug symbol generation to speed up CI build + # "1" means line tables only, which is useful for panic tracebacks. 
+ RUSTFLAGS: "-C debuginfo=1" ARROW_TEST_DATA: /__w/arrow/arrow/testing/data PARQUET_TEST_DATA: /__w/arrow/arrow/cpp/submodules/parquet-testing/data steps: diff --git a/ci/scripts/rust_build.sh b/ci/scripts/rust_build.sh index bdea5e44f6b..94cf7560aca 100755 --- a/ci/scripts/rust_build.sh +++ b/ci/scripts/rust_build.sh @@ -21,6 +21,9 @@ set -ex source_dir=${1}/rust +# Disable full debug symbol generation to speed up CI build / reduce memory required +export RUSTFLAGS="-C debuginfo=1" + export ARROW_TEST_DATA=${arrow_dir}/testing/data export PARQUET_TEST_DATA=${arrow_dir}/cpp/submodules/parquet-testing/data From 5a09421e94991d85b0d63bb3a1f7b0a09e5e4f8c Mon Sep 17 00:00:00 2001 From: Ivan Smirnov Date: Sun, 4 Apr 2021 05:42:35 -0400 Subject: [PATCH 04/10] ARROW-12194: [Rust][Parquet] Bump zstd to v0.7 This updates zstd version used by parquet crate to zstd = "0.7.0+zstd.1.4.9". Closes #9881 from aldanor/feature/zstd-0.7 Authored-by: Ivan Smirnov Signed-off-by: Andrew Lamb --- rust/parquet/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml index d449cad30af..e171196fbc8 100644 --- a/rust/parquet/Cargo.toml +++ b/rust/parquet/Cargo.toml @@ -38,7 +38,7 @@ snap = { version = "1.0", optional = true } brotli = { version = "3.3", optional = true } flate2 = { version = "1.0", optional = true } lz4 = { version = "1.23", optional = true } -zstd = { version = "0.6", optional = true } +zstd = { version = "0.7", optional = true } chrono = "0.4" num-bigint = "0.3" arrow = { path = "../arrow", version = "4.0.0-SNAPSHOT", optional = true } @@ -53,7 +53,7 @@ snap = "1.0" brotli = "3.3" flate2 = "1.0" lz4 = "1.23" -zstd = "0.6" +zstd = "0.7" arrow = { path = "../arrow", version = "4.0.0-SNAPSHOT" } serde_json = { version = "1.0", features = ["preserve_order"] } From 1399c0a21293ccf6225342d0e328f55ba2ab64a1 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Sun, 4 Apr 2021 05:44:39 -0400 Subject: [PATCH 05/10] ARROW-12186: [Rust][DataFusion] Fix regexp_match test This just moves the tests to allow the feature-flag to be used and pass this kind of test (where previously it would fail) ```bash cargo test --no-default-features --features cli ``` Closes #9874 from seddonm1/regexp_match_test Authored-by: Mike Seddon Signed-off-by: Andrew Lamb --- rust/datafusion/tests/sql.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 8c2c35ef6f0..5f3bc30b2c8 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -2499,6 +2499,17 @@ async fn test_regex_expressions() -> Result<()> { test_expression!("regexp_replace('foobarbaz', NULL, 'X\\1Y', 'g')", "NULL"); test_expression!("regexp_replace('Thomas', '.[mN]a.', 'M')", "ThM"); test_expression!("regexp_replace(NULL, 'b(..)', 'X\\1Y', 'g')", "NULL"); + test_expression!("regexp_match('foobarbequebaz', '')", "[]"); + test_expression!( + "regexp_match('foobarbequebaz', '(bar)(beque)')", + "[bar, beque]" + ); + test_expression!("regexp_match('foobarbequebaz', '(ba3r)(bequ34e)')", "NULL"); + test_expression!("regexp_match('aaa-0', '.*-(\\d)')", "[0]"); + test_expression!("regexp_match('bb-1', '.*-(\\d)')", "[1]"); + test_expression!("regexp_match('aa', '.*-(\\d)')", "NULL"); + test_expression!("regexp_match(NULL, '.*-(\\d)')", "NULL"); + test_expression!("regexp_match('aaa-0', NULL)", "NULL"); Ok(()) } @@ -2560,17 +2571,6 @@ async fn test_in_list_scalar() -> Result<()> { 
test_expression!("'2' IN ('a','b',NULL,1)", "NULL"); test_expression!("'1' NOT IN ('a','b',NULL,1)", "false"); test_expression!("'2' NOT IN ('a','b',NULL,1)", "NULL"); - test_expression!("regexp_match('foobarbequebaz', '')", "[]"); - test_expression!( - "regexp_match('foobarbequebaz', '(bar)(beque)')", - "[bar, beque]" - ); - test_expression!("regexp_match('foobarbequebaz', '(ba3r)(bequ34e)')", "NULL"); - test_expression!("regexp_match('aaa-0', '.*-(\\d)')", "[0]"); - test_expression!("regexp_match('bb-1', '.*-(\\d)')", "[1]"); - test_expression!("regexp_match('aa', '.*-(\\d)')", "NULL"); - test_expression!("regexp_match(NULL, '.*-(\\d)')", "NULL"); - test_expression!("regexp_match('aaa-0', NULL)", "NULL"); Ok(()) } From 80a0eee190b111472b9e22d96268f1881c4b8509 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 4 Apr 2021 09:38:05 -0700 Subject: [PATCH 06/10] ARROW-12034: [Developer Tools] Formalize Minor PRs Closes #9763 from emkornfield/trivial_prs Lead-authored-by: Micah Kornfield Co-authored-by: emkornfield Signed-off-by: Micah Kornfield --- .github/workflows/dev_pr/title_check.js | 3 +++ .github/workflows/dev_pr/title_check.md | 9 +++++++-- CONTRIBUTING.md | 10 ++++++++++ dev/merge_arrow_pr.py | 11 +++++++++-- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/.github/workflows/dev_pr/title_check.js b/.github/workflows/dev_pr/title_check.js index 2a8d654567e..c1ebd9d3e4d 100644 --- a/.github/workflows/dev_pr/title_check.js +++ b/.github/workflows/dev_pr/title_check.js @@ -21,6 +21,9 @@ function haveJIRAID(title) { if (!title) { return false; } + if (title.startsWith("MINOR: ")) { + return true; + } return /^(WIP:?\s*)?(ARROW|PARQUET)-\d+/.test(title); } diff --git a/.github/workflows/dev_pr/title_check.md b/.github/workflows/dev_pr/title_check.md index f008f55620a..f38af6a2175 100644 --- a/.github/workflows/dev_pr/title_check.md +++ b/.github/workflows/dev_pr/title_check.md @@ -19,13 +19,18 @@ Thanks for opening a pull request! -Could you open an issue for this pull request on JIRA? -https://issues.apache.org/jira/browse/ARROW +If this is not a [minor PR](https://github.com/apache/arrow/blob/master/.github/CONTRIBUTING.md#Minor-Fixes). Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW + +Opening JIRAs ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project. Then could you also rename pull request title in the following format? ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY} +or + + MINOR: [${COMPONENT}] ${SUMMARY} + See also: * [Other pull requests](https://github.com/apache/arrow/pulls/) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 380886872fa..3e636d9cd2f 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,6 +53,16 @@ with the JIRA issue number and the component name in brackets. Respecting this convention makes it easier for us to process the backlog of submitted Pull Requests. +### Minor Fixes + +Any functionality change should have a JIRA opened. For minor changes that +affect documentation, you do not need to open up a JIRA. Instead you can +prefix the title of your PR with "MINOR: " if meets the following guidelines: + +* Grammar, usage and spelling fixes that affect no more than 2 files +* Documentation updates affecting no more than 2 files and not more + than 500 words. + ## Do you want to propose a significant new feature or an important refactoring? 
We ask that all discussions about major changes in the codebase happen diff --git a/dev/merge_arrow_pr.py b/dev/merge_arrow_pr.py index 9036c012313..373ceb8e20f 100755 --- a/dev/merge_arrow_pr.py +++ b/dev/merge_arrow_pr.py @@ -316,7 +316,10 @@ def show(self): print("\n=== Pull Request #%s ===" % self.number) print("title\t%s\nsource\t%s\ntarget\t%s\nurl\t%s" % (self.title, self.description, self.target_ref, self.url)) - self.jira_issue.show() + if self.jira_issue is not None: + self.jira_issue.show() + else: + print("Minor PR. Please ensure it meets guidelines for minor.\n") @property def is_merged(self): @@ -334,7 +337,7 @@ def _get_jira(self): jira_id = m.group(1) break - if jira_id is None: + if jira_id is None and not self.title.startswith("MINOR:"): options = ' or '.join('{0}-XXX'.format(project) for project in SUPPORTED_PROJECTS) self.cmd.fail("PR title should be prefixed by a jira id " @@ -585,6 +588,10 @@ def cli(): # merged hash not used pr.merge() + if pr.jira_issue is None: + print("Minor PR. No JIRA issue to update.\n") + return + cmd.continue_maybe("Would you like to update the associated JIRA?") jira_comment = ( "Issue resolved by pull request %s\n[%s/%s]" From 35a1ab37189fd4d6366e5a139dc3b6c106c0ba15 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Sun, 4 Apr 2021 10:13:06 -0700 Subject: [PATCH 07/10] ARROW-12193: [Dev][Release] Use downloadable URL for archive download This depends on ARROW-12192: https://github.com/apache/arrow-site/pull/99 Closes #9885 from kou/release-post-website-download Lead-authored-by: Sutou Kouhei Co-authored-by: Sutou Kouhei Signed-off-by: Neal Richardson --- dev/release/post-03-website.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/release/post-03-website.sh b/dev/release/post-03-website.sh index 2483dfe80c1..b427142ea98 100755 --- a/dev/release/post-03-website.sh +++ b/dev/release/post-03-website.sh @@ -242,8 +242,8 @@ current: github-tag-link: 'https://github.com/apache/arrow/releases/tag/${git_tag}' release-notes: 'https://arrow.apache.org/release/${version}.html' mirrors: 'https://www.apache.org/dyn/closer.lua/arrow/arrow-${version}/' - tarball_name: 'apache-arrow-${version}.tar.gz' - mirrors-tar: 'https://www.apache.org/dyn/closer.lua/arrow/arrow-${version}/apache-arrow-${version}.tar.gz' + tarball-name: 'apache-arrow-${version}.tar.gz' + tarball-url: 'https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-${version}/apache-arrow-${version}.tar.gz' java-artifacts: 'http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.arrow%22%20AND%20v%3A%22${version}%22' asc: '${apache_download_url}/arrow/arrow-${version}/apache-arrow-${version}.tar.gz.asc' sha256: '${apache_download_url}/arrow/arrow-${version}/apache-arrow-${version}.tar.gz.sha256' From 08bac10767ced58f50a8af757c8558396867ff1d Mon Sep 17 00:00:00 2001 From: Mauricio Vargas Date: Mon, 5 Apr 2021 10:49:44 -0400 Subject: [PATCH 08/10] fixes all msgs in 9880 --- r/01-update-system.sh | 26 +++++++++++++++ r/02-environment-variables.sh | 63 +++++++++++++++++++++++++++++++++++ r/03-build-arrow-lib.sh | 35 +++++++++++++++++++ r/04-build-arrow-pkg.sh | 16 +++++++++ 4 files changed, 140 insertions(+) create mode 100644 r/01-update-system.sh create mode 100644 r/02-environment-variables.sh create mode 100644 r/03-build-arrow-lib.sh create mode 100644 r/04-build-arrow-pkg.sh diff --git a/r/01-update-system.sh b/r/01-update-system.sh new file mode 100644 index 00000000000..a59e2a67e3b --- /dev/null +++ b/r/01-update-system.sh @@ -0,0 +1,26 @@ 
+#!/bin/bash + +# slightly based on +# https://github.com/apache/arrow/issues/6270 +# https://issues.apache.org/jira/browse/ARROW-10495 +# https://github.com/apache/arrow/issues/1514 +# https://embeddeddevelop.blogspot.com/2019/01/clang-tidy-cmake-on-ubuntu-1804.html +# https://apt.llvm.org/ +# https://arrow.apache.org/docs/r/#developing + + +if [[ $EUID -ne 0 ]]; then + echo "This script must be run as root!" + exit 1 +fi + +# the basics +apt-get -y update +apt-get -y install cmake build-essential + +# faster builds +apt-get -y install ninja-build + +# s3 dependencies +apt-get -y install libcurl4-openssl-dev + diff --git a/r/02-environment-variables.sh b/r/02-environment-variables.sh new file mode 100644 index 00000000000..029bf39dbad --- /dev/null +++ b/r/02-environment-variables.sh @@ -0,0 +1,63 @@ +#!/bin/bash + +# set environment variables + +source ~/.bashrc + +if [[ $ARROW_HOME == "" ]] +then + echo 'adding ARROW_HOME to .bashrc' + printf '\n#ARROW VARS\nexport ARROW_HOME=~/.arrow' | tee -a ~/.bashrc +else + echo 'ARROW_HOME is already set' +fi + +if [[ $LD_LIBRARY_PATH == "" ]] +then + echo 'adding LD_LIBRARY_PATH to .bashrc' + printf '\nexport LD_LIBRARY_PATH=${ARROW_HOME}/lib:${LD_LIBRARY_PATH}' | tee -a ~/.bashrc +else + echo 'LD_LIBRARY_PATH is already set' +fi + +if [[ $ARROW_R_DEV == "" ]] +then + echo 'adding ARROW_R_DEV to .bashrc' + printf '\nexport ARROW_R_DEV=true' | tee -a ~/.bashrc +else + echo 'ARROW_R_DEV is already set' +fi + +if [[ $PKG_CONFIG_PATH == "" ]] +then + echo 'adding PKG_CONFIG_PATH to .bashrc' + printf '\nexport PKG_CONFIG_PATH=${ARROW_HOME}/lib/pkgconfig' | tee -a ~/.bashrc +else + echo 'PKG_CONFIG_PATH is already set' +fi + +if [[ $ARROW_WITH_LZ4 == "" ]] +then + echo 'adding ARROW_WITH_LZ4 to .bashrc' + printf '\nexport ARROW_WITH_LZ4=ON' | tee -a ~/.bashrc +else + echo 'ARROW_WITH_LZ4 is already set' +fi + +if [[ $ARROW_WITH_BROTLI == "" ]] +then + echo 'adding ARROW_WITH_BROTLI to .bashrc' + printf '\nexport ARROW_WITH_BROTLI=ON' | tee -a ~/.bashrc +else + echo 'ARROW_WITH_BROTLI is already set' +fi + +# verify env vars +source ~/.bashrc +echo $ARROW_HOME +echo $LD_LIBRARY_PATH +echo $ARROW_R_DEV +echo $PKG_CONFIG_PATH +echo $ARROW_WITH_LZ4 +echo $ARROW_WITH_BROTLI + diff --git a/r/03-build-arrow-lib.sh b/r/03-build-arrow-lib.sh new file mode 100644 index 00000000000..a3582e3a2d6 --- /dev/null +++ b/r/03-build-arrow-lib.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +export MAKEFLAGS="-j$(grep -c ^processor /proc/cpuinfo)" +mkdir -p ../cpp/build && pushd ../cpp/build + +# configure the build using ninja +# see https://arrow.apache.org/docs/developers/cpp/building.html#faster-builds-with-ninja +# no S3 AWS SDK support +cmake -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ + -DCMAKE_INSTALL_LIBDIR=lib \ + -DARROW_WITH_BZ2=ON \ + -DARROW_WITH_ZLIB=ON \ + -DARROW_WITH_ZSTD=ON \ + -DARROW_WITH_LZ4=$ARROW_WITH_LZ4 \ + -DARROW_WITH_SNAPPY=$ARROW_WITH_BROTLI \ + -DARROW_WITH_BROTLI=ON \ + -DARROW_PARQUET=ON \ + -DARROW_PYTHON=OFF \ + -DARROW_BUILD_TESTS=OFF \ + -DARROW_COMPUTE=ON \ + -DARROW_CSV=ON \ + -DARROW_DATASET=ON \ + -DARROW_FILESYSTEM=ON \ + -DARROW_JEMALLOC=ON \ + -DARROW_JSON=ON \ + -DCMAKE_BUILD_TYPE=release \ + -DARROW_INSTALL_NAME_RPATH=OFF \ + -DARROW_S3=ON \ + -GNinja \ + .. + +cmake --build . 
--target install + +pushd ../../r + diff --git a/r/04-build-arrow-pkg.sh b/r/04-build-arrow-pkg.sh new file mode 100644 index 00000000000..f90f708c941 --- /dev/null +++ b/r/04-build-arrow-pkg.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +Rscript -e " +options(repos = 'https://cloud.r-project.org/') +if (!require('assertthat')) install.packages('assertthat') +if (!require('bit64')) install.packages('bit64') +if (!require('purrr')) install.packages('purrr') +if (!require('R6')) install.packages('R6') +if (!require('rlang')) install.packages('rlang') +if (!require('tidyselect')) install.packages('tidyselect') +if (!require('vctrs')) install.packages('vctrs') +if (!require('cpp11')) install.packages('cpp11') +" + +make clean && R CMD INSTALL . && make test + From 801fe8f7f3b9c571a4c4911fb46293fad0883692 Mon Sep 17 00:00:00 2001 From: Mauricio Vargas Date: Mon, 5 Apr 2021 10:50:31 -0400 Subject: [PATCH 09/10] dont include setup scripts --- r/01-update-system.sh | 26 --------------- r/02-environment-variables.sh | 63 ----------------------------------- r/03-build-arrow-lib.sh | 35 ------------------- r/04-build-arrow-pkg.sh | 16 --------- 4 files changed, 140 deletions(-) delete mode 100644 r/01-update-system.sh delete mode 100644 r/02-environment-variables.sh delete mode 100644 r/03-build-arrow-lib.sh delete mode 100644 r/04-build-arrow-pkg.sh diff --git a/r/01-update-system.sh b/r/01-update-system.sh deleted file mode 100644 index a59e2a67e3b..00000000000 --- a/r/01-update-system.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash - -# slightly based on -# https://github.com/apache/arrow/issues/6270 -# https://issues.apache.org/jira/browse/ARROW-10495 -# https://github.com/apache/arrow/issues/1514 -# https://embeddeddevelop.blogspot.com/2019/01/clang-tidy-cmake-on-ubuntu-1804.html -# https://apt.llvm.org/ -# https://arrow.apache.org/docs/r/#developing - - -if [[ $EUID -ne 0 ]]; then - echo "This script must be run as root!" 
- exit 1 -fi - -# the basics -apt-get -y update -apt-get -y install cmake build-essential - -# faster builds -apt-get -y install ninja-build - -# s3 dependencies -apt-get -y install libcurl4-openssl-dev - diff --git a/r/02-environment-variables.sh b/r/02-environment-variables.sh deleted file mode 100644 index 029bf39dbad..00000000000 --- a/r/02-environment-variables.sh +++ /dev/null @@ -1,63 +0,0 @@ -#!/bin/bash - -# set environment variables - -source ~/.bashrc - -if [[ $ARROW_HOME == "" ]] -then - echo 'adding ARROW_HOME to .bashrc' - printf '\n#ARROW VARS\nexport ARROW_HOME=~/.arrow' | tee -a ~/.bashrc -else - echo 'ARROW_HOME is already set' -fi - -if [[ $LD_LIBRARY_PATH == "" ]] -then - echo 'adding LD_LIBRARY_PATH to .bashrc' - printf '\nexport LD_LIBRARY_PATH=${ARROW_HOME}/lib:${LD_LIBRARY_PATH}' | tee -a ~/.bashrc -else - echo 'LD_LIBRARY_PATH is already set' -fi - -if [[ $ARROW_R_DEV == "" ]] -then - echo 'adding ARROW_R_DEV to .bashrc' - printf '\nexport ARROW_R_DEV=true' | tee -a ~/.bashrc -else - echo 'ARROW_R_DEV is already set' -fi - -if [[ $PKG_CONFIG_PATH == "" ]] -then - echo 'adding PKG_CONFIG_PATH to .bashrc' - printf '\nexport PKG_CONFIG_PATH=${ARROW_HOME}/lib/pkgconfig' | tee -a ~/.bashrc -else - echo 'PKG_CONFIG_PATH is already set' -fi - -if [[ $ARROW_WITH_LZ4 == "" ]] -then - echo 'adding ARROW_WITH_LZ4 to .bashrc' - printf '\nexport ARROW_WITH_LZ4=ON' | tee -a ~/.bashrc -else - echo 'ARROW_WITH_LZ4 is already set' -fi - -if [[ $ARROW_WITH_BROTLI == "" ]] -then - echo 'adding ARROW_WITH_BROTLI to .bashrc' - printf '\nexport ARROW_WITH_BROTLI=ON' | tee -a ~/.bashrc -else - echo 'ARROW_WITH_BROTLI is already set' -fi - -# verify env vars -source ~/.bashrc -echo $ARROW_HOME -echo $LD_LIBRARY_PATH -echo $ARROW_R_DEV -echo $PKG_CONFIG_PATH -echo $ARROW_WITH_LZ4 -echo $ARROW_WITH_BROTLI - diff --git a/r/03-build-arrow-lib.sh b/r/03-build-arrow-lib.sh deleted file mode 100644 index a3582e3a2d6..00000000000 --- a/r/03-build-arrow-lib.sh +++ /dev/null @@ -1,35 +0,0 @@ -#!/bin/bash - -export MAKEFLAGS="-j$(grep -c ^processor /proc/cpuinfo)" -mkdir -p ../cpp/build && pushd ../cpp/build - -# configure the build using ninja -# see https://arrow.apache.org/docs/developers/cpp/building.html#faster-builds-with-ninja -# no S3 AWS SDK support -cmake -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ - -DCMAKE_INSTALL_LIBDIR=lib \ - -DARROW_WITH_BZ2=ON \ - -DARROW_WITH_ZLIB=ON \ - -DARROW_WITH_ZSTD=ON \ - -DARROW_WITH_LZ4=$ARROW_WITH_LZ4 \ - -DARROW_WITH_SNAPPY=$ARROW_WITH_BROTLI \ - -DARROW_WITH_BROTLI=ON \ - -DARROW_PARQUET=ON \ - -DARROW_PYTHON=OFF \ - -DARROW_BUILD_TESTS=OFF \ - -DARROW_COMPUTE=ON \ - -DARROW_CSV=ON \ - -DARROW_DATASET=ON \ - -DARROW_FILESYSTEM=ON \ - -DARROW_JEMALLOC=ON \ - -DARROW_JSON=ON \ - -DCMAKE_BUILD_TYPE=release \ - -DARROW_INSTALL_NAME_RPATH=OFF \ - -DARROW_S3=ON \ - -GNinja \ - .. - -cmake --build . 
--target install - -pushd ../../r - diff --git a/r/04-build-arrow-pkg.sh b/r/04-build-arrow-pkg.sh deleted file mode 100644 index f90f708c941..00000000000 --- a/r/04-build-arrow-pkg.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash - -Rscript -e " -options(repos = 'https://cloud.r-project.org/') -if (!require('assertthat')) install.packages('assertthat') -if (!require('bit64')) install.packages('bit64') -if (!require('purrr')) install.packages('purrr') -if (!require('R6')) install.packages('R6') -if (!require('rlang')) install.packages('rlang') -if (!require('tidyselect')) install.packages('tidyselect') -if (!require('vctrs')) install.packages('vctrs') -if (!require('cpp11')) install.packages('cpp11') -" - -make clean && R CMD INSTALL . && make test - From b12512f986d6bb4952995669a4b483e953c48b4b Mon Sep 17 00:00:00 2001 From: Mauricio Vargas Date: Mon, 5 Apr 2021 10:51:57 -0400 Subject: [PATCH 10/10] remove 1 --- .github/workflows/dev_pr/title_check.js | 56 ---- .github/workflows/dev_pr/title_check.md | 37 --- .github/workflows/rust.yml | 417 ------------------------ 3 files changed, 510 deletions(-) delete mode 100644 .github/workflows/dev_pr/title_check.js delete mode 100644 .github/workflows/dev_pr/title_check.md delete mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/dev_pr/title_check.js b/.github/workflows/dev_pr/title_check.js deleted file mode 100644 index c1ebd9d3e4d..00000000000 --- a/.github/workflows/dev_pr/title_check.js +++ /dev/null @@ -1,56 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -const fs = require("fs"); - -function haveJIRAID(title) { - if (!title) { - return false; - } - if (title.startsWith("MINOR: ")) { - return true; - } - return /^(WIP:?\s*)?(ARROW|PARQUET)-\d+/.test(title); -} - -async function commentOpenJIRAIssue(github, context, pullRequestNumber) { - const {data: comments} = await github.issues.listComments({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: pullRequestNumber, - per_page: 1 - }); - if (comments.length > 0) { - return; - } - const commentPath = ".github/workflows/dev_pr/title_check.md"; - const comment = fs.readFileSync(commentPath).toString(); - await github.issues.createComment({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: pullRequestNumber, - body: comment - }); -} - -module.exports = async ({github, context}) => { - const pullRequestNumber = context.payload.number; - const title = context.payload.pull_request.title; - if (!haveJIRAID(title)) { - await commentOpenJIRAIssue(github, context, pullRequestNumber); - } -}; diff --git a/.github/workflows/dev_pr/title_check.md b/.github/workflows/dev_pr/title_check.md deleted file mode 100644 index f38af6a2175..00000000000 --- a/.github/workflows/dev_pr/title_check.md +++ /dev/null @@ -1,37 +0,0 @@ - - -Thanks for opening a pull request! - -If this is not a [minor PR](https://github.com/apache/arrow/blob/master/.github/CONTRIBUTING.md#Minor-Fixes). Could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW - -Opening JIRAs ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project. - -Then could you also rename pull request title in the following format? - - ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY} - -or - - MINOR: [${COMPONENT}] ${SUMMARY} - -See also: - - * [Other pull requests](https://github.com/apache/arrow/pulls/) - * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml deleted file mode 100644 index af478de995f..00000000000 --- a/.github/workflows/rust.yml +++ /dev/null @@ -1,417 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -name: Rust - -on: - push: - paths: - - '.github/workflows/rust.yml' - - 'rust/**' - - 'format/Flight.proto' - pull_request: - paths: - - '.github/workflows/rust.yml' - - 'rust/**' - - 'format/Flight.proto' - -jobs: - - # build the library, a compilation step used by multiple steps below - linux-build-lib: - name: AMD64 Debian 10 Rust ${{ matrix.rust }} build libraries - runs-on: ubuntu-latest - strategy: - matrix: - arch: [amd64] - rust: [stable] - container: - image: ${{ matrix.arch }}/rust - env: - # Disable full debug symbol generation to speed up CI build - # "1" means line tables only, which is useful for panic tracebacks. - RUSTFLAGS: "-C debuginfo=1" - steps: - - uses: actions/checkout@v2 - - name: Cache Cargo - uses: actions/cache@v2 - with: - # these represent dependencies downloaded by cargo - # and thus do not depend on the OS, arch nor rust version. - path: /github/home/.cargo - key: cargo-cache- - - name: Cache Rust dependencies - uses: actions/cache@v2 - with: - # these represent compiled steps of both dependencies and arrow - # and thus are specific for a particular OS, arch and rust version. - path: /github/home/target - key: ${{ runner.os }}-${{ matrix.arch }}-target-cache-${{ matrix.rust }}- - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt - - name: Build - run: | - export CARGO_HOME="/github/home/.cargo" - export CARGO_TARGET_DIR="/github/home/target" - # do not produce debug symbols to keep memory usage down - export RUSTFLAGS="-C debuginfo=0" - cd rust - cargo build - - # test the crate - linux-test: - name: AMD64 Debian 10 Rust ${{ matrix.rust }} test workspace - needs: [linux-build-lib] - runs-on: ubuntu-latest - strategy: - matrix: - arch: [amd64] - rust: [stable] - container: - image: ${{ matrix.arch }}/rust - env: - # Disable full debug symbol generation to speed up CI build - # "1" means line tables only, which is useful for panic tracebacks. - RUSTFLAGS: "-C debuginfo=1" - ARROW_TEST_DATA: /__w/arrow/arrow/testing/data - PARQUET_TEST_DATA: /__w/arrow/arrow/cpp/submodules/parquet-testing/data - steps: - - uses: actions/checkout@v2 - with: - submodules: true - - name: Cache Cargo - uses: actions/cache@v2 - with: - path: /github/home/.cargo - # this key equals the ones on `linux-build-lib` for re-use - key: cargo-cache- - - name: Cache Rust dependencies - uses: actions/cache@v2 - with: - path: /github/home/target - # this key equals the ones on `linux-build-lib` for re-use - key: ${{ runner.os }}-${{ matrix.arch }}-target-cache-${{ matrix.rust }} - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt - - name: Run tests - run: | - export CARGO_HOME="/github/home/.cargo" - export CARGO_TARGET_DIR="/github/home/target" - # do not produce debug symbols to keep memory usage down - export RUSTFLAGS="-C debuginfo=0" - cd rust - # run tests on all workspace members with default feature list - cargo test - # test datafusion examples - cd datafusion-examples - cargo test --no-default-features - cargo run --example csv_sql - cargo run --example parquet_sql - cd .. - cd arrow - # re-run tests on arrow workspace with additional features - cargo test --features=prettyprint - cargo run --example builders - cargo run --example dynamic_types - cargo run --example read_csv - cargo run --example read_csv_infer_schema - - # test the --features "simd" of the arrow crate. 
This requires nightly. - linux-test-simd: - name: AMD64 Debian 10 Rust ${{ matrix.rust }} test arrow simd - runs-on: ubuntu-latest - strategy: - matrix: - arch: [amd64] - rust: [nightly-2020-11-24] - container: - image: ${{ matrix.arch }}/rust - env: - # Disable full debug symbol generation to speed up CI build - # "1" means line tables only, which is useful for panic tracebacks. - RUSTFLAGS: "-C debuginfo=1" - ARROW_TEST_DATA: /__w/arrow/arrow/testing/data - steps: - - uses: actions/checkout@v2 - with: - submodules: true - - name: Cache Cargo - uses: actions/cache@v2 - with: - path: /github/home/.cargo - # this key equals the ones on `linux-build-lib` for re-use - key: cargo-cache- - - name: Cache Rust dependencies - uses: actions/cache@v2 - with: - path: /github/home/target - # this key equals the ones on `linux-build-lib` for re-use - key: ${{ runner.os }}-${{ matrix.arch }}-target-cache-${{ matrix.rust }} - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt - - name: Run tests - run: | - export CARGO_HOME="/github/home/.cargo" - export CARGO_TARGET_DIR="/github/home/target" - # do not produce debug symbols to keep memory usage down - export RUSTFLAGS="-C debuginfo=0" - cd rust/arrow - cargo test --features "simd" - - windows-and-macos: - name: ${{ matrix.os }} Rust ${{ matrix.rust }} - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [windows-latest, macos-latest] - rust: [stable] - steps: - - uses: actions/checkout@v2 - with: - submodules: true - # TODO: this won't cache anything, which is expensive. Setup this action - # with a OS-dependent path. - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt - - name: Run tests - shell: bash - run: | - export ARROW_TEST_DATA=$(pwd)/testing/data - export PARQUET_TEST_DATA=$(pwd)/cpp/submodules/parquet-testing/data - cd rust - cargo test - - clippy: - name: Clippy - needs: [linux-build-lib] - runs-on: ubuntu-latest - strategy: - matrix: - arch: [amd64] - rust: [stable] - container: - image: ${{ matrix.arch }}/rust - env: - # Disable full debug symbol generation to speed up CI build - # "1" means line tables only, which is useful for panic tracebacks. 
- RUSTFLAGS: "-C debuginfo=1" - steps: - - uses: actions/checkout@v2 - with: - submodules: true - - name: Cache Cargo - uses: actions/cache@v2 - with: - path: /github/home/.cargo - # this key equals the ones on `linux-build-lib` for re-use - key: cargo-cache- - - name: Cache Rust dependencies - uses: actions/cache@v2 - with: - path: /github/home/target - # this key equals the ones on `linux-build-lib` for re-use - key: ${{ runner.os }}-${{ matrix.arch }}-target-cache-${{ matrix.rust }} - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt clippy - - name: Run clippy - run: | - export CARGO_HOME="/github/home/.cargo" - export CARGO_TARGET_DIR="/github/home/target" - cd rust - cargo clippy --all-targets --workspace -- -D warnings -A clippy::redundant_field_names - - miri-checks: - name: Miri Checks - runs-on: ubuntu-latest - strategy: - matrix: - arch: [amd64] - rust: [nightly-2021-01-19] - steps: - - uses: actions/checkout@v2 - with: - submodules: true - - uses: actions/cache@v2 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-miri-${{ hashFiles('**/Cargo.lock') }} - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt clippy miri - - name: Run Miri Checks - env: - RUST_BACKTRACE: full - RUST_LOG: 'trace' - run: | - export MIRIFLAGS="-Zmiri-disable-isolation" - cd rust - cargo miri setup - cargo clean - # Ignore MIRI errors until we can get a clean run - cargo miri test || true - - coverage: - name: Coverage - runs-on: ubuntu-latest - strategy: - matrix: - arch: [amd64] - rust: [stable] - steps: - - uses: actions/checkout@v2 - with: - submodules: true - - name: Cache Cargo - uses: actions/cache@v2 - with: - path: /home/runner/.cargo - # this key is not equal because the user is different than on a container (runner vs github) - key: cargo-coverage-cache- - - name: Cache Rust dependencies - uses: actions/cache@v2 - with: - path: /home/runner/target - # this key is not equal because coverage uses different compilation flags. - key: ${{ runner.os }}-${{ matrix.arch }}-target-coverage-cache-${{ matrix.rust }}- - - name: Run coverage - run: | - export CARGO_HOME="/home/runner/.cargo" - export CARGO_TARGET_DIR="/home/runner/target" - - export ARROW_TEST_DATA=$(pwd)/testing/data - export PARQUET_TEST_DATA=$(pwd)/cpp/submodules/parquet-testing/data - - # 2020-11-15: There is a cargo-tarpaulin regression in 0.17.0 - # see https://github.com/xd009642/tarpaulin/issues/618 - cargo install --version 0.16.0 cargo-tarpaulin - cd rust - cargo tarpaulin --out Xml - - name: Report coverage - continue-on-error: true - run: bash <(curl -s https://codecov.io/bash) - - # test FFI against the C-Data interface exposed by pyarrow - pyarrow-integration-test: - name: Pyarrow C data interface integration test - runs-on: ubuntu-latest - strategy: - matrix: - rust: [stable] - steps: - - uses: actions/checkout@v2 - with: - submodules: true - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup default ${{ matrix.rust }} - rustup component add rustfmt clippy - - name: Cache Cargo - uses: actions/cache@v2 - with: - path: /home/runner/.cargo - key: cargo-maturin-cache- - - name: Cache Rust dependencies - uses: actions/cache@v2 - with: - path: /home/runner/target - # this key is not equal because maturin uses different compilation flags. 
- key: ${{ runner.os }}-${{ matrix.arch }}-target-maturin-cache-${{ matrix.rust }}- - - uses: actions/setup-python@v2 - with: - python-version: '3.7' - - name: Install Python dependencies - run: python -m pip install --upgrade pip setuptools wheel - - name: Run tests - run: | - export CARGO_HOME="/home/runner/.cargo" - export CARGO_TARGET_DIR="/home/runner/target" - - cd rust/arrow-pyarrow-integration-testing - - python -m venv venv - source venv/bin/activate - - pip install maturin==0.8.2 toml==0.10.1 pyarrow==1.0.0 - maturin develop - python -m unittest discover tests - - # test the arrow crate builds against wasm32 in stable rust - wasm32-build: - name: AMD64 Debian 10 Rust ${{ matrix.rust }} test arrow wasm32 - runs-on: ubuntu-latest - strategy: - matrix: - arch: [amd64] - rust: [nightly-2020-11-24] - container: - image: ${{ matrix.arch }}/rust - env: - # Disable full debug symbol generation to speed up CI build - # "1" means line tables only, which is useful for panic tracebacks. - RUSTFLAGS: "-C debuginfo=1" - ARROW_TEST_DATA: /__w/arrow/arrow/testing/data - PARQUET_TEST_DATA: /__w/arrow/arrow/cpp/submodules/parquet-testing/data - steps: - - uses: actions/checkout@v2 - with: - submodules: true - - name: Cache Cargo - uses: actions/cache@v2 - with: - path: /github/home/.cargo - # this key equals the ones on `linux-build-lib` for re-use - key: cargo-cache- - - name: Cache Rust dependencies - uses: actions/cache@v2 - with: - path: /github/home/target - key: ${{ runner.os }}-${{ matrix.arch }}-target-wasm32-cache-${{ matrix.rust }} - - name: Setup Rust toolchain - run: | - rustup toolchain install ${{ matrix.rust }} - rustup override set ${{ matrix.rust }} - rustup component add rustfmt - rustup target add wasm32-unknown-unknown - - name: Build arrow crate - run: | - export CARGO_HOME="/github/home/.cargo" - export CARGO_TARGET_DIR="/github/home/target" - # do not produce debug symbols to keep memory usage down - export RUSTFLAGS="-C debuginfo=0" - cd rust/arrow - cargo build --target wasm32-unknown-unknown
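
For reference when reviewing this removal, the two most self-contained checks deleted above can be approximated locally. The sketch below is illustrative only and is not part of the patch series: it assumes an apache/arrow checkout that still contains the rust/ crate and a rustup-managed toolchain, the sample PR title is hypothetical, and the \s / \d classes from the deleted title_check.js regex are translated here to POSIX classes for grep -E. The pinned nightly, target, and build command are taken verbatim from the wasm32-build job above.

#!/usr/bin/env bash
# Rough local approximation of two checks removed in this patch (illustrative only).
set -euo pipefail

# 1. PR title check, mirroring the regex in the deleted title_check.js
#    (MINOR: prefix allowed, otherwise an ARROW-/PARQUET- JIRA id is required).
title="${1:-ARROW-12345: [Rust] Example title}"
if [[ "$title" == MINOR:\ * ]] || \
   echo "$title" | grep -qE '^(WIP:?[[:space:]]*)?(ARROW|PARQUET)-[0-9]+'; then
  echo "title ok: $title"
else
  echo "title is missing a JIRA id: $title"
fi

# 2. wasm32 build step from the deleted rust.yml, using the same pinned nightly
#    and the same debuginfo setting to keep memory usage down.
rustup toolchain install nightly-2020-11-24
rustup override set nightly-2020-11-24
rustup component add rustfmt
rustup target add wasm32-unknown-unknown
(cd rust/arrow && RUSTFLAGS="-C debuginfo=0" cargo build --target wasm32-unknown-unknown)

Run from the repository root as, e.g., `./check.sh "MINOR: [Rust] Fix typo"`; the script name and argument are placeholders, not files introduced by this patch.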