From 347ad3977322772a6d79755f945e4041628af1ac Mon Sep 17 00:00:00 2001 From: zhen wang Date: Mon, 31 Jul 2023 17:25:19 +0800 Subject: [PATCH 1/8] support offset/length and filter in scan option --- cpp/src/arrow/dataset/file_parquet.cc | 30 ++++++++++++++ cpp/src/arrow/dataset/file_parquet.h | 1 + cpp/src/arrow/dataset/scanner.cc | 16 ++++++++ cpp/src/arrow/dataset/scanner.h | 16 ++++++++ java/dataset/pom.xml | 2 +- java/dataset/src/main/cpp/jni_wrapper.cc | 40 +++++++++++++++++++ .../apache/arrow/dataset/jni/JniWrapper.java | 16 ++++++++ .../arrow/dataset/jni/NativeDataset.java | 10 +++++ .../arrow/dataset/scanner/ScanOptions.java | 22 ++++++++++ 9 files changed, 152 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 3afe4ec85cf..401f05da5a2 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -595,6 +595,10 @@ Result ParquetFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter)); pre_filtered = true; if (row_groups.empty()) return MakeEmptyGenerator>(); + if (options->start_offset != kDefaultStartOffset) { + ARROW_ASSIGN_OR_RAISE(row_groups, + parquet_fragment->FilterRangeRowGroups(options->start_offset, options->length)); + } } // Open the reader and pay the real IO cost. auto make_generator = @@ -607,6 +611,10 @@ Result ParquetFileFormat::ScanBatchesAsync( // row groups were not already filtered; do this now ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter)); + if (options->start_offset != kDefaultStartOffset) { + ARROW_ASSIGN_OR_RAISE(row_groups, + parquet_fragment->FilterRangeRowGroups(options->start_offset, options->length)); + } if (row_groups.empty()) return MakeEmptyGenerator>(); } ARROW_ASSIGN_OR_RAISE(auto column_projection, @@ -881,6 +889,28 @@ Result> ParquetFileFragment::FilterRowGroups( return row_groups; } +Result> ParquetFileFragment::FilterRangeRowGroups( + int64_t start_offset, int64_t length) { + std::vector row_groups; + for (int row_group : *row_groups_) { + auto rg_metadata = metadata_->RowGroup(row_group); + std::shared_ptr cc0 = rg_metadata->ColumnChunk(0); + int64_t r_start = cc0->data_page_offset(); + if (cc0->has_dictionary_page() && r_start > cc0->dictionary_page_offset()) { + r_start = cc0->dictionary_page_offset(); + } + int64_t r_bytes = 0L; + for (int col_id = 0; col_id < rg_metadata->num_columns(); col_id++) { + r_bytes += rg_metadata->ColumnChunk(col_id)->total_compressed_size(); + } + int64_t midpoint = r_start + r_bytes / 2; + if (midpoint >= start_offset && midpoint < (start_offset + length)) { + row_groups.push_back(row_group); + } + } + return row_groups; +} + Result> ParquetFileFragment::TestRowGroups( compute::Expression predicate) { auto lock = physical_schema_mutex_.Lock(); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index f527ce5d70a..4a84f5b6c8a 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -194,6 +194,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { /// Return a filtered subset of row group indices. Result> FilterRowGroups(compute::Expression predicate); + Result> FilterRangeRowGroups(int64_t start_offset, int64_t length); /// Simplify the predicate against the statistics of each row group. Result> TestRowGroups(compute::Expression predicate); /// Try to count rows matching the predicate using metadata. Expects diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 18981d14519..f8168eac428 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -935,6 +935,22 @@ Status ScannerBuilder::BatchSize(int64_t batch_size) { return Status::OK(); } +Status ScannerBuilder::StartOffset(int64_t start_offset) { + if (start_offset <= 0) { + return Status::Invalid("StartOffset must be greater than 0, got ", start_offset); + } + scan_options_->start_offset = start_offset; + return Status::OK(); +} + +Status ScannerBuilder::Length(int64_t length){ + if (length <= 0) { + return Status::Invalid("Length must be greater than 0, got ", length); + } + scan_options_->length = length; + return Status::OK(); +} + Status ScannerBuilder::BatchReadahead(int32_t batch_readahead) { if (batch_readahead < 0) { return Status::Invalid("BatchReadahead must be greater than or equal 0, got ", diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 4479158ff20..bae353d6a1a 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -55,6 +55,7 @@ constexpr int64_t kDefaultBatchSize = 1 << 17; // 128Ki rows constexpr int32_t kDefaultBatchReadahead = 16; constexpr int32_t kDefaultFragmentReadahead = 4; constexpr int32_t kDefaultBytesReadahead = 1 << 25; // 32MiB +constexpr int64_t kDefaultStartOffset = -1; /// Scan-specific options, which can be changed between scans of the same dataset. struct ARROW_DS_EXPORT ScanOptions { @@ -136,6 +137,9 @@ struct ARROW_DS_EXPORT ScanOptions { /// Parameters which control when the plan should pause for a slow consumer acero::BackpressureOptions backpressure = acero::BackpressureOptions::DefaultBackpressure(); + + int64_t start_offset = kDefaultStartOffset; + int64_t length; }; /// Scan-specific options, which can be changed between scans of the same dataset. @@ -496,6 +500,18 @@ class ARROW_DS_EXPORT ScannerBuilder { /// Schema. Status Filter(const compute::Expression& filter); + /// \brief set the start offset of the scanner + /// \param[in] start_offset start offset for the scan + /// + /// \return Failure if the start_offset is not greater than 0 + Status StartOffset(int64_t start_offset); + + /// \brief set the length of the scanner + /// \param[in] length length for the scan + /// + /// \return Failure if the length is not greater than 0 + Status Length(int64_t length); + /// \brief Indicate if the Scanner should make use of the available /// ThreadPool found in ScanOptions; Status UseThreads(bool use_threads = true); diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml index b533a173352..4e1112c4af2 100644 --- a/java/dataset/pom.xml +++ b/java/dataset/pom.xml @@ -114,7 +114,7 @@ org.apache.arrow.orc arrow-orc - ${project.version} + 12.0.0 test diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 49e0f172090..6402aa752e7 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -528,6 +528,46 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann JNI_METHOD_END(-1L) } +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: createRangeScanner + * Signature: (J[Ljava/lang/String;Ljava/lang/String;JJJJ)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createRangeScanner( + JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, jstring filter, + jlong batch_size, jlong start_offset, jlong length, jlong memory_pool_id) { + JNI_METHOD_START + arrow::MemoryPool* pool = reinterpret_cast(memory_pool_id); + if (pool == nullptr) { + JniThrow("Memory pool does not exist or has been closed"); + } + std::shared_ptr dataset = + RetrieveNativeInstance(dataset_id); + std::shared_ptr scanner_builder = + JniGetOrThrow(dataset->NewScan()); + JniAssertOkOrThrow(scanner_builder->Pool(pool)); + if (columns != nullptr) { + std::vector column_vector = ToStringVector(env, columns); + JniAssertOkOrThrow(scanner_builder->Project(column_vector)); + } + if (filter != nullptr) { + // TODO: PR 32625 OR PR 14287 +// arrow::compute::Expression filter_expr = JniGetOrThrow( +// arrow::compute::Expression::FromString(JStringToCString(env, filter))); +// JniAssertOkOrThrow(scanner_builder->Filter(filter_expr)); + } + JniAssertOkOrThrow(scanner_builder->StartOffset(start_offset)); + JniAssertOkOrThrow(scanner_builder->Length(length)); + JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size)); + + auto scanner = JniGetOrThrow(scanner_builder->Finish()); + std::shared_ptr scanner_adaptor = + JniGetOrThrow(DisposableScannerAdaptor::Create(scanner)); + jlong id = CreateNativeRef(scanner_adaptor); + return id; + JNI_METHOD_END(-1L) +} + /* * Class: org_apache_arrow_dataset_jni_JniWrapper * Method: closeScanner diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java index a7df5be42f1..3f44cc7f99d 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -82,6 +82,22 @@ private JniWrapper() { public native long createScanner(long datasetId, String[] columns, ByteBuffer substraitProjection, ByteBuffer substraitFilter, long batchSize, long memoryPool); + /** + * Create Scanner from a Dataset and get the native pointer of the Dataset. + * @param datasetId the native pointer of the arrow::dataset::Dataset instance. + * @param columns desired column names. + * Columns not in this list will not be emitted when performing scan operation. Null equals + * to "all columns". + * @param filter the filter to apply on the scan + * @param batchSize batch size of scanned record batches. + * @param startOffset start offset of the range scan + * @param length length of the range scan + * @param memoryPool identifier of memory pool used in the native scanner. + * @return the native pointer of the arrow::dataset::Scanner instance. + */ + public native long createRangeScanner(long datasetId, String[] columns, String filter, long batchSize, + long startOffset, long length, long memoryPool); + /** * Get a serialized schema from native instance of a Scanner. * diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java index d9abad9971c..5fd54566126 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -49,6 +49,16 @@ public synchronized NativeScanner newScan(ScanOptions options) { return new NativeScanner(context, scannerId); } + public synchronized NativeScanner newRangeScan(ScanOptions options) { + if (closed) { + throw new NativeInstanceReleasedException(); + } + long scannerId = JniWrapper.get().createRangeScanner(datasetId, options.getColumns().orElse(null), + options.getFilter().orElse(null), options.getBatchSize(), options.getStartOffset(), options.getLength(), + context.getMemoryPool().getNativeInstanceId()); + return new NativeScanner(context, scannerId); + } + @Override public synchronized void close() { if (closed) { diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java index 995d05ac3b3..c81098c0121 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java @@ -27,7 +27,10 @@ */ public class ScanOptions { private final long batchSize; + private final long startOffset; + private final long length; private final Optional columns; + private final Optional filter; private final Optional substraitProjection; private final Optional substraitFilter; @@ -56,11 +59,18 @@ public ScanOptions(String[] columns, long batchSize) { * Only columns present in the Array will be scanned. */ public ScanOptions(long batchSize, Optional columns) { + this(batchSize, columns, -1, -1, Optional.empty()); + } + + public ScanOptions(long batchSize, Optional columns, long startOffset, long length, Optional filter) { Preconditions.checkNotNull(columns); this.batchSize = batchSize; this.columns = columns; this.substraitProjection = Optional.empty(); this.substraitFilter = Optional.empty(); + this.startOffset = startOffset; + this.length = length; + this.filter = filter; } public ScanOptions(long batchSize) { @@ -71,6 +81,18 @@ public Optional getColumns() { return columns; } + public Optional getFilter() { + return filter; + } + + public long getStartOffset() { + return startOffset; + } + + public long getLength() { + return length; + } + public long getBatchSize() { return batchSize; } From c414990cc17749f30e57226109f4cfb8f8031a49 Mon Sep 17 00:00:00 2001 From: zhen wang Date: Tue, 1 Aug 2023 11:45:47 +0800 Subject: [PATCH 2/8] remove accidental change --- java/dataset/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml index 4e1112c4af2..b533a173352 100644 --- a/java/dataset/pom.xml +++ b/java/dataset/pom.xml @@ -114,7 +114,7 @@ org.apache.arrow.orc arrow-orc - 12.0.0 + ${project.version} test From d998fa77bd41829602e226b0fc0fe5897c32f7fc Mon Sep 17 00:00:00 2001 From: zhen wang Date: Tue, 1 Aug 2023 16:41:57 +0800 Subject: [PATCH 3/8] address comments filter on prefilter result --- cpp/src/arrow/dataset/file_parquet.cc | 8 ++++---- cpp/src/arrow/dataset/file_parquet.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 401f05da5a2..8a85ec39cdb 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -597,7 +597,7 @@ Result ParquetFileFormat::ScanBatchesAsync( if (row_groups.empty()) return MakeEmptyGenerator>(); if (options->start_offset != kDefaultStartOffset) { ARROW_ASSIGN_OR_RAISE(row_groups, - parquet_fragment->FilterRangeRowGroups(options->start_offset, options->length)); + parquet_fragment->FilterRangeRowGroups(row_groups, options->start_offset, options->length)); } } // Open the reader and pay the real IO cost. @@ -613,7 +613,7 @@ Result ParquetFileFormat::ScanBatchesAsync( parquet_fragment->FilterRowGroups(options->filter)); if (options->start_offset != kDefaultStartOffset) { ARROW_ASSIGN_OR_RAISE(row_groups, - parquet_fragment->FilterRangeRowGroups(options->start_offset, options->length)); + parquet_fragment->FilterRangeRowGroups(row_groups, options->start_offset, options->length)); } if (row_groups.empty()) return MakeEmptyGenerator>(); } @@ -890,9 +890,9 @@ Result> ParquetFileFragment::FilterRowGroups( } Result> ParquetFileFragment::FilterRangeRowGroups( - int64_t start_offset, int64_t length) { + std::vector filtered_row_groups, int64_t start_offset, int64_t length) { std::vector row_groups; - for (int row_group : *row_groups_) { + for (int row_group : filtered_row_groups) { auto rg_metadata = metadata_->RowGroup(row_group); std::shared_ptr cc0 = rg_metadata->ColumnChunk(0); int64_t r_start = cc0->data_page_offset(); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 4a84f5b6c8a..8dead9388ca 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -194,7 +194,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { /// Return a filtered subset of row group indices. Result> FilterRowGroups(compute::Expression predicate); - Result> FilterRangeRowGroups(int64_t start_offset, int64_t length); + Result> FilterRangeRowGroups(std::vector pre_filter, int64_t start_offset, int64_t length); /// Simplify the predicate against the statistics of each row group. Result> TestRowGroups(compute::Expression predicate); /// Try to count rows matching the predicate using metadata. Expects From fdd145708e22460fe94d09328d91722c9d881d30 Mon Sep 17 00:00:00 2001 From: zhen wang Date: Wed, 2 Aug 2023 14:30:28 +0800 Subject: [PATCH 4/8] address styles --- cpp/src/arrow/dataset/file_parquet.cc | 8 ++++---- cpp/src/arrow/dataset/file_parquet.h | 3 ++- cpp/src/arrow/dataset/scanner.cc | 2 +- .../java/org/apache/arrow/dataset/jni/JniWrapper.java | 2 +- .../org/apache/arrow/dataset/jni/NativeDataset.java | 5 +++++ .../org/apache/arrow/dataset/scanner/ScanOptions.java | 11 ++++++++++- 6 files changed, 23 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 8a85ec39cdb..7fde689000c 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -596,8 +596,8 @@ Result ParquetFileFormat::ScanBatchesAsync( pre_filtered = true; if (row_groups.empty()) return MakeEmptyGenerator>(); if (options->start_offset != kDefaultStartOffset) { - ARROW_ASSIGN_OR_RAISE(row_groups, - parquet_fragment->FilterRangeRowGroups(row_groups, options->start_offset, options->length)); + ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRangeRowGroups( + row_groups, options->start_offset, options->length)); } } // Open the reader and pay the real IO cost. @@ -612,8 +612,8 @@ Result ParquetFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter)); if (options->start_offset != kDefaultStartOffset) { - ARROW_ASSIGN_OR_RAISE(row_groups, - parquet_fragment->FilterRangeRowGroups(row_groups, options->start_offset, options->length)); + ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRangeRowGroups( + row_groups, options->start_offset, options->length)); } if (row_groups.empty()) return MakeEmptyGenerator>(); } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 8dead9388ca..271c74c3ab3 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -194,7 +194,8 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { /// Return a filtered subset of row group indices. Result> FilterRowGroups(compute::Expression predicate); - Result> FilterRangeRowGroups(std::vector pre_filter, int64_t start_offset, int64_t length); + Result> FilterRangeRowGroups( + std::vector pre_filter, int64_t start_offset, int64_t length); /// Simplify the predicate against the statistics of each row group. Result> TestRowGroups(compute::Expression predicate); /// Try to count rows matching the predicate using metadata. Expects diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index f8168eac428..b689185fc07 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -943,7 +943,7 @@ Status ScannerBuilder::StartOffset(int64_t start_offset) { return Status::OK(); } -Status ScannerBuilder::Length(int64_t length){ +Status ScannerBuilder::Length(int64_t length) { if (length <= 0) { return Status::Invalid("Length must be greater than 0, got ", length); } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java index 3f44cc7f99d..3aa3b5a9fe6 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -96,7 +96,7 @@ public native long createScanner(long datasetId, String[] columns, ByteBuffer su * @return the native pointer of the arrow::dataset::Scanner instance. */ public native long createRangeScanner(long datasetId, String[] columns, String filter, long batchSize, - long startOffset, long length, long memoryPool); + long startOffset, long length, long memoryPool); /** * Get a serialized schema from native instance of a Scanner. diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java index 5fd54566126..907f160c4ec 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -49,6 +49,11 @@ public synchronized NativeScanner newScan(ScanOptions options) { return new NativeScanner(context, scannerId); } + /** + * Create a new scan with range options. + * @param options scan options with offset and length + * @return newly created native scanner + */ public synchronized NativeScanner newRangeScan(ScanOptions options) { if (closed) { throw new NativeInstanceReleasedException(); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java index c81098c0121..ac12001bff1 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java @@ -62,7 +62,16 @@ public ScanOptions(long batchSize, Optional columns) { this(batchSize, columns, -1, -1, Optional.empty()); } - public ScanOptions(long batchSize, Optional columns, long startOffset, long length, Optional filter) { + /** + * Constructor. + * @param batchSize Maximum row number each batch returned + * @param columns (Optional) Projected columns + * @param startOffset scan range start offset + * @param length scan range length + * @param filter (Optional) filter to apply with the scan + */ + public ScanOptions(long batchSize, Optional columns, + long startOffset, long length, Optional filter) { Preconditions.checkNotNull(columns); this.batchSize = batchSize; this.columns = columns; From bbc83b259019eedf4eadfcd0319acd0022b3e40f Mon Sep 17 00:00:00 2001 From: zhen wang Date: Wed, 2 Aug 2023 19:58:31 +0800 Subject: [PATCH 5/8] adding a test case --- cpp/src/arrow/dataset/file_parquet_test.cc | 23 ++++++++++++++++++++++ cpp/src/arrow/dataset/test_util_internal.h | 5 +++++ 2 files changed, 28 insertions(+) diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 84d4342a25e..e2211fd8a6f 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -607,6 +607,29 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdown) { kNumRowGroups - 5); } +TEST_P(TestParquetFileFormatScan, RangeScan) { + constexpr int64_t kNumRowGroups = 16; + constexpr int64_t kTotalNumRows = kNumRowGroups * (kNumRowGroups + 1) / 2; + + auto reader = ArithmeticDatasetFixture::GetRecordBatchReader(kNumRowGroups); + auto source = GetFileSource(reader.get()); + + SetSchema(reader->schema()->fields()); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + + SetFilter(literal(true)); + SetRange(0, std::numeric_limits::max()); + CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups); + + SetFilter(literal(true)); + SetRange(0, 0); + CountRowsAndBatchesInScan(fragment, 0, 0); + + SetFilter(literal(true)); + SetRange(0, 2048); + CountRowsAndBatchesInScan(fragment, 6, 3); +} + TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragments) { constexpr int64_t kNumRowGroups = 16; diff --git a/cpp/src/arrow/dataset/test_util_internal.h b/cpp/src/arrow/dataset/test_util_internal.h index de0519afac9..2c34b2c9419 100644 --- a/cpp/src/arrow/dataset/test_util_internal.h +++ b/cpp/src/arrow/dataset/test_util_internal.h @@ -510,6 +510,11 @@ class FileFormatFixtureMixin : public ::testing::Test { ASSERT_OK_AND_ASSIGN(opts_->filter, filter.Bind(*opts_->dataset_schema)); } + void SetRange(int64_t start_offset, int64_t length) { + opts_->start_offset = start_offset; + opts_->length = length; + } + void Project(std::vector names) { ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames( std::move(names), *opts_->dataset_schema)); From c8f68b925947a35e38873c3aedd013256376d03a Mon Sep 17 00:00:00 2001 From: zhen wang Date: Thu, 24 Aug 2023 13:51:17 +0800 Subject: [PATCH 6/8] add doc string --- cpp/src/arrow/dataset/scanner.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index bae353d6a1a..d42d0833a56 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -138,7 +138,12 @@ struct ARROW_DS_EXPORT ScanOptions { acero::BackpressureOptions backpressure = acero::BackpressureOptions::DefaultBackpressure(); + /// Parameters which control where the scan starts + /// This is used in reading a big file by splitting into multiple scanners int64_t start_offset = kDefaultStartOffset; + + /// Parameters which control how long the scanner should read + /// This is used in reading a big file by splitting into multiple scanners int64_t length; }; From 94366a7952b6221a4aeecdbcc4c78a85c2038d77 Mon Sep 17 00:00:00 2001 From: zhen wang Date: Wed, 13 Dec 2023 13:01:37 +0800 Subject: [PATCH 7/8] address comments --- cpp/src/arrow/dataset/scanner.h | 8 ++++---- java/dataset/src/main/cpp/jni_wrapper.cc | 5 +---- .../java/org/apache/arrow/dataset/jni/JniWrapper.java | 6 +++--- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index d42d0833a56..ca91abdbe93 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -138,12 +138,12 @@ struct ARROW_DS_EXPORT ScanOptions { acero::BackpressureOptions backpressure = acero::BackpressureOptions::DefaultBackpressure(); - /// Parameters which control where the scan starts - /// This is used in reading a big file by splitting into multiple scanners + /// Parameters which control where the scan starts. + /// This is used in reading a big file by splitting into multiple scanners. int64_t start_offset = kDefaultStartOffset; - /// Parameters which control how long the scanner should read - /// This is used in reading a big file by splitting into multiple scanners + /// Parameters which control how long the scanner should read. + /// This is used in reading a big file by splitting into multiple scanners. int64_t length; }; diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 6402aa752e7..f43b0aa2b1e 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -551,10 +551,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createRange JniAssertOkOrThrow(scanner_builder->Project(column_vector)); } if (filter != nullptr) { - // TODO: PR 32625 OR PR 14287 -// arrow::compute::Expression filter_expr = JniGetOrThrow( -// arrow::compute::Expression::FromString(JStringToCString(env, filter))); -// JniAssertOkOrThrow(scanner_builder->Filter(filter_expr)); + // TODO: PR 32625 OR PR 14287. } JniAssertOkOrThrow(scanner_builder->StartOffset(start_offset)); JniAssertOkOrThrow(scanner_builder->Length(length)); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java index 3aa3b5a9fe6..dc3ca0281b0 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -88,10 +88,10 @@ public native long createScanner(long datasetId, String[] columns, ByteBuffer su * @param columns desired column names. * Columns not in this list will not be emitted when performing scan operation. Null equals * to "all columns". - * @param filter the filter to apply on the scan + * @param filter the filter to apply on the scan. * @param batchSize batch size of scanned record batches. - * @param startOffset start offset of the range scan - * @param length length of the range scan + * @param startOffset start offset of the range scan. + * @param length length of the range scan. * @param memoryPool identifier of memory pool used in the native scanner. * @return the native pointer of the arrow::dataset::Scanner instance. */ From 1500a182a2d5035c94419ecd5edee621468118ce Mon Sep 17 00:00:00 2001 From: zhen wang Date: Thu, 14 Dec 2023 12:46:19 +0800 Subject: [PATCH 8/8] address comments --- java/dataset/src/main/cpp/jni_wrapper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index f43b0aa2b1e..09eae93ddd2 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -551,7 +551,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createRange JniAssertOkOrThrow(scanner_builder->Project(column_vector)); } if (filter != nullptr) { - // TODO: PR 32625 OR PR 14287. + // TODO: issue 32625 https://github.com/apache/arrow/issues/32625 } JniAssertOkOrThrow(scanner_builder->StartOffset(start_offset)); JniAssertOkOrThrow(scanner_builder->Length(length));