diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java index bef6565fe708..9e9cd2fd85b2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java @@ -21,12 +21,15 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIOLoader; import org.apache.paimon.fs.Path; +import org.apache.paimon.hadoop.SerializableConfiguration; import org.apache.paimon.options.Options; import org.apache.hadoop.conf.Configuration; import javax.annotation.Nullable; +import java.io.Serializable; + import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; import static org.apache.paimon.utils.HadoopUtils.getHadoopConfiguration; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -37,10 +40,12 @@ * @since 0.4.0 */ @Public -public class CatalogContext { +public class CatalogContext implements Serializable { + + private static final long serialVersionUID = 1L; private final Options options; - private final Configuration hadoopConf; + private final SerializableConfiguration hadoopConf; @Nullable private final FileIOLoader preferIOLoader; @Nullable private final FileIOLoader fallbackIOLoader; @@ -50,7 +55,9 @@ private CatalogContext( @Nullable FileIOLoader preferIOLoader, @Nullable FileIOLoader fallbackIOLoader) { this.options = checkNotNull(options); - this.hadoopConf = hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf; + this.hadoopConf = + new SerializableConfiguration( + hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf); this.preferIOLoader = preferIOLoader; this.fallbackIOLoader = fallbackIOLoader; } @@ -92,7 +99,7 @@ public Options options() { /** Return hadoop {@link Configuration}. */ public Configuration hadoopConf() { - return hadoopConf; + return hadoopConf.get(); } @Nullable diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java index 744888586144..7f93b8d61c55 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogFactoryTest.java @@ -22,6 +22,8 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; import org.apache.paimon.table.CatalogTableType; +import org.apache.paimon.utils.HadoopUtilsITCase.TestFileIOLoader; +import org.apache.paimon.utils.InstantiationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -84,4 +86,15 @@ public void testContextDefaultHadoopConf(@TempDir java.nio.file.Path path) { assertThat(conf.get("fs.defaultFS")).isEqualTo(defaultFS); assertThat(conf.get("dfs.replication")).isEqualTo(replication); } + + @Test + public void testContextSerializable() throws IOException, ClassNotFoundException { + Configuration conf = new Configuration(false); + conf.set("my_key", "my_value"); + CatalogContext context = + CatalogContext.create( + new Options(), conf, new TestFileIOLoader(), new TestFileIOLoader()); + context = InstantiationUtil.clone(context); + assertThat(context.hadoopConf().get("my_key")).isEqualTo(conf.get("my_key")); + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/SerializableHadoopConfigWrapper.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/SerializableHadoopConfigWrapper.java deleted file mode 100644 index ae94fc3ac57d..000000000000 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/SerializableHadoopConfigWrapper.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.format.orc; - -import org.apache.paimon.io.DataInputDeserializer; -import org.apache.paimon.io.DataOutputSerializer; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import static org.apache.paimon.utils.Preconditions.checkNotNull; - -/** Utility class to make a {@link Configuration Hadoop Configuration} serializable. */ -public final class SerializableHadoopConfigWrapper implements Serializable { - - private static final long serialVersionUID = 1L; - - private transient Configuration hadoopConfig; - - public SerializableHadoopConfigWrapper(Configuration hadoopConfig) { - this.hadoopConfig = checkNotNull(hadoopConfig); - } - - public Configuration getHadoopConfig() { - return hadoopConfig; - } - - // ------------------------------------------------------------------------ - - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - - // we write the Hadoop config through a separate serializer to avoid cryptic exceptions when - // it - // corrupts the serialization stream - final DataOutputSerializer ser = new DataOutputSerializer(256); - hadoopConfig.write(ser); - out.writeInt(ser.length()); - out.write(ser.getSharedBuffer(), 0, ser.length()); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - final byte[] data = new byte[in.readInt()]; - in.readFully(data); - final DataInputDeserializer deser = new DataInputDeserializer(data); - this.hadoopConfig = new Configuration(); - - try { - this.hadoopConfig.readFields(deser); - } catch (IOException e) { - throw new IOException( - "Could not deserialize Hadoop Configuration, the serialized and de-serialized don't match.", - e); - } - } -}