diff --git a/api/src/main/java/org/apache/iceberg/Metrics.java b/api/src/main/java/org/apache/iceberg/Metrics.java
index e4e0dbc64c0a..52865d211a7d 100644
--- a/api/src/main/java/org/apache/iceberg/Metrics.java
+++ b/api/src/main/java/org/apache/iceberg/Metrics.java
@@ -19,9 +19,14 @@
 
 package org.apache.iceberg;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.Map;
+import org.apache.iceberg.util.ByteBuffers;
 
 /**
  * Iceberg file format metrics.
@@ -120,4 +125,94 @@ public Map<Integer, ByteBuffer> lowerBounds() {
   public Map<Integer, ByteBuffer> upperBounds() {
     return upperBounds;
   }
+
+  /**
+   * Custom serialization hook. {@link ByteBuffer} is not {@link Serializable}, so the lower and
+   * upper bound maps are written with their values converted to {@code byte[]}.
+   * @param out The stream where to write
+   * @throws IOException On serialization error
+   */
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.writeObject(rowCount);
+    out.writeObject(columnSizes);
+    out.writeObject(valueCounts);
+    out.writeObject(nullValueCounts);
+
+    writeByteBufferMap(out, lowerBounds);
+    writeByteBufferMap(out, upperBounds);
+  }
+
+  /**
+   * Writes a map as its size followed by each key and its value as a {@code byte[]}; a null map
+   * is written as size -1. Must stay in sync with {@link #readByteBufferMap(ObjectInputStream)}.
+   * @param out The stream where to write
+   * @param byteBufferMap The map to write, may be null
+   * @throws IOException On serialization error
+   */
+  private static void writeByteBufferMap(ObjectOutputStream out, Map<Integer, ByteBuffer> byteBufferMap)
+      throws IOException {
+    if (byteBufferMap == null) {
+      out.writeInt(-1);
+
+    } else {
+      // Write the size
+      out.writeInt(byteBufferMap.size());
+
+      for (Map.Entry<Integer, ByteBuffer> entry : byteBufferMap.entrySet()) {
+        // Write the key and the value converted to byte[]
+        out.writeObject(entry.getKey());
+        out.writeObject(ByteBuffers.toByteArray(entry.getValue()));
+      }
+    }
+  }
+
+  /**
+   * Custom deserialization hook, the counterpart of {@link #writeObject(ObjectOutputStream)}.
+   * Fields are read back in the order they were written.
+   * @param in The stream to read from
+   * @throws IOException On serialization error
+   * @throws ClassNotFoundException If the class is not found
+   */
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    rowCount = (Long) in.readObject();
+    columnSizes = (Map<Integer, Long>) in.readObject();
+    valueCounts = (Map<Integer, Long>) in.readObject();
+    nullValueCounts = (Map<Integer, Long>) in.readObject();
+
+    lowerBounds = readByteBufferMap(in);
+    upperBounds = readByteBufferMap(in);
+  }
+
+  /**
+   * Reads a map written by {@link #writeByteBufferMap(ObjectOutputStream, Map)}; a size of -1
+   * yields null, and null values are kept as null instead of being wrapped.
+   * @param in The stream to read from
+   * @return The map read from the stream, or null
+   * @throws IOException On serialization error
+   * @throws ClassNotFoundException If the class is not found
+   */
+  private static Map<Integer, ByteBuffer> readByteBufferMap(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    int size = in.readInt();
+
+    if (size == -1) {
+      return null;
+
+    } else {
+      Map<Integer, ByteBuffer> result = new HashMap<>(size);
+
+      for (int i = 0; i < size; ++i) {
+        Integer key = (Integer) in.readObject();
+        byte[] data = (byte[]) in.readObject();
+
+        if (data != null) {
+          result.put(key, ByteBuffer.wrap(data));
+        } else {
+          result.put(key, null);
+        }
+      }
+
+      return result;
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java
similarity index 100%
rename from core/src/main/java/org/apache/iceberg/util/ByteBuffers.java
rename to api/src/main/java/org/apache/iceberg/util/ByteBuffers.java
diff --git a/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java b/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java
new file mode 100644
index 000000000000..96d69682ebd6
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that {@link Metrics} survives a Java serialization round trip.
+ */
+public class TestMetricsSerialization {
+
+  @Test
+  public void testSerialization() throws IOException, ClassNotFoundException {
+    Metrics original = generateMetrics();
+
+    byte[] serialized = serialize(original);
+    Metrics result = deserialize(serialized);
+
+    assertEquals(original, result);
+  }
+
+  @Test
+  public void testSerializationWithNulls() throws IOException, ClassNotFoundException {
+    Metrics original = generateMetricsWithNulls();
+
+    byte[] serialized = serialize(original);
+    Metrics result = deserialize(serialized);
+
+    assertEquals(original, result);
+  }
+
+  private static byte[] serialize(Metrics metrics) throws IOException {
+    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
+      objectOutputStream.writeObject(metrics);
+      objectOutputStream.flush();
+
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
+  private static Metrics deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
+      return (Metrics) objectInputStream.readObject();
+    }
+  }
+
+  private static Metrics generateMetrics() {
+    Map<Integer, Long> longMap1 = new HashMap<>();
+    longMap1.put(1, 2L);
+    longMap1.put(3, 4L);
+
+    Map<Integer, Long> longMap2 = new HashMap<>();
+    longMap2.put(5, 6L);
+
+    Map<Integer, Long> longMap3 = new HashMap<>();
+    longMap3.put(7, 8L);
+
+    Map<Integer, ByteBuffer> byteMap1 = new HashMap<>();
+    byteMap1.put(1, ByteBuffer.wrap(new byte[] {1, 2, 3}));
+    byteMap1.put(2, ByteBuffer.wrap(new byte[] {1, 2, 3, 4}));
+
+    Map<Integer, ByteBuffer> byteMap2 = new HashMap<>();
+    byteMap2.put(3, ByteBuffer.wrap(new byte[] {1, 2}));
+
+    return new Metrics(0L, longMap1, longMap2, longMap3, byteMap1, byteMap2);
+  }
+
+  private static Metrics generateMetricsWithNulls() {
+    Map<Integer, Long> longMap = new HashMap<>();
+    longMap.put(null, 1L);
+    longMap.put(2, null);
+
+    Map<Integer, ByteBuffer> byteMap = new HashMap<>();
+    byteMap.put(null, ByteBuffer.wrap(new byte[] {1, 2, 3}));
+    byteMap.put(4, null);
+
+    return new Metrics(null, null, longMap, longMap, null, byteMap);
+  }
+
+  private static void assertEquals(Metrics expected, Metrics actual) {
+    Assert.assertEquals(expected.recordCount(), actual.recordCount());
+    Assert.assertEquals(expected.columnSizes(), actual.columnSizes());
+    Assert.assertEquals(expected.valueCounts(), actual.valueCounts());
+    Assert.assertEquals(expected.nullValueCounts(), actual.nullValueCounts());
+
+    assertEquals(expected.lowerBounds(), actual.lowerBounds());
+    assertEquals(expected.upperBounds(), actual.upperBounds());
+  }
+
+  private static void assertEquals(Map<Integer, ByteBuffer> expected, Map<Integer, ByteBuffer> actual) {
+    if (expected == null) {
+      Assert.assertNull(actual);
+    } else {
+      Assert.assertNotNull(actual);
+      Assert.assertEquals(expected.size(), actual.size());
+      expected.keySet().forEach(key -> Assert.assertEquals(expected.get(key), actual.get(key)));
+    }
+  }
+}