From 41a8e959afd0989fe282db5dc898c4a46b6d040c Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 7 Sep 2020 12:10:22 +0200 Subject: [PATCH 1/5] Core: Fix Metrics serialization --- .../main/java/org/apache/iceberg/Metrics.java | 83 ++++++++++++ .../iceberg/TestMetricsSerialization.java | 123 ++++++++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java diff --git a/api/src/main/java/org/apache/iceberg/Metrics.java b/api/src/main/java/org/apache/iceberg/Metrics.java index e4e0dbc64c0a..57a26c3ea8a9 100644 --- a/api/src/main/java/org/apache/iceberg/Metrics.java +++ b/api/src/main/java/org/apache/iceberg/Metrics.java @@ -19,9 +19,13 @@ package org.apache.iceberg; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** * Iceberg file format metrics. @@ -120,4 +124,83 @@ public Map lowerBounds() { public Map upperBounds() { return upperBounds; } + + /** + * Implemented the method to enable serialization of ByteBuffers. 
+ * @param out The stream where to write + * @throws IOException On serialization error + */ + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeObject(rowCount); + out.writeObject(columnSizes); + out.writeObject(valueCounts); + out.writeObject(nullValueCounts); + + writeByteBufferMap(out, lowerBounds); + writeByteBufferMap(out, upperBounds); + } + + private static void writeByteBufferMap(ObjectOutputStream out, Map byteBufferMap) + throws IOException { + if (byteBufferMap == null) { + out.writeInt(-1); + + } else { + // Write the size + out.writeInt(byteBufferMap.size()); + + for (Map.Entry entry : byteBufferMap.entrySet()) { + // Write the key + out.writeObject(entry.getKey()); + + // Write the value + if (entry.getValue() != null) { + out.writeObject(entry.getValue().array()); + } else { + out.writeObject(null); + } + } + } + } + + /** + * Implemented the method to enable deserialization of ByteBuffers. + * @param in The stream to read from + * @throws IOException On serialization error + * @throws ClassNotFoundException If the class is not found + */ + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + rowCount = (Long) in.readObject(); + columnSizes = (Map) in.readObject(); + valueCounts = (Map) in.readObject(); + nullValueCounts = (Map) in.readObject(); + + lowerBounds = readByteBufferMap(in); + upperBounds = readByteBufferMap(in); + } + + private static Map readByteBufferMap(ObjectInputStream in) + throws IOException, ClassNotFoundException { + int size = in.readInt(); + + if (size == -1) { + return null; + + } else { + Map result = Maps.newHashMap(); + + for (int i = 0; i < size; ++i) { + Integer key = (Integer) in.readObject(); + byte[] data = (byte[]) in.readObject(); + + if (data != null) { + result.put(key, ByteBuffer.wrap(data)); + } else { + result.put(key, null); + } + } + + return result; + } + } } diff --git a/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java 
b/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java new file mode 100644 index 000000000000..79a0b64e6057 --- /dev/null +++ b/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMetricsSerialization {
+
+  @Test
+  public void testSerialization() throws IOException, ClassNotFoundException {
+    Metrics original = generateMetrics();
+
+    byte[] serialized = serialize(original);
+    Metrics result = deserialize(serialized);
+
+    assertEquals(original, result);
+  }
+
+  @Test
+  public void testSerializationWithNulls() throws IOException, ClassNotFoundException {
+    Metrics original = generateMetricsWithNulls();
+
+    byte[] serialized = serialize(original);
+    Metrics result = deserialize(serialized);
+
+    assertEquals(original, result);
+  }
+
+  private static byte[] serialize(Metrics metrics) throws IOException {
+    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
+      ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
+      objectOutputStream.writeObject(metrics);
+
+      return byteArrayOutputStream.toByteArray();
+    }
+  }
+
+  private static Metrics deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {
+      ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
+
+      return (Metrics) objectInputStream.readObject();
+    }
+  }
+
+  private static Metrics generateMetrics() {
+    Map<Integer, Long> longMap1 = new HashMap<>();
+    longMap1.put(1, 2L);
+    longMap1.put(3, 4L);
+
+    Map<Integer, Long> longMap2 = new HashMap<>();
+    longMap2.put(5, 6L);
+
+    Map<Integer, Long> longMap3 = new HashMap<>();
+    longMap3.put(7, 8L);
+
+    Map<Integer, ByteBuffer> byteMap1 = new HashMap<>();
+    byteMap1.put(1, ByteBuffer.wrap(new byte[] {1, 2, 3}));
+    byteMap1.put(2, ByteBuffer.wrap(new byte[] {1, 2, 3, 4}));
+
+    Map<Integer, ByteBuffer> byteMap2 = new HashMap<>();
+    byteMap2.put(3, ByteBuffer.wrap(new byte[] {1, 2}));
+
+    return new Metrics(0L, longMap1, longMap2, longMap3, byteMap1, byteMap2);
+  }
+
+  private static Metrics generateMetricsWithNulls() {
+    Map<Integer, Long> longMap = new HashMap<>();
+    longMap.put(null, 1L);
+    longMap.put(2, null);
+
+    Map<Integer, ByteBuffer> byteMap = new HashMap<>();
+    byteMap.put(null, ByteBuffer.wrap(new byte[] {1, 2, 3}));
+    byteMap.put(4, null);
+
+    return new Metrics(null, null, longMap, longMap, null, byteMap);
+  }
+
+  private static void assertEquals(Metrics expected, Metrics actual) {
+    Assert.assertEquals(expected.recordCount(), actual.recordCount());
+    Assert.assertEquals(expected.columnSizes(), actual.columnSizes());
+    Assert.assertEquals(expected.valueCounts(), actual.valueCounts());
+    Assert.assertEquals(expected.nullValueCounts(), actual.nullValueCounts());
+
+    assertEquals(expected.lowerBounds(), actual.lowerBounds());
+    assertEquals(expected.upperBounds(), actual.upperBounds());
+  }
+
+  private static void assertEquals(Map<Integer, ByteBuffer> expected, Map<Integer, ByteBuffer> actual) {
+    if (expected == null) {
+      Assert.assertNull(actual);
+    } else {
+      Assert.assertEquals(expected.size(), actual.size());
+      expected.keySet().forEach(key -> Assert.assertEquals(expected.get(key), actual.get(key)));
+    }
+  }
+}

From 61a49ea813c47622cb7c666c8428c2acea3f39da Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Wed, 9 Sep 2020 13:30:26 +0200
Subject: [PATCH 2/5] Fix ByteBuffer serialization issue.
--- api/src/main/java/org/apache/iceberg/Metrics.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/org/apache/iceberg/Metrics.java b/api/src/main/java/org/apache/iceberg/Metrics.java index 57a26c3ea8a9..bcd0c16bd598 100644 --- a/api/src/main/java/org/apache/iceberg/Metrics.java +++ b/api/src/main/java/org/apache/iceberg/Metrics.java @@ -155,7 +155,14 @@ private static void writeByteBufferMap(ObjectOutputStream out, Map Date: Wed, 9 Sep 2020 20:06:00 +0200 Subject: [PATCH 3/5] Removed guava dependency --- api/src/main/java/org/apache/iceberg/Metrics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/Metrics.java b/api/src/main/java/org/apache/iceberg/Metrics.java index bcd0c16bd598..92d6f8d14301 100644 --- a/api/src/main/java/org/apache/iceberg/Metrics.java +++ b/api/src/main/java/org/apache/iceberg/Metrics.java @@ -24,8 +24,8 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Map; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; /** * Iceberg file format metrics. @@ -194,7 +194,7 @@ private static Map readByteBufferMap(ObjectInputStream in) return null; } else { - Map result = Maps.newHashMap(); + Map result = new HashMap<>(size); for (int i = 0; i < size; ++i) { Integer key = (Integer) in.readObject(); From bc2ae6951a44f9798517c601350c936f057c6a07 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 14 Sep 2020 11:24:07 +0200 Subject: [PATCH 4/5] Used ByteBuffers.toByteArray to get the byte[] from the buffers. 
--- .../main/java/org/apache/iceberg/Metrics.java | 18 +++--------------- .../org/apache/iceberg/util/ByteBuffers.java | 0 2 files changed, 3 insertions(+), 15 deletions(-) rename {core => api}/src/main/java/org/apache/iceberg/util/ByteBuffers.java (100%) diff --git a/api/src/main/java/org/apache/iceberg/Metrics.java b/api/src/main/java/org/apache/iceberg/Metrics.java index 92d6f8d14301..52865d211a7d 100644 --- a/api/src/main/java/org/apache/iceberg/Metrics.java +++ b/api/src/main/java/org/apache/iceberg/Metrics.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import org.apache.iceberg.util.ByteBuffers; /** * Iceberg file format metrics. @@ -150,22 +151,9 @@ private static void writeByteBufferMap(ObjectOutputStream out, Map entry : byteBufferMap.entrySet()) { - // Write the key + // Write the key and the value converted to byte[] out.writeObject(entry.getKey()); - - // Write the value - if (entry.getValue() != null) { - // Copy the actual values from the buffer - ByteBuffer bb = entry.getValue(); - byte[] bytes = new byte[bb.remaining()]; - bb.get(bytes); - bb.position(bb.position() - bytes.length); // Restores the buffer position - - // Write out the data - out.writeObject(bytes); - } else { - out.writeObject(null); - } + out.writeObject(ByteBuffers.toByteArray(entry.getValue())); } } } diff --git a/core/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java similarity index 100% rename from core/src/main/java/org/apache/iceberg/util/ByteBuffers.java rename to api/src/main/java/org/apache/iceberg/util/ByteBuffers.java From e41675b841efbdaf1fdeddd34b6b95d0262c29e4 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 14 Sep 2020 11:31:29 +0200 Subject: [PATCH 5/5] Flushing the stream in the tests to prevent possible flakiness --- .../test/java/org/apache/iceberg/TestMetricsSerialization.java | 1 + 1 file changed, 1 insertion(+) diff --git 
a/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java b/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java index 79a0b64e6057..96d69682ebd6 100644 --- a/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java +++ b/api/src/test/java/org/apache/iceberg/TestMetricsSerialization.java @@ -56,6 +56,7 @@ private static byte[] serialize(Metrics metrics) throws IOException { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); objectOutputStream.writeObject(metrics); + objectOutputStream.flush(); return byteArrayOutputStream.toByteArray(); }