Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private RecordReader.RecordIterator<InternalRow> createPrimitiveIterator(
rows.add(GenericRow.of(randomRowValues));
}

return getRecordIterator(PRIMITIVE_TYPE, rows, projection);
return getRecordIterator(PRIMITIVE_TYPE, rows, projection, true);
}

@TestTemplate
Expand Down Expand Up @@ -244,7 +244,7 @@ public void testArrayType() throws Exception {
}

RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedArrayType, rows);
getRecordIterator(nestedArrayType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedArrayType, vsr);
Expand Down Expand Up @@ -308,7 +308,8 @@ public void testMapType() throws Exception {
expectedMaps.add(map1);
}

RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedMapType, rows);
RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedMapType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapType, vsr);
Expand Down Expand Up @@ -365,7 +366,11 @@ public void testMapRowType() throws Exception {
InternalRow row3 = GenericRow.of(new GenericMap(map3));

RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedMapRowType, Arrays.asList(row1, row2, row3));
getRecordIterator(
nestedMapRowType,
Arrays.asList(row1, row2, row3),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapRowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedMapRowType, vsr);
Expand Down Expand Up @@ -423,7 +428,8 @@ private void testRowTypeImpl(boolean allNull) throws Exception {
rows.add(GenericRow.of(GenericRow.of(randomRowValues)));
}

RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(nestedRowType, rows);
RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(nestedRowType, rows, null, testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, nestedRowType, vsr);
Expand Down Expand Up @@ -464,7 +470,8 @@ public void testSliceIntType() throws Exception {
rows.add(GenericRow.of(i));
}

RecordReader.RecordIterator<InternalRow> iterator = getRecordIterator(rowType, rows);
RecordReader.RecordIterator<InternalRow> iterator =
getRecordIterator(rowType, rows, null, true);
try (RootAllocator allocator = new RootAllocator()) {
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
ArrowBatchConverter arrowWriter = createArrowWriter(iterator, rowType, vsr);
Expand Down Expand Up @@ -515,7 +522,7 @@ public void testDvWithSimpleRowType() throws Exception {
int[] projection = readEmpty ? new int[0] : null;
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
rowType, rows, deleted, Collections.singletonList("pk"), projection);
rowType, rows, deleted, Collections.singletonList("pk"), projection, true);
if (readEmpty) {
testReadEmpty(iterator, numRows - deleted.size());
} else {
Expand Down Expand Up @@ -588,7 +595,12 @@ public void testDvWithArrayType() throws Exception {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
nestedArrayType, rows, deleted, Collections.singletonList("pk"), null);
nestedArrayType,
rows,
deleted,
Collections.singletonList("pk"),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
Expand Down Expand Up @@ -666,7 +678,12 @@ public void testDvWithMapType() throws Exception {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
nestedMapType, rows, deleted, Collections.singletonList("pk"), null);
nestedMapType,
rows,
deleted,
Collections.singletonList("pk"),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
Expand Down Expand Up @@ -735,7 +752,12 @@ public void testDvWithRowType() throws Exception {
Set<Integer> deleted = getDeletedPks(numRows);
RecordReader.RecordIterator<InternalRow> iterator =
getApplyDeletionFileRecordIterator(
nestedRowType, rows, deleted, Collections.singletonList("pk"), null);
nestedRowType,
rows,
deleted,
Collections.singletonList("pk"),
null,
testMode.equals("per_row"));
try (RootAllocator allocator = new RootAllocator()) {
Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
Expand Down Expand Up @@ -803,14 +825,15 @@ private void testReadEmpty(
}

private RecordReader.RecordIterator<InternalRow> getRecordIterator(
RowType rowType, List<InternalRow> rows) throws Exception {
return getRecordIterator(rowType, rows, null);
}

private RecordReader.RecordIterator<InternalRow> getRecordIterator(
RowType rowType, List<InternalRow> rows, @Nullable int[] projection) throws Exception {
RowType rowType,
List<InternalRow> rows,
@Nullable int[] projection,
boolean canTestParquet)
throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet");
options.put(
CoreOptions.FILE_FORMAT.key(),
canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
FileStoreTable table = createFileStoreTable(rowType, Collections.emptyList(), options);

StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
Expand All @@ -832,12 +855,15 @@ private RecordReader.RecordIterator<InternalRow> getApplyDeletionFileRecordItera
List<GenericRow> rows,
Set<Integer> deletedPks,
List<String> primaryKeys,
@Nullable int[] projection)
@Nullable int[] projection,
boolean canTestParquet)
throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
options.put(CoreOptions.BUCKET.key(), "1");
options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : "parquet");
options.put(
CoreOptions.FILE_FORMAT.key(),
canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
FileStoreTable table = createFileStoreTable(rowType, primaryKeys, options);

StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,14 @@ public ColumnarRowIterator readBatch() throws IOException {

/** Advances to the next batch of rows. Returns false if there are no more. */
private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
if (rowsReturned >= totalRowCount) {
return false;
}

for (WritableColumnVector v : batch.writableVectors) {
v.reset();
}
batch.columnarBatch.setNumRows(0);
if (rowsReturned >= totalRowCount) {
return false;
}
if (rowsReturned == totalCountLoadedSoFar) {
readNextRowGroup();
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format.parquet.position;

import javax.annotation.Nullable;

/** To represent struct's position in repeated type. */
/** To represent struct's position in repeated type. */
public class RowPosition {

    // Per-position null flags; a null array means no null flag information is tracked.
    // NOTE(review): getIsNull() hands out this internal array without copying — presumably
    // intentional to avoid allocation on the read path; callers must not mutate it. Confirm.
    @Nullable private final boolean[] nullFlags;

    // Number of struct positions represented by this instance.
    private final int count;

    public RowPosition(boolean[] isNull, int positionsCount) {
        this.nullFlags = isNull;
        this.count = positionsCount;
    }

    public boolean[] getIsNull() {
        return nullFlags;
    }

    public int getPositionsCount() {
        return count;
    }
}
Loading
Loading