Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.utils.Filter;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -125,18 +124,6 @@ default ReadBuilder withFilter(List<Predicate> predicates) {
*/
ReadBuilder withProjection(int[] projection);

/** Apply projection to the reader, only support top level projection. */
@Deprecated
default ReadBuilder withProjection(int[][] projection) {
if (projection == null) {
return this;
}
if (Arrays.stream(projection).anyMatch(arr -> arr.length > 1)) {
throw new IllegalStateException("Not support nested projection");
}
return withProjection(Arrays.stream(projection).mapToInt(arr -> arr[0]).toArray());
}

/** the row number pushed down. */
ReadBuilder withLimit(int limit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,18 @@ private List<DataField> readDataFields(List<DataField> allDataFields) {
.filter(f -> f.id() == dataField.id())
.findFirst()
.ifPresent(
field ->
readDataFields.add(
dataField.newType(
pruneDataType(
field.type(), dataField.type()))));
field -> {
DataType prunedType =
pruneDataType(field.type(), dataField.type());
if (prunedType != null) {
readDataFields.add(dataField.newType(prunedType));
}
});
}
return readDataFields;
}

@Nullable
private DataType pruneDataType(DataType readType, DataType dataType) {
switch (readType.getTypeRoot()) {
case ROW:
Expand All @@ -261,25 +264,40 @@ private DataType pruneDataType(DataType readType, DataType dataType) {
for (DataField rf : r.getFields()) {
if (d.containsField(rf.id())) {
DataField df = d.getField(rf.id());
newFields.add(df.newType(pruneDataType(rf.type(), df.type())));
DataType newType = pruneDataType(rf.type(), df.type());
if (newType == null) {
continue;
}
newFields.add(df.newType(newType));
}
}
if (newFields.isEmpty()) {
// When all fields are pruned, we should not return an empty row type
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @nullable to pruneDataType

}
return d.copy(newFields);
case MAP:
return ((MapType) dataType)
.newKeyValueType(
pruneDataType(
((MapType) readType).getKeyType(),
((MapType) dataType).getKeyType()),
pruneDataType(
((MapType) readType).getValueType(),
((MapType) dataType).getValueType()));
DataType keyType =
pruneDataType(
((MapType) readType).getKeyType(),
((MapType) dataType).getKeyType());
DataType valueType =
pruneDataType(
((MapType) readType).getValueType(),
((MapType) dataType).getValueType());
if (keyType == null || valueType == null) {
return null;
}
return ((MapType) dataType).newKeyValueType(keyType, valueType);
case ARRAY:
return ((ArrayType) dataType)
.newElementType(
pruneDataType(
((ArrayType) readType).getElementType(),
((ArrayType) dataType).getElementType()));
DataType elementType =
pruneDataType(
((ArrayType) readType).getElementType(),
((ArrayType) dataType).getElementType());
if (elementType == null) {
return null;
}
return ((ArrayType) dataType).newElementType(elementType);
default:
return dataType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testTagIncremental() throws Exception {
GenericRow.of(fromString("+I"), 1, 6, 1));

// read tag1 tag3 projection
result = read(table, new int[][] {{1}}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
result = read(table, new int[] {1}, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG3"));
assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2), GenericRow.of(6));

assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ protected List<InternalRow> read(Table table, Pair<ConfigOption<?>, String>... d

protected List<InternalRow> read(
Table table,
@Nullable int[][] projection,
@Nullable int[] projection,
Pair<ConfigOption<?>, String>... dynamicOptions)
throws Exception {
Map<String, String> options = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void before() throws Exception {

@Test
public void testBucketsTable() throws Exception {
assertThat(read(bucketsTable, new int[][] {{0}, {1}, {2}, {4}}))
assertThat(read(bucketsTable, new int[] {0, 1, 2, 4}))
.containsExactlyInAnyOrder(
GenericRow.of(BinaryString.fromString("[1]"), 0, 2L, 2L),
GenericRow.of(BinaryString.fromString("[2]"), 0, 2L, 2L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testPartitionRecordCount() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -105,7 +105,7 @@ public void testPartitionTimeTravel() throws Exception {
read(
partitionsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")),
new int[][] {{0}, {1}});
new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

Expand All @@ -117,7 +117,7 @@ public void testPartitionValue() throws Exception {
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L));

List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}, {3}});
List<InternalRow> result = read(partitionsTable, new int[] {0, 1, 3});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}
}
Loading
Loading