From 4ae0be0807628995676d6ee795d4eb1f7991e9fb Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 17 Jan 2025 11:14:51 +0800 Subject: [PATCH 1/2] [core] Make CatalogContext implements Serializable --- .../apache/paimon/catalog/CatalogContext.java | 15 +++++++++---- .../utils/SerializableHadoopConfig.java | 22 ++++++++----------- .../paimon/catalog/CatalogFactoryTest.java | 13 +++++++++++ 3 files changed, 33 insertions(+), 17 deletions(-) rename paimon-format/src/main/java/org/apache/paimon/format/orc/SerializableHadoopConfigWrapper.java => paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java (75%) diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java index bef6565fe708..d8f50f199ecf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java @@ -22,11 +22,14 @@ import org.apache.paimon.fs.FileIOLoader; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; +import org.apache.paimon.utils.SerializableHadoopConfig; import org.apache.hadoop.conf.Configuration; import javax.annotation.Nullable; +import java.io.Serializable; + import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; import static org.apache.paimon.utils.HadoopUtils.getHadoopConfiguration; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -37,10 +40,12 @@ * @since 0.4.0 */ @Public -public class CatalogContext { +public class CatalogContext implements Serializable { + + private static final long serialVersionUID = 1L; private final Options options; - private final Configuration hadoopConf; + private final SerializableHadoopConfig hadoopConf; @Nullable private final FileIOLoader preferIOLoader; @Nullable private final FileIOLoader fallbackIOLoader; @@ -50,7 +55,9 @@ private CatalogContext( @Nullable FileIOLoader preferIOLoader, @Nullable FileIOLoader fallbackIOLoader) { this.options = checkNotNull(options); - this.hadoopConf = hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf; + this.hadoopConf = + new SerializableHadoopConfig( + hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf); this.preferIOLoader = preferIOLoader; this.fallbackIOLoader = fallbackIOLoader; } @@ -92,7 +99,7 @@ public Options options() { /** Return hadoop {@link Configuration}. */ public Configuration hadoopConf() { - return hadoopConf; + return hadoopConf.get(); } @Nullable diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/SerializableHadoopConfigWrapper.java b/paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java similarity index 75% rename from paimon-format/src/main/java/org/apache/paimon/format/orc/SerializableHadoopConfigWrapper.java rename to paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java index ae94fc3ac57d..4190bdac83a7 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/SerializableHadoopConfigWrapper.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.format.orc; +package org.apache.paimon.utils; import org.apache.paimon.io.DataInputDeserializer; import org.apache.paimon.io.DataOutputSerializer; @@ -31,17 +31,17 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Utility class to make a {@link Configuration Hadoop Configuration} serializable. */ -public final class SerializableHadoopConfigWrapper implements Serializable { +public final class SerializableHadoopConfig implements Serializable { private static final long serialVersionUID = 1L; private transient Configuration hadoopConfig; - public SerializableHadoopConfigWrapper(Configuration hadoopConfig) { + public SerializableHadoopConfig(Configuration hadoopConfig) { this.hadoopConfig = checkNotNull(hadoopConfig); } - public Configuration getHadoopConfig() { + public Configuration get() { return hadoopConfig; } @@ -49,11 +49,7 @@ public Configuration getHadoopConfig() { private void writeObject(ObjectOutputStream out) throws IOException { out.defaultWriteObject(); - - // we write the Hadoop config through a separate serializer to avoid cryptic exceptions when - // it - // corrupts the serialization stream - final DataOutputSerializer ser = new DataOutputSerializer(256); + DataOutputSerializer ser = new DataOutputSerializer(256); hadoopConfig.write(ser); out.writeInt(ser.length()); out.write(ser.getSharedBuffer(), 0, ser.length()); @@ -62,13 +58,13 @@ private void writeObject(ObjectOutputStream out) throws IOException { private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - final byte[] data = new byte[in.readInt()]; + byte[] data = new byte[in.readInt()]; in.readFully(data); - final DataInputDeserializer deser = new DataInputDeserializer(data); - this.hadoopConfig = new Configuration(); + DataInputDeserializer deserializer = new DataInputDeserializer(data); + this.hadoopConfig = new Configuration(false); try { - this.hadoopConfig.readFields(deser); + this.hadoopConfig.readFields(deserializer); } catch (IOException e) { throw new IOException( "Could not deserialize Hadoop Configuration, the serialized and de-serialized don't match.", diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java index 744888586144..7f93b8d61c55 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java @@ -22,6 +22,8 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; import org.apache.paimon.table.CatalogTableType; +import org.apache.paimon.utils.HadoopUtilsITCase.TestFileIOLoader; +import org.apache.paimon.utils.InstantiationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -84,4 +86,15 @@ public void testContextDefaultHadoopConf(@TempDir java.nio.file.Path path) { assertThat(conf.get("fs.defaultFS")).isEqualTo(defaultFS); assertThat(conf.get("dfs.replication")).isEqualTo(replication); } + + @Test + public void testContextSerializable() throws IOException, ClassNotFoundException { + Configuration conf = new Configuration(false); + conf.set("my_key", "my_value"); + CatalogContext context = + CatalogContext.create( + new Options(), conf, new TestFileIOLoader(), new TestFileIOLoader()); + context = InstantiationUtil.clone(context); + assertThat(context.hadoopConf().get("my_key")).isEqualTo(conf.get("my_key")); + } } From 156a5cba45086625e476bd8b11ad9cc2669a0cca Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 17 Jan 2025 13:12:19 +0800 Subject: [PATCH 2/2] fix comment --- .../apache/paimon/catalog/CatalogContext.java | 6 +- .../utils/SerializableHadoopConfig.java | 74 ------------------- 2 files changed, 3 insertions(+), 77 deletions(-) delete mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java index d8f50f199ecf..9e9cd2fd85b2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java @@ -21,8 +21,8 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIOLoader; import org.apache.paimon.fs.Path; +import org.apache.paimon.hadoop.SerializableConfiguration; import org.apache.paimon.options.Options; -import org.apache.paimon.utils.SerializableHadoopConfig; import org.apache.hadoop.conf.Configuration; @@ -45,7 +45,7 @@ public class CatalogContext implements Serializable { private static final long serialVersionUID = 1L; private final Options options; - private final SerializableHadoopConfig hadoopConf; + private final SerializableConfiguration hadoopConf; @Nullable private final FileIOLoader preferIOLoader; @Nullable private final FileIOLoader fallbackIOLoader; @@ -56,7 +56,7 @@ private CatalogContext( @Nullable FileIOLoader fallbackIOLoader) { this.options = checkNotNull(options); this.hadoopConf = - new SerializableHadoopConfig( + new SerializableConfiguration( hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf); this.preferIOLoader = preferIOLoader; this.fallbackIOLoader = fallbackIOLoader; diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java b/paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java deleted file mode 100644 index 4190bdac83a7..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/utils/SerializableHadoopConfig.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.utils; - -import org.apache.paimon.io.DataInputDeserializer; -import org.apache.paimon.io.DataOutputSerializer; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import static org.apache.paimon.utils.Preconditions.checkNotNull; - -/** Utility class to make a {@link Configuration Hadoop Configuration} serializable. */ -public final class SerializableHadoopConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - private transient Configuration hadoopConfig; - - public SerializableHadoopConfig(Configuration hadoopConfig) { - this.hadoopConfig = checkNotNull(hadoopConfig); - } - - public Configuration get() { - return hadoopConfig; - } - - // ------------------------------------------------------------------------ - - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - DataOutputSerializer ser = new DataOutputSerializer(256); - hadoopConfig.write(ser); - out.writeInt(ser.length()); - out.write(ser.getSharedBuffer(), 0, ser.length()); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - byte[] data = new byte[in.readInt()]; - in.readFully(data); - DataInputDeserializer deserializer = new DataInputDeserializer(data); - this.hadoopConfig = new Configuration(false); - - try { - this.hadoopConfig.readFields(deserializer); - } catch (IOException e) { - throw new IOException( - "Could not deserialize Hadoop Configuration, the serialized and de-serialized don't match.", - e); - } - } -}