Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down Expand Up @@ -172,6 +176,34 @@ public void showView() {
sql("DROP VIEW %s", "test");
}

@TestTemplate
public void testDecimalColumn() {
int rows = 100;
String tableName = tableName("decimal_table1");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use the class level field this.tableName instead?

this.tableName = tableName("decimal_table1");

then we will have the @AfterEach cleanup
sql("DROP TABLE IF EXISTS %s", tableName)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we switch to class level field this.tableName, then we don't need the TRUNCATE TABLE below.

// Create table with a single DECIMAL column
sql("CREATE TABLE IF NOT EXISTS %s (amount DECIMAL(17, 5)) USING iceberg", tableName);
sql("TRUNCATE TABLE %s", tableName);

// Build and execute the INSERT statement
sql(
"INSERT INTO %s VALUES %s",
tableName,
IntStream.range(0, rows).mapToObj(i -> "(" + i + ")").collect(Collectors.joining(",")));

// Build expected results
List<Object[]> expected =
IntStream.range(0, rows)
.mapToObj(i -> new Object[] {new BigDecimal(i + ".00000")})
.collect(Collectors.toList());

// Query and validate
List<Object[]> actual = sql("SELECT * FROM %s", tableName);
for (int i = 0; i < expected.size(); i++) {
System.out.println(actual.get(i)[0]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this?

assertEquals("Mismatch at row " + i, expected, actual);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you comparing row by row?

assertEquals("Mismatch at row " + i, expected.get(i)[0], actual.get(i)[0]);

}
}

private Table getTable(String name) {
return validationCatalog.loadTable(TableIdentifier.of("default", name));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void reset() {
}

this.importer = new CometSchemaImporter(new RootAllocator());
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
this.initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CometConstantColumnReader<T> extends CometColumnReader {
super(field);
// use delegate to set constant value on the native side to be consumed by native execution.
setDelegate(
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false /* useDecimal128 = false */,
true /* useDecimal128 = true */,
false /* isConstant */);
this.isDeleted = new boolean[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ private static class PositionColumnReader extends MetadataColumnReader {

PositionColumnReader(ColumnDescriptor descriptor) {
super(
DataTypes.LongType,
descriptor,
false /* useDecimal128 = false */,
false /* isConstant */);
DataTypes.LongType, descriptor, true /* useDecimal128 = true */, false /* isConstant */);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down Expand Up @@ -172,6 +176,34 @@ public void showView() {
sql("DROP VIEW %s", "test");
}

@TestTemplate
public void testDecimalColumn() {
int rows = 100;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to try some other values - The range of integers between 513 and 1024 is likely to show a mismatch between the byte array base dBigInteger/BigDecimal encoding and the 128 bit native integer.

String tableName = tableName("decimal_table1");
// Create table with a single DECIMAL column
sql("CREATE TABLE IF NOT EXISTS %s (amount DECIMAL(17, 5)) USING iceberg", tableName);
sql("TRUNCATE TABLE %s", tableName);

// Build and execute the INSERT statement
sql(
"INSERT INTO %s VALUES %s",
tableName,
IntStream.range(0, rows).mapToObj(i -> "(" + i + ")").collect(Collectors.joining(",")));

// Build expected results
List<Object[]> expected =
IntStream.range(0, rows)
.mapToObj(i -> new Object[] {new BigDecimal(i + ".00000")})
.collect(Collectors.toList());

// Query and validate
List<Object[]> actual = sql("SELECT * FROM %s", tableName);
for (int i = 0; i < expected.size(); i++) {
System.out.println(actual.get(i)[0]);
assertEquals("Mismatch at row " + i, expected, actual);
}
}

private Table getTable(String name) {
return validationCatalog.loadTable(TableIdentifier.of("default", name));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void reset() {
}

this.importer = new CometSchemaImporter(new RootAllocator());
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
this.initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CometConstantColumnReader<T> extends CometColumnReader {
super(field);
// use delegate to set constant value on the native side to be consumed by native execution.
setDelegate(
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false /* useDecimal128 = false */,
true /* useDecimal128 = true */,
false /* isConstant = false */);
this.isDeleted = new boolean[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader {
super(
DataTypes.LongType,
descriptor,
false /* useDecimal128 = false */,
true /* useDecimal128 = true */,
false /* isConstant = false */);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down Expand Up @@ -172,6 +176,34 @@ public void showView() {
sql("DROP VIEW %s", "test");
}

@TestTemplate
public void testDecimalColumn() {
int rows = 100;
String tableName = tableName("decimal_table1");
// Create table with a single DECIMAL column
sql("CREATE TABLE IF NOT EXISTS %s (amount DECIMAL(17, 5)) USING iceberg", tableName);
sql("TRUNCATE TABLE %s", tableName);

// Build and execute the INSERT statement
sql(
"INSERT INTO %s VALUES %s",
tableName,
IntStream.range(0, rows).mapToObj(i -> "(" + i + ")").collect(Collectors.joining(",")));

// Build expected results
List<Object[]> expected =
IntStream.range(0, rows)
.mapToObj(i -> new Object[] {new BigDecimal(i + ".00000")})
.collect(Collectors.toList());

// Query and validate
List<Object[]> actual = sql("SELECT * FROM %s", tableName);
for (int i = 0; i < expected.size(); i++) {
System.out.println(actual.get(i)[0]);
assertEquals("Mismatch at row " + i, expected, actual);
}
}

private Table getTable(String name) {
return validationCatalog.loadTable(TableIdentifier.of("default", name));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void reset() {
}

this.importer = new CometSchemaImporter(new RootAllocator());
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
this.initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CometConstantColumnReader<T> extends CometColumnReader {
super(field);
// use delegate to set constant value on the native side to be consumed by native execution.
setDelegate(
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false /* useDecimal128 = false */,
true /* useDecimal128 = true */,
false /* isConstant = false */);
this.isDeleted = new boolean[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader {
super(
DataTypes.LongType,
descriptor,
false /* useDecimal128 = false */,
true /* useDecimal128 = true */,
false /* isConstant = false */);
}

Expand Down