diff --git a/api/src/main/java/org/apache/iceberg/expressions/Literal.java b/api/src/main/java/org/apache/iceberg/expressions/Literal.java
index 282351e1496e..5351b727c5fe 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Literal.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Literal.java
@@ -71,6 +71,18 @@ static Literal<BigDecimal> of(BigDecimal value) {
     return new Literals.DecimalLiteral(value);
   }
 
+  static Literal<Integer> ofDateLiteral(int value) {
+    return new Literals.DateLiteral(value);
+  }
+
+  static Literal<Long> ofTimeLiteral(long value) {
+    return new Literals.TimeLiteral(value);
+  }
+
+  static Literal<Long> ofTimestampLiteral(long value) {
+    return new Literals.TimestampLiteral(value);
+  }
+
   /**
    * Returns the value wrapped by this literal.
    */
diff --git a/api/src/main/java/org/apache/iceberg/expressions/Literals.java b/api/src/main/java/org/apache/iceberg/expressions/Literals.java
index 483c2a1a7a79..2613bc8303c5 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Literals.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Literals.java
@@ -149,7 +149,6 @@ public boolean equals(Object other) {
     public int hashCode() {
       return Objects.hashCode(value);
     }
-
   }
 
   private abstract static class ComparableLiteral<C extends Comparable<C>> extends BaseLiteral<C> {
@@ -399,6 +398,8 @@ static class DateLiteral extends ComparableLiteral<Integer> {
     public <T> Literal<T> to(Type type) {
       if (type.typeId() == Type.TypeID.DATE) {
         return (Literal<T>) this;
+      } else if (type.typeId() == Type.TypeID.STRING) {
+        return (Literal<T>) new StringLiteral(LocalDate.ofEpochDay(value()).format(DateTimeFormatter.ISO_LOCAL_DATE));
       }
       return null;
     }
@@ -419,6 +420,9 @@ static class TimeLiteral extends ComparableLiteral<Long> {
     public <T> Literal<T> to(Type type) {
       if (type.typeId() == Type.TypeID.TIME) {
         return (Literal<T>) this;
+      } else if (type.typeId() == Type.TypeID.STRING) {
+        return (Literal<T>) new StringLiteral(LocalTime.ofNanoOfDay(value() * 1000)
+            .format(DateTimeFormatter.ISO_LOCAL_TIME));
       }
       return null;
     }
@@ -443,6 +447,10 @@ public <T> Literal<T> to(Type type) {
         case DATE:
           return (Literal<T>) new DateLiteral((int) ChronoUnit.DAYS.between(
               EPOCH_DAY, EPOCH.plus(value(), ChronoUnit.MICROS).toLocalDate()));
+        case STRING:
+          // Always return the literal without timezone.
+          return (Literal<T>) new StringLiteral(LocalDateTime.ofEpochSecond(value() / 1000000,
+              (int) (value() % 1000000) * 1000, ZoneOffset.UTC).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
         default:
       }
       return null;
diff --git a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java
index 2944ec7bb5c0..e129e083b796 100644
--- a/api/src/main/java/org/apache/iceberg/types/PruneColumns.java
+++ b/api/src/main/java/org/apache/iceberg/types/PruneColumns.java
@@ -68,10 +68,12 @@ public Type struct(Types.StructType struct, List<Type> fieldResults) {
         sameTypes = false; // signal that some types were altered
         if (field.isOptional()) {
           selectedFields.add(
-              Types.NestedField.optional(field.fieldId(), field.name(), projectedType, field.doc()));
+              Types.NestedField.optional(field.fieldId(), field.name(), projectedType, field.doc(),
+                  field.initialDefaultValue(), field.writeDefaultValue()));
         } else {
           selectedFields.add(
-              Types.NestedField.required(field.fieldId(), field.name(), projectedType, field.doc()));
+              Types.NestedField.required(field.fieldId(), field.name(), projectedType, field.doc(),
+                  field.initialDefaultValue(), field.writeDefaultValue()));
         }
       }
     }
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index 1b050725afd0..90b15124205b 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -415,27 +415,42 @@ public int hashCode() {
 
   public static class NestedField implements Serializable {
     public static NestedField optional(int id, String name, Type type) {
-      return new NestedField(true, id, name, type, null);
+      return new NestedField(true, id, name, type, null, null, null);
     }
 
     public static NestedField optional(int id, String name, Type type, String doc) {
-      return new NestedField(true, id, name, type, doc);
+      return new NestedField(true, id, name, type, doc, null, null);
+    }
+
+    public static NestedField optional(int id, String name, Type type, String doc,
+        Object initialDefault, Object writeDefault) {
+      return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
     }
 
     public static NestedField required(int id, String name, Type type) {
-      return new NestedField(false, id, name, type, null);
+      return new NestedField(false, id, name, type, null, null, null);
     }
 
     public static NestedField required(int id, String name, Type type, String doc) {
-      return new NestedField(false, id, name, type, doc);
+      return new NestedField(false, id, name, type, doc, null, null);
+    }
+
+    public static NestedField required(int id, String name, Type type, String doc,
+        Object initialDefault, Object writeDefault) {
+      return new NestedField(false, id, name, type, doc, initialDefault, writeDefault);
     }
 
     public static NestedField of(int id, boolean isOptional, String name, Type type) {
-      return new NestedField(isOptional, id, name, type, null);
+      return new NestedField(isOptional, id, name, type, null, null, null);
    }
 
     public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) {
-      return new NestedField(isOptional, id, name, type, doc);
+      return new NestedField(isOptional, id, name, type, doc, null, null);
+    }
+
+    public static NestedField of(int id, boolean isOptional, String name, Type type, String doc,
+        Object initialDefault, Object writeDefault) {
+      return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault);
     }
 
     private final boolean isOptional;
@@ -443,8 +458,12 @@ public static NestedField of(int id, boolean isOptional, String name, Type type,
     private final String name;
     private final Type type;
     private final String doc;
+    private final Object initialDefault;
+    private final Object writeDefault;
+
 
-    private NestedField(boolean isOptional, int id, String name, Type type, String doc) {
+    private NestedField(boolean isOptional, int id, String name, Type type, String doc,
+        Object initialDefault, Object writeDefault) {
       Preconditions.checkNotNull(name, "Name cannot be null");
       Preconditions.checkNotNull(type, "Type cannot be null");
       this.isOptional = isOptional;
@@ -452,6 +471,8 @@ private NestedField(boolean isOptional, int id, String name, Type type, String d
       this.name = name;
       this.type = type;
       this.doc = doc;
+      this.initialDefault = initialDefault;
+      this.writeDefault = writeDefault;
     }
 
     public boolean isOptional() {
@@ -462,7 +483,7 @@ public NestedField asOptional() {
       if (isOptional) {
        return this;
       }
-      return new NestedField(true, id, name, type, doc);
+      return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
     }
 
     public boolean isRequired() {
@@ -473,7 +494,11 @@ public NestedField asRequired() {
       if (!isOptional) {
        return this;
       }
-      return new NestedField(false, id, name, type, doc);
+      return new NestedField(false, id, name, type, doc, initialDefault, writeDefault);
+    }
+
+    public NestedField updateWriteDefault(Object newWriteDefault) {
+      return new NestedField(isOptional, id, name, type, doc, initialDefault, newWriteDefault);
     }
 
     public int fieldId() {
@@ -492,6 +517,14 @@ public String doc() {
       return doc;
     }
 
+    public Object initialDefaultValue() {
+      return initialDefault;
+    }
+
+    public Object writeDefaultValue() {
+      return writeDefault;
+    }
+
     @Override
     public String toString() {
       return String.format("%d: %s: %s %s",
diff --git a/core/src/main/java/org/apache/iceberg/DefaultValueParser.java b/core/src/main/java/org/apache/iceberg/DefaultValueParser.java
new file mode 100644
index 000000000000..14b9f11be6ea
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DefaultValueParser.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.ByteBufferSerializer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class DefaultValueParser {
+  private DefaultValueParser() {
+  }
+
+  private static final JsonFactory FACTORY = new JsonFactory();
+  private static final ObjectMapper MAPPER;
+
+  static {
+    MAPPER = new ObjectMapper(FACTORY);
+    SimpleModule customModule = new SimpleModule();
+    customModule.addSerializer(ByteBuffer.class, new HexStringCustomByteBufferSerializer());
+    MAPPER.registerModule(customModule);
+  }
+
+  public static ObjectMapper mapper() {
+    return MAPPER;
+  }
+
+  public static Object parseDefaultFromJson(Type type, JsonNode jsonNode) {
+    if (jsonNode == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return jsonNode.booleanValue();
+      case INTEGER:
+        return jsonNode.intValue();
+      case LONG:
+        return jsonNode.longValue();
+      case FLOAT:
+        return jsonNode.floatValue();
+      case DOUBLE:
+        return jsonNode.doubleValue();
+      case DECIMAL:
+        return jsonNode.decimalValue();
+      case STRING:
+      case UUID:
+        return jsonNode.textValue();
+      case DATE:
+      case TIME:
+      case TIMESTAMP:
+        return Literal.of(jsonNode.textValue()).to(type).value();
+      case FIXED:
+        byte[] fixedBytes = BaseEncoding.base16().decode(jsonNode.textValue().toUpperCase(Locale.ROOT).replaceFirst(
+            "^0X",
+            ""));
+        return ByteBuffer.allocate(((Types.FixedType) type).length()).put(fixedBytes);
+      case BINARY:
+        byte[] binaryBytes = BaseEncoding.base16().decode(jsonNode.textValue().toUpperCase(Locale.ROOT).replaceFirst(
+            "^0X", ""));
+        return ByteBuffer.wrap(binaryBytes);
+      case LIST:
+        List<Object> defaultList = Lists.newArrayList();
+        for (JsonNode element : jsonNode) {
+          defaultList.add(parseDefaultFromJson(type.asListType().elementType(), element));
+        }
+        return defaultList;
+      case MAP:
+        Map<Object, Object> defaultMap = Maps.newHashMap();
+        List<JsonNode> keysAndValues = StreamSupport
+            .stream(jsonNode.spliterator(), false)
+            .collect(Collectors.toList());
+        JsonNode keys = keysAndValues.get(0);
+        JsonNode values = keysAndValues.get(1);
+
+        List<JsonNode> keyList = Lists.newArrayList(keys.iterator());
+        List<JsonNode> valueList = Lists.newArrayList(values.iterator());
+
+        for (int i = 0; i < keyList.size(); i++) {
+          defaultMap.put(
+              parseDefaultFromJson(type.asMapType().keyType(), keyList.get(i)),
+              parseDefaultFromJson(type.asMapType().valueType(), valueList.get(i)));
+        }
+        return defaultMap;
+      case STRUCT:
+        Map<Integer, Object> defaultStruct = Maps.newHashMap();
+        for (Types.NestedField subField : type.asStructType().fields()) {
+          String fieldIdAsString = String.valueOf(subField.fieldId());
+          Object value = jsonNode.has(fieldIdAsString) ? parseDefaultFromJson(
+              subField.type(),
+              jsonNode.get(fieldIdAsString)) : null;
+          if (value != null) {
+            defaultStruct.put(subField.fieldId(), value);
+          }
+        }
+        return defaultStruct;
+      default:
+        return null;
+    }
+  }
+
+  public static JsonNode validateDefault(String name, Type type, JsonNode defaultValue) {
+    if (defaultValue != null && !isValidDefault(type, defaultValue)) {
+      throw new ValidationException("Invalid default value for field %s: %s not a %s", name, defaultValue, type);
+    }
+    return defaultValue;
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public static boolean isValidDefault(Type type, JsonNode defaultValue) {
+    if (defaultValue == null) {
+      return false;
+    }
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return defaultValue.isBoolean();
+      case INTEGER:
+        return defaultValue.isIntegralNumber() && defaultValue.canConvertToInt();
+      case LONG:
+        return defaultValue.isIntegralNumber() && defaultValue.canConvertToLong();
+      case FLOAT:
+      case DOUBLE:
+      case DECIMAL:
+        return defaultValue.isNumber();
+      case STRING:
+      case UUID:
+      case DATE:
+      case TIME:
+      case TIMESTAMP:
+        return defaultValue.isTextual();
+      case FIXED:
+      case BINARY:
+        return defaultValue.isTextual() &&
+            (defaultValue.textValue().startsWith("0x") || defaultValue.textValue().startsWith("0X"));
+      case LIST:
+        if (!defaultValue.isArray()) {
+          return false;
+        }
+        for (JsonNode element : defaultValue) {
+          if (!isValidDefault(type.asListType().elementType(), element)) {
+            return false;
+          }
+        }
+        return true;
+      case MAP:
+        if (!defaultValue.isArray()) {
+          return false;
+        }
+        List<JsonNode> keysAndValues = StreamSupport
+            .stream(defaultValue.spliterator(), false)
+            .collect(Collectors.toList());
+        if (keysAndValues.size() != 2) {
+          return false;
+        }
+        JsonNode keys = keysAndValues.get(0);
+        JsonNode values = keysAndValues.get(1);
+        if (!keys.isArray() || !values.isArray()) {
+          return false;
+        }
+        List<JsonNode> keyList = Lists.newArrayList(keys.iterator());
+        List<JsonNode> valueList = Lists.newArrayList(values.iterator());
+        if (keyList.size() != valueList.size()) {
+          return false;
+        }
+        for (int i = 0; i < keyList.size(); i++) {
+          if (!isValidDefault(type.asMapType().keyType(), keyList.get(i)) ||
+              !isValidDefault(type.asMapType().valueType(), valueList.get(i))) {
+            return false;
+          }
+        }
+        return true;
+      case STRUCT:
+        if (!defaultValue.isObject()) {
+          return false;
+        }
+        for (Types.NestedField subType : type.asStructType().fields()) {
+          String fieldId = String.valueOf(subType.fieldId());
+          if (!isValidDefault(subType.type(), defaultValue.has(fieldId) ? defaultValue.get(fieldId) : null)) {
+            return false;
+          }
+        }
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  private static class HexStringCustomByteBufferSerializer extends ByteBufferSerializer {
+    @Override
+    public void serialize(ByteBuffer bbuf, JsonGenerator gen, SerializerProvider provider) throws IOException {
+      // The ByteBuffer should always wrap an array, given how it's constructed during deserialization
+      Preconditions.checkState(bbuf.hasArray());
+      gen.writeString("0X" + BaseEncoding.base16().encode(bbuf.array()));
+    }
+  }
+
+  public static Object unparseJavaDefaultValue(Type type, Object value) {
+    switch (type.typeId()) {
+      case DATE:
+        return Literal.ofDateLiteral((int) value).to(Types.StringType.get()).value();
+      case TIME:
+        return Literal.ofTimeLiteral((long) value).to(Types.StringType.get()).value();
+      case TIMESTAMP:
+        String localDateTime = (String) Literal.ofTimestampLiteral((long) value).to(Types.StringType.get()).value();
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return LocalDateTime.parse(localDateTime, DateTimeFormatter.ISO_LOCAL_DATE_TIME)
+              .atOffset(ZoneOffset.UTC)
+              .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+        }
+        return localDateTime;
+      default:
+        return value;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java b/core/src/main/java/org/apache/iceberg/SchemaParser.java
index 0a4f6f389868..a3f8966e1834 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaParser.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java
@@ -37,9 +37,6 @@ public class SchemaParser {
 
-  private SchemaParser() {
-  }
-
   private static final String SCHEMA_ID = "schema-id";
   private static final String IDENTIFIER_FIELD_IDS = "identifier-field-ids";
   private static final String TYPE = "type";
@@ -59,13 +56,22 @@ private SchemaParser() {
   private static final String REQUIRED = "required";
   private static final String ELEMENT_REQUIRED = "element-required";
   private static final String VALUE_REQUIRED = "value-required";
+  private static final String INITIAL_DEFAULT = "initial-default";
+  private static final String WRITE_DEFAULT = "write-default";
+  private static final Cache<String, Schema> SCHEMA_CACHE = Caffeine.newBuilder()
+      .weakValues()
+      .build();
+
+  private SchemaParser() {
+  }
 
   private static void toJson(Types.StructType struct, JsonGenerator generator) throws IOException {
     toJson(struct, null, null, generator);
   }
 
-  private static void toJson(Types.StructType struct, Integer schemaId, Set<Integer> identifierFieldIds,
-                             JsonGenerator generator) throws IOException {
+  private static void toJson(
+      Types.StructType struct, Integer schemaId, Set<Integer> identifierFieldIds,
+      JsonGenerator generator) throws IOException {
     generator.writeStartObject();
 
     generator.writeStringField(TYPE, STRUCT);
@@ -92,6 +98,20 @@ private static void toJson(Types.StructType struct, Integer schemaId, Set<Intege
-  private static final Cache<String, Schema> SCHEMA_CACHE = Caffeine.newBuilder()
-      .weakValues()
-      .build();
-
   public static Schema fromJson(String json) {
     return SCHEMA_CACHE.get(json, jsonKey -> {
       try {
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 249b07992ce6..7e6153af7d43 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -115,6 +115,17 @@ public static String getStringOrNull(String property, JsonNode node) {
     return pNode.asText();
   }
 
+  public static JsonNode getChildNodeOrNull(String property, JsonNode node) {
+    if (!node.has(property)) {
+      return null;
+    }
+    JsonNode pNode = node.get(property);
+    if (pNode != null && pNode.isNull()) {
+      return null;
+    }
+    return pNode;
+  }
+
   public static Map<String, String> getStringMap(String property, JsonNode node) {
     Preconditions.checkArgument(node.has(property), "Cannot parse missing map %s", property);
     JsonNode pNode = node.get(property);
diff --git a/core/src/test/java/org/apache/iceberg/TestDefaultValuesValidationAndParsing.java b/core/src/test/java/org/apache/iceberg/TestDefaultValuesValidationAndParsing.java
new file mode 100644
index 000000000000..7eac4548660a
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestDefaultValuesValidationAndParsing.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestDefaultValuesValidationAndParsing {
+
+  private final Type type;
+  private final JsonNode defaultValue;
+
+  public TestDefaultValuesValidationAndParsing(Type type, JsonNode defaultValue) {
+    this.type = type;
+    this.defaultValue = defaultValue;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {Types.BooleanType.get(), stringToJsonNode("true")},
+        {Types.IntegerType.get(), stringToJsonNode("1")},
+        {Types.LongType.get(), stringToJsonNode("9999999")},
+        {Types.FloatType.get(), stringToJsonNode("1.23")},
+        {Types.DoubleType.get(), stringToJsonNode("123.456")},
+        {Types.DateType.get(), stringToJsonNode("\"2007-12-03\"")},
+        {Types.TimeType.get(), stringToJsonNode("\"10:15:30\"")},
+        {Types.TimestampType.withoutZone(), stringToJsonNode("\"2007-12-03T10:15:30\"")},
+        {Types.TimestampType.withZone(), stringToJsonNode("\"2007-12-03T10:15:30+01:00\"")},
+        {Types.StringType.get(), stringToJsonNode("\"foo\"")},
+        {Types.UUIDType.get(), stringToJsonNode("\"eb26bdb1-a1d8-4aa6-990e-da940875492c\"")},
+        {Types.FixedType.ofLength(2), stringToJsonNode("\"0x111f\"")},
+        {Types.BinaryType.get(), stringToJsonNode("\"0x0000ff\"")},
+        {Types.DecimalType.of(9, 2), stringToJsonNode("123.45")},
+        {Types.ListType.ofOptional(1, Types.IntegerType.get()), stringToJsonNode("[1, 2, 3]")},
+        {Types.MapType.ofOptional(1, 2, Types.IntegerType.get(), Types.StringType.get()),
+            stringToJsonNode("[[1,2], [\"foo\", \"bar\"]]")},
+        {Types.StructType.of(
+            required(1, "f1", Types.IntegerType.get(), "doc"),
+            optional(2, "f2", Types.StringType.get(), "doc")),
+            stringToJsonNode("{\"1\": 1, \"2\": \"bar\"}")}
+    });
+  }
+
+  private static JsonNode stringToJsonNode(String json) {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      return mapper.readTree(json);
+    } catch (JsonProcessingException e) {
+      System.out.println("Failed to parse: " + json + "; reason: " + e.getMessage());
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  // Serializing to JSON and deserializing back should produce the same schema.
+  private static Schema jsonRoundTrip(Schema schema) {
+    return SchemaParser.fromJson(SchemaParser.toJson(schema));
+  }
+
+  @Test
+  public void testTypeWithDefaultValue() {
+    Assert.assertTrue(DefaultValueParser.isValidDefault(type, defaultValue));
+    Object javaDefaultValue = DefaultValueParser.parseDefaultFromJson(type, defaultValue);
+
+    Schema schema = new Schema(Types.NestedField.required(999, "root", type, "doc", javaDefaultValue,
+        javaDefaultValue));
+    Assert.assertTrue(schema.sameSchema(jsonRoundTrip(schema)));
+  }
+}
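
A minimal usage sketch for reviewers, not part of the patch: it strings together the pieces added above — the NestedField factories that accept initial/write defaults, the SchemaParser round trip that the new test asserts, and DefaultValueParser.unparseJavaDefaultValue. The class name, field ids, and default values below are invented for illustration, and the example assumes the patch applies as shown.

// Illustrative sketch only; all names and values here are hypothetical.
import org.apache.iceberg.DefaultValueParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.types.Types;

public class DefaultValuesUsageSketch {
  public static void main(String[] args) {
    // A required int column whose initial default and write default are both 42.
    Types.NestedField id = Types.NestedField.required(
        1, "id", Types.IntegerType.get(), "row id", 42, 42);

    // The defaults are carried through SchemaParser's JSON form (the new
    // "initial-default" / "write-default" properties), so a round trip
    // yields an equivalent schema -- the same property the new test asserts.
    Schema schema = new Schema(id);
    String json = SchemaParser.toJson(schema);
    Schema roundTripped = SchemaParser.fromJson(json);
    System.out.println(schema.sameSchema(roundTripped)); // expected: true

    // Date/time defaults are held internally as epoch-based numbers;
    // unparseJavaDefaultValue converts them back to the ISO-8601 strings
    // used in the JSON representation via the new Literal string conversions.
    Object dateDefault = DefaultValueParser.unparseJavaDefaultValue(
        Types.DateType.get(), 19000); // 19000 days after 1970-01-01, arbitrary
    System.out.println(dateDefault); // 2022-01-08
  }
}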