diff --git a/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java b/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
index 29f725615a21..382ee5f0c8df 100644
--- a/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
+++ b/spark/v3.4/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
@@ -21,7 +21,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.file.Files;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -172,6 +176,33 @@ public void showView() {
     sql("DROP VIEW %s", "test");
   }
 
+  @TestTemplate
+  public void testDecimalColumn() {
+    int rows = 100;
+    String tableName = tableName("decimal_table1");
+    // Create a table with a single DECIMAL column
+    sql("CREATE TABLE IF NOT EXISTS %s (amount DECIMAL(17, 5)) USING iceberg", tableName);
+    sql("TRUNCATE TABLE %s", tableName);
+
+    // Build and execute the INSERT statement
+    sql(
+        "INSERT INTO %s VALUES %s",
+        tableName,
+        IntStream.range(0, rows).mapToObj(i -> "(" + i + ")").collect(Collectors.joining(",")));
+
+    // Build the expected rows; the scale must match the DECIMAL(17, 5) results
+    List<Object[]> expected =
+        IntStream.range(0, rows)
+            .mapToObj(i -> new Object[] {new BigDecimal(i + ".00000")})
+            .collect(Collectors.toList());
+
+    // Query and validate all rows in a single pass
+    assertEquals(
+        "Decimal values should round-trip unchanged",
+        expected,
+        sql("SELECT * FROM %s", tableName));
+  }
+
   private Table getTable(String name) {
     return validationCatalog.loadTable(TableIdentifier.of("default", name));
   }
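One detail of the test worth calling out: the expected rows are built as `new BigDecimal(i + ".00000")` rather than `BigDecimal.valueOf(i)`. That is deliberate, because `BigDecimal.equals` compares scale as well as numeric value, and a `DECIMAL(17, 5)` query result carries scale 5. A standalone sketch of the distinction (illustrative only, not part of the patch; the class name is made up):

```java
import java.math.BigDecimal;

// BigDecimal.equals is scale-sensitive: a DECIMAL(17, 5) result from Spark
// carries scale 5, so expected values must be constructed with the same scale.
public class ScaleSensitivity {
  public static void main(String[] args) {
    BigDecimal fromQuery = new BigDecimal("7.00000"); // scale 5, as the query returns
    System.out.println(fromQuery.equals(BigDecimal.valueOf(7)));     // false: scales differ (5 vs 0)
    System.out.println(fromQuery.compareTo(BigDecimal.valueOf(7)));  // 0: numerically equal
    System.out.println(fromQuery.equals(new BigDecimal("7.00000"))); // true: value and scale match
  }
}
```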
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 81b7d83a7077..f6c6c95380c9 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -92,7 +92,7 @@ public void reset() {
     }
 
     this.importer = new CometSchemaImporter(new RootAllocator());
-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
     this.initialized = true;
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
index c665002e8f66..2b8dd3d6049e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -34,7 +34,7 @@ class CometConstantColumnReader<T> extends CometColumnReader {
     super(field);
     // use delegate to set constant value on the native side to be consumed by native execution.
     setDelegate(
-        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), true));
   }
 
   @Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 4a28fc51da9b..7e94b1790a09 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -53,7 +53,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
           DataTypes.BooleanType,
           TypeUtil.convertToParquet(
               new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
-          false /* useDecimal128 = false */,
+          true /* useDecimal128 = true */,
           false /* isConstant */);
       this.isDeleted = new boolean[0];
     }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index 1949a717982a..256b3a6c83f8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -43,10 +43,7 @@ private static class PositionColumnReader extends MetadataColumnReader {
     PositionColumnReader(ColumnDescriptor descriptor) {
       super(
-          DataTypes.LongType,
-          descriptor,
-          false /* useDecimal128 = false */,
-          false /* isConstant */);
+          DataTypes.LongType, descriptor, true /* useDecimal128 = true */, false /* isConstant */);
     }
 
     @Override
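The four reader changes above all flip the same `useDecimal128` argument from `false` to `true`, so that every Spark-side Comet reader (regular columns, constants, and the position and delete metadata columns) describes decimal vectors the same way as the native side. A minimal sketch of why the width matters, assuming the flag selects between an 8-byte unscaled value and a 16-byte (decimal128) layout (standalone code, not Comet's API; the class name is made up):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

// Illustration only: the same DECIMAL(17, 5) value laid out as an 8-byte
// unscaled long versus a sign-extended 16-byte two's-complement buffer.
public class Decimal128Layout {
  public static void main(String[] args) {
    BigDecimal value = new BigDecimal("42.00000"); // scale 5, as in testDecimalColumn
    BigInteger unscaled = value.unscaledValue();   // 4200000

    // 8-byte path: DECIMAL(17, 5) has precision <= 18, so the unscaled value fits in a long
    long asLong = unscaled.longValueExact();

    // 16-byte path: sign-extend the two's-complement bytes to a fixed 16-byte width
    // (BigInteger is big-endian here; Arrow's decimal128 buffers are little-endian)
    byte[] compact = unscaled.toByteArray();
    byte[] wide = new byte[16];
    Arrays.fill(wide, (byte) (unscaled.signum() < 0 ? 0xFF : 0x00));
    System.arraycopy(compact, 0, wide, 16 - compact.length, compact.length);

    // Both layouts recover the same decimal; the producer and consumer of a
    // column vector simply have to agree on which width is in use
    BigDecimal back = new BigDecimal(new BigInteger(wide), value.scale());
    System.out.println(asLong + " -> " + back); // prints: 4200000 -> 42.00000
  }
}
```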
diff --git a/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java b/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
index 29f725615a21..382ee5f0c8df 100644
--- a/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
+++ b/spark/v3.5/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
@@ -21,7 +21,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.file.Files;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -172,6 +176,33 @@ public void showView() {
     sql("DROP VIEW %s", "test");
   }
 
+  @TestTemplate
+  public void testDecimalColumn() {
+    int rows = 100;
+    String tableName = tableName("decimal_table1");
+    // Create a table with a single DECIMAL column
+    sql("CREATE TABLE IF NOT EXISTS %s (amount DECIMAL(17, 5)) USING iceberg", tableName);
+    sql("TRUNCATE TABLE %s", tableName);
+
+    // Build and execute the INSERT statement
+    sql(
+        "INSERT INTO %s VALUES %s",
+        tableName,
+        IntStream.range(0, rows).mapToObj(i -> "(" + i + ")").collect(Collectors.joining(",")));
+
+    // Build the expected rows; the scale must match the DECIMAL(17, 5) results
+    List<Object[]> expected =
+        IntStream.range(0, rows)
+            .mapToObj(i -> new Object[] {new BigDecimal(i + ".00000")})
+            .collect(Collectors.toList());
+
+    // Query and validate all rows in a single pass
+    assertEquals(
+        "Decimal values should round-trip unchanged",
+        expected,
+        sql("SELECT * FROM %s", tableName));
+  }
+
   private Table getTable(String name) {
     return validationCatalog.loadTable(TableIdentifier.of("default", name));
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 81b7d83a7077..f6c6c95380c9 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -92,7 +92,7 @@ public void reset() {
     }
 
     this.importer = new CometSchemaImporter(new RootAllocator());
-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
     this.initialized = true;
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
index 047c96314b13..d84ce62b76ac 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -34,7 +34,7 @@ class CometConstantColumnReader<T> extends CometColumnReader {
     super(field);
     // use delegate to set constant value on the native side to be consumed by native execution.
     setDelegate(
-        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), true));
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe4865e..34f579f4f120 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -53,7 +53,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
           DataTypes.BooleanType,
           TypeUtil.convertToParquet(
               new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
-          false /* useDecimal128 = false */,
+          true /* useDecimal128 = true */,
           false /* isConstant = false */);
       this.isDeleted = new boolean[0];
     }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index bcc0e514c28d..a779bed384bf 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -45,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader {
       super(
           DataTypes.LongType,
           descriptor,
-          false /* useDecimal128 = false */,
+          true /* useDecimal128 = true */,
           false /* isConstant = false */);
     }
diff --git a/spark/v4.0/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java b/spark/v4.0/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
index 29f725615a21..382ee5f0c8df 100644
--- a/spark/v4.0/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
+++ b/spark/v4.0/spark-runtime/src/integration/java/org/apache/iceberg/spark/TestRoundTrip.java
@@ -21,7 +21,11 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.file.Files;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -172,6 +176,33 @@ public void showView() {
     sql("DROP VIEW %s", "test");
   }
 
+  @TestTemplate
+  public void testDecimalColumn() {
+    int rows = 100;
+    String tableName = tableName("decimal_table1");
+    // Create a table with a single DECIMAL column
+    sql("CREATE TABLE IF NOT EXISTS %s (amount DECIMAL(17, 5)) USING iceberg", tableName);
+    sql("TRUNCATE TABLE %s", tableName);
+
+    // Build and execute the INSERT statement
+    sql(
+        "INSERT INTO %s VALUES %s",
+        tableName,
+        IntStream.range(0, rows).mapToObj(i -> "(" + i + ")").collect(Collectors.joining(",")));
+
+    // Build the expected rows; the scale must match the DECIMAL(17, 5) results
+    List<Object[]> expected =
+        IntStream.range(0, rows)
+            .mapToObj(i -> new Object[] {new BigDecimal(i + ".00000")})
+            .collect(Collectors.toList());
+
+    // Query and validate all rows in a single pass
+    assertEquals(
+        "Decimal values should round-trip unchanged",
+        expected,
+        sql("SELECT * FROM %s", tableName));
+  }
+
   private Table getTable(String name) {
     return validationCatalog.loadTable(TableIdentifier.of("default", name));
   }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 81b7d83a7077..f6c6c95380c9 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -92,7 +92,7 @@ public void reset() {
     }
 
     this.importer = new CometSchemaImporter(new RootAllocator());
-    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+    this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
     this.initialized = true;
   }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
index 047c96314b13..d84ce62b76ac 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
@@ -34,7 +34,7 @@ class CometConstantColumnReader<T> extends CometColumnReader {
     super(field);
     // use delegate to set constant value on the native side to be consumed by native execution.
     setDelegate(
-        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
+        new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), true));
   }
 
   @Override
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe4865e..34f579f4f120 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -53,7 +53,7 @@ private static class DeleteColumnReader extends MetadataColumnReader {
           DataTypes.BooleanType,
           TypeUtil.convertToParquet(
               new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
-          false /* useDecimal128 = false */,
+          true /* useDecimal128 = true */,
           false /* isConstant = false */);
       this.isDeleted = new boolean[0];
     }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
index bcc0e514c28d..a779bed384bf 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
@@ -45,7 +45,7 @@ private static class PositionColumnReader extends MetadataColumnReader {
       super(
           DataTypes.LongType,
           descriptor,
-          false /* useDecimal128 = false */,
+          true /* useDecimal128 = true */,
           false /* isConstant = false */);
     }
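Since TestRoundTrip already imports AssertJ's `assertThat`, the validation at the end of `testDecimalColumn` could alternatively report the first offending row on failure. A sketch of an equivalent row-by-row check (not part of the patch; `expected` and `tableName` are the locals from the test above):

```java
// Sketch only: element-wise comparison with AssertJ, using imports the test
// file already has; equivalent to the single assertEquals call in the test.
List<Object[]> actual = sql("SELECT * FROM %s", tableName);
assertThat(actual).hasSameSizeAs(expected);
for (int i = 0; i < expected.size(); i++) {
  assertThat(actual.get(i)).as("row %d", i).containsExactly(expected.get(i));
}
```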